Loading Data into Snowflake using Snowpark DataFrames

Factspan
5 min read · Sep 9, 2024


Why this blog?

Managing data within Snowflake can be challenging, especially when dealing with complex data pipelines and various data sources. Developers often struggle with efficiently writing data from DataFrames into Snowflake tables while maintaining data integrity and performance. This blog helps by offering practical solutions and clear examples, guiding you through different writing modes, table types, and view creation methods. By following this guide, you’ll enhance your data engineering capabilities and streamline your data workflows in Snowflake.

Overview

Snowpark is a developer framework from Snowflake that allows developers to interact with Snowflake directly and build complex data pipelines using Python.

This article explores using Snowpark DataFrames to write data into Snowflake tables.

How can you write data from a Snowpark DataFrame into a Snowflake table?

The DataFrameWriter class in Snowpark provides methods for writing data from a DataFrame to destinations within the Snowflake ecosystem.

To write data from a DataFrame into a table:

  • Create a DataFrame containing the data to be written into a Snowflake table.
  • Create a DataFrameWriter object by calling the DataFrame.write property on the DataFrame.
  • Specify the write mode by calling the mode() method on the DataFrameWriter object. This returns the DataFrameWriter, now configured with the specified mode, so that further calls can be chained.
  • Call the save_as_table method on the DataFrameWriter object to save the contents of the DataFrame to a specified table.
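The four steps above can be wrapped in a small helper. This is a minimal sketch rather than an official pattern: the function name write_dataframe is hypothetical, and it assumes df is a Snowpark DataFrame created from an active session.

```python
def write_dataframe(df, table_name, save_mode):
    """Write a Snowpark DataFrame to a table (hypothetical helper).

    Assumes `df` is a Snowpark DataFrame and `save_mode` is one of the
    modes accepted by DataFrameWriter.mode().
    """
    writer = df.write                  # DataFrameWriter for this DataFrame
    writer = writer.mode(save_mode)    # configure the save mode
    writer.save_as_table(table_name)   # run the write in Snowflake
    return table_name                  # returned for convenience when chaining
```

A call such as write_dataframe(df_customer_select, "MY_DB.MY_SCHEMA.MY_TABLE", "append") would then be equivalent to the chained one-line syntax; the table name here is a placeholder.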

Syntax:

DataFrame.write.mode(save_mode).save_as_table(table_name)

Methods:

mode(save_mode): Configures the DataFrameWriter’s save mode. save_mode supports the following values:

  • "append": Appends data from the DataFrame to the existing table. If the table does not exist, it creates a new one.
  • "overwrite": Overwrites the existing table with the data from the DataFrame.
  • "errorifexists": Throws an exception if the table already exists.
  • "ignore": Skips the write operation if the table already exists.

save_as_table(table_name): Writes the data to the specified table in a Snowflake database.
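To make the difference between the four modes concrete, here is a toy model in plain Python. It does not call Snowflake at all: the dictionary catalog stands in for a schema, and the function apply_save_mode is a hypothetical name used only for illustration. It mirrors the behaviour described in the list above.

```python
def apply_save_mode(catalog, table_name, rows, save_mode):
    """Toy model of the save modes; `catalog` maps table name -> list of rows."""
    exists = table_name in catalog
    if save_mode == "append":
        # Add rows to the table, creating it first if needed
        catalog.setdefault(table_name, []).extend(rows)
    elif save_mode == "overwrite":
        # Replace whatever was there
        catalog[table_name] = list(rows)
    elif save_mode == "errorifexists":
        if exists:
            raise ValueError(f"Object '{table_name}' already exists.")
        catalog[table_name] = list(rows)
    elif save_mode == "ignore":
        # Write only when the table is missing; otherwise do nothing
        if not exists:
            catalog[table_name] = list(rows)
    else:
        raise ValueError(f"Unsupported save mode: {save_mode!r}")
    return len(catalog[table_name])


catalog = {}
rows = [(1, "a"), (2, "b"), (3, "c")]
counts = [
    apply_save_mode(catalog, "CUSTOMER", rows, "overwrite"),
    apply_save_mode(catalog, "CUSTOMER", rows, "append"),
    apply_save_mode(catalog, "CUSTOMER", rows, "ignore"),
]
print(counts)  # [3, 6, 6]
```

The row count doubles after the append and stays unchanged after the ignore, which is the same pattern the demonstration below shows against a real table.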

Demonstration:

To illustrate, let’s read data from an existing Snowflake table into a DataFrame, transform it, and save the result as a new table.

In the example below, data is read from the Snowflake table SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER, rows are filtered to those where C_NATIONKEY is 15, and only the columns C_CUSTKEY and C_NAME are selected. The code assumes an existing Snowpark session object named session.

from snowflake.snowpark.functions import col

# Read the source table into a DataFrame
df_customer = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER")
# Keep only the rows where C_NATIONKEY is 15
df_customer_filter = df_customer.filter(col("C_NATIONKEY") == '15')
# Select the two columns to be written
df_customer_select = df_customer_filter.select(col("C_CUSTKEY"), col("C_NAME"))

Overwrite Data:

The following code writes the contents of the df_customer_select DataFrame to the specified Snowflake table, overwriting the table’s existing data if it already exists.

df_customer_select.write.mode("overwrite").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")

When this code runs, Snowpark translates it into SQL and runs it in Snowflake. The SQL statement that is produced is as follows:

CREATE OR REPLACE TABLE SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER (
    "C_CUSTKEY" BIGINT NOT NULL,
    "C_NAME" STRING(25) NOT NULL
) AS
SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME"
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE ("C_NATIONKEY" = '15')
);

The following code confirms that the table is created and displays the count of records loaded into the CUSTOMER table.

session.table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER").count()
+----+
|5921|
+----+

Append Data:

The following code appends the contents of the df_customer_select DataFrame to the specified Snowflake table, adding new records to the existing ones.

df_customer_select.write.mode("append").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API. The resulting SQL statements are as follows:

--- Verifies whether the specified table already exists
SHOW TABLES LIKE 'CUSTOMER' IN SCHEMA SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA;

--- Inserts the data because the table already exists; otherwise the table is created and the data inserted
INSERT INTO SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER
SELECT "C_CUSTKEY", "C_NAME"
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
WHERE ("C_NATIONKEY" = '15');

The following code displays the count of records in the CUSTOMER table. Since we have already created and inserted data in the preceding step, the record count indicates that data got appended.

session.table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER").count()
+------+
|11842 |
+------+

Ignore Data:

The following code ignores the write operation if the specified table already exists.

df_customer_select.write.mode("ignore").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API.

The resulting SQL statement is as follows:

--- Creates the table only if it does not already exist
CREATE TABLE IF NOT EXISTS SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER (
    "C_CUSTKEY" BIGINT NOT NULL,
    "C_NAME" STRING(25) NOT NULL
) AS
SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME"
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE "C_NATIONKEY" = '15'
);

The following code displays the count of records in the CUSTOMER table. Since we have already created and inserted data in the preceding steps, the record count indicates that no data got written into the table.

session.table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER").count()
+------+
|11842 |
+------+

Throw Error:

The following code throws an exception if the specified table already exists.

df_customer_select.write.mode("errorifexists").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")
# SQL compilation error: Object 'SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER' already exists.

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API and throws an error. The resulting SQL statement and the result are as follows:

--- "create table" is used instead of "create or replace"
CREATE TABLE SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER (
    "C_CUSTKEY" BIGINT NOT NULL,
    "C_NAME" STRING(25) NOT NULL
) AS
SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME"
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE ("C_NATIONKEY" = '15')
);
--- SQL compilation error: Object 'SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER' already exists.
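In a pipeline you may prefer to handle this error rather than let it stop the run. The sketch below is one possible approach, not a recommended standard: create_table_once is a hypothetical helper, it assumes the failure surfaces as SnowparkSQLException from snowflake.snowpark.exceptions, and it matches on the "already exists" text shown in the error above. The import is guarded so the snippet also loads where Snowpark is not installed.

```python
try:
    from snowflake.snowpark.exceptions import SnowparkSQLException
except ImportError:
    # Fallback so the sketch still loads without Snowpark installed
    SnowparkSQLException = Exception


def create_table_once(df, table_name):
    """Return True if the table was created, False if it already existed."""
    try:
        df.write.mode("errorifexists").save_as_table(table_name)
        return True
    except SnowparkSQLException as exc:
        # Assumption: the message contains the text seen in the error above
        if "already exists" in str(exc):
            return False
        raise  # any other SQL failure is still surfaced to the caller
```

Matching on message text is brittle, so treat it as a starting point; the "ignore" mode is the simpler choice when you do not need to know whether the table was created.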

How can you specify the type of table when writing data from a Snowpark DataFrame into Snowflake?

By default, the save_as_table method creates a permanent table. To create tables of the temporary or transient type, include an additional parameter table_type along with the table_name.

The supported values of table_type are: temp, temporary, and transient.

Syntax:

DataFrame.write.mode(save_mode).save_as_table(table_name, table_type=table_type)

The following code writes the contents of the df_customer_select DataFrame to a temporary table named TEMP_CUSTOMER.

df_customer_select.write.mode("overwrite").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.TEMP_CUSTOMER", table_type="temp")
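If the table type comes from configuration or user input, it can help to validate it before the write. The following is a small sketch under stated assumptions: save_with_table_type and ALLOWED_TABLE_TYPES are hypothetical names, and the empty string is assumed to mean the default permanent table.

```python
# Values listed above, plus "" which is assumed to mean a permanent table
ALLOWED_TABLE_TYPES = {"", "temp", "temporary", "transient"}


def save_with_table_type(df, table_name, table_type="", save_mode="overwrite"):
    """Validate table_type, then write the DataFrame (hypothetical helper)."""
    if table_type not in ALLOWED_TABLE_TYPES:
        raise ValueError(f"Unsupported table_type: {table_type!r}")
    # table_type is passed as a keyword argument to save_as_table
    df.write.mode(save_mode).save_as_table(table_name, table_type=table_type)
    return table_type or "permanent"
```

Failing early with a clear message avoids sending a write to Snowflake with a mistyped table type such as "tmp".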

How can you create a view from a Snowpark DataFrame?

To create a view from a DataFrame, call the create_or_replace_view method, which creates a new view immediately.

Syntax:

DataFrame.create_or_replace_view("view_name")

The following code creates a view named VW_CUSTOMER using the computation expressed by the df_customer_select DataFrame.

customer_view = df_customer_select.create_or_replace_view("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.VW_CUSTOMER")

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API. The resulting SQL statement is as follows:

--- Creates a view based on the DataFrame expression
CREATE OR REPLACE VIEW SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.VW_CUSTOMER AS
SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME"
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE "C_NATIONKEY" = '15'
);

Alternatively, the create_or_replace_temp_view method can be used to create a temporary view. The temporary view is only available in the session in which it is created.
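The choice between the two view methods can be expressed with a single flag. This is a minimal sketch: publish_view is a hypothetical helper name, and df is assumed to be a Snowpark DataFrame such as df_customer_select.

```python
def publish_view(df, view_name, temporary=False):
    """Create or replace a view from a DataFrame (hypothetical helper)."""
    if temporary:
        # Visible only in the session that creates it
        return df.create_or_replace_temp_view(view_name)
    # Regular view that persists beyond the session
    return df.create_or_replace_view(view_name)
```

For example, publish_view(df_customer_select, "VW_CUSTOMER_TMP", temporary=True) would create a session-scoped view; the view name here is a placeholder.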

Sourced from Factspan
