A Snowflake Stream is responsible for Change Data Capture (CDC), helping users understand what has changed in a table since a certain time. But what is CDC?
Simply put, CDC is the process of answering “what has changed in my data source since last time I loaded or processed the data?”
Putting it more technically, CDC is a method of identifying and capturing changes made to data in a data source. It does this by keeping track of all insertions, updates, and deletions that happen to a given database table. This form of change tracking is much more flexible than the snapshot-based approaches used in data loading tools like Fivetran, since it allows you to reproduce the exact state of your table at any given point in time.
Quoting Snowflake’s documentation:
A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data.
Let’s unpack this some more.
In Snowflake, a Stream Table, referred to as a Stream, is a set of records from a source table that have changed since the last time the stream’s data was used in a DML transaction. Snowflake accomplishes this by tracking new micro-partitions that have been added since the last time the stream was consumed.
The first important concept to understand about a stream is that it can be queried in the from clause of a SQL statement and acts very much like a view. What makes it special is that it returns rows from a source table that have changed since a point in time, called an offset. The offset is reset by using the stream data in a DML query (insert, update, delete, CTAS).
When you create a stream and select from it right away, the default behavior is that the stream will be empty when it is initialized. If you then insert or change 5 records in the source table, querying the stream will return 5 records. You can select from the stream without changing the offset. The offset is only reset when you use the stream in a DML statement: insert, update, delete, merge, or CTAS.
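As a quick sketch (my_stream and my_target here are hypothetical names; a real example follows below):
select count(*) from my_stream;                 -- read-only: the offset does not move
insert into my_target select * from my_stream;  -- DML consumes the stream and advances the offset
select count(*) from my_stream;                 -- now returns 0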
In this example, I’m going to create a stream on a view, because the source table has too many columns and the columns are in a strange order. You’ll see that creating streams on views is easy; it is much the same process as creating streams on tables.
I have a view called raw_sales_orders defined as:
create view raw_sales_orders as
select
sales_order_id,
name, -- the customer's Full Name
email, -- the customer's email
ordered_at_utc -- unix timestamp in milliseconds
from fake_sales_orders -- the table where data is added
;
Let’s create a stream on this source view:
create or replace stream sales_stream
on view raw_sales_orders;
Let’s select from the stream and apply a light transformation:
select
sales_order_id as order_id,
name as customer_name,
email as customer_email,
    (ordered_at_utc / 1e3)::timestamptz as ordered_at_utc
from
sales_stream;
Since we have not manipulated or added data in the source table, the stream table will be empty:
I’m going to add 1000 rows to the table by running a Python Extract/Load script, then select from the stream again:
The stream now has 1000 rows.
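A quick count confirms it:
select count(*) from sales_stream; -- returns 1000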
In order to clear the stream, or reset the offset, I need to use the stream data in a DML transaction. For this simple example, I'm going to insert the data into a temporary table, which is the simplest way I can clear the stream. In practice, you will more likely merge this data into a permanent target table.
create temp table clear_the_stream as
(select * from sales_stream);
select count(*) from sales_stream; --now returns 0 since the offset has been reset.
After “consuming” the stream data, the stream has been cleared. A new offset is created, so we can repeat the process of manipulating the source data, querying the stream, and consuming the stream.
We looked at adding new records to source data, but what about updating the source data? Let’s update a single record, and delete a single record.
update fake_sales_orders
-- note: table `fake_sales_orders` is the basis for the view `raw_sales_orders`
set name = 'Jeff Skoldberg'
where sales_order_id = '59472696-660a-4935-bc30-2078ed35f044'
;
delete from fake_sales_orders
where sales_order_id = '4dfc5e0f-4268-4a46-9dbf-816acf48588e'
;
Now that I have changed two rows in my data, how many rows do you think the stream will return?
If you thought “two”… you would be incorrect 🤔😀. It was a trick question.
The correct answer is three rows:
- The updated record shows up twice: one row with METADATA$ACTION = DELETE and a second row with METADATA$ACTION = INSERT (this pair represents the update).
- The deleted record shows up once, with METADATA$ACTION = DELETE.
select
sales_order_id,
name,
metadata$action,
metadata$isupdate,
metadata$row_id
from
sales_stream;
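The output looks roughly like this (the metadata$row_id values are omitted and the pre-update name values are just placeholders; the actions and flags are the point):
sales_order_id   name              metadata$action   metadata$isupdate
59472696-...     Jeff Skoldberg    INSERT            TRUE
59472696-...     (previous name)   DELETE            TRUE
4dfc5e0f-...     (previous name)   DELETE            FALSE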
Here is the SQL I would use to handle deleting the deleted record from the target and updating the updated record:
MERGE INTO clear_the_stream AS tgt -- the temp table from earlier
USING (
    SELECT *
    FROM sales_stream
    -- drop the DELETE half of each update pair so every target row
    -- matches at most one source row
    WHERE NOT (metadata$action = 'DELETE' AND metadata$isupdate)
) AS src
ON tgt.sales_order_id = src.sales_order_id
-- rows deleted from the source
WHEN MATCHED AND src.metadata$action = 'DELETE'
    THEN DELETE
-- rows inserted or updated in the source
WHEN MATCHED AND src.metadata$action = 'INSERT'
    THEN UPDATE SET
        tgt.name = src.name
        -- you would set all the columns;
        -- I'm only using the "NAME" field for simplicity,
        -- because I know which field was updated
;
After running the query, let’s check the results:
select * from
clear_the_stream --the temp table acting as my target table
where sales_order_id in -- the two rows I manipulated in the source
(
'4dfc5e0f-4268-4a46-9dbf-816acf48588e',
'59472696-660a-4935-bc30-2078ed35f044'
);
It properly deleted the sales_order_id beginning with 4df... and it updated the 594... id so the name field now says Jeff Skoldberg. Sweet!
So far we have created streams using the default behavior. There is an important optional parameter called show_initial_rows that can be set when creating a stream:
create or replace stream my_stream on table my_table
show_initial_rows = TRUE;
In this case, the stream will not be empty when it is created. When you select from my_stream, it will contain all of the unprocessed rows: the rows that existed before you created the stream and any data that has changed or arrived in the moments since then.
This property solves some challenges that would otherwise exist in building complete downstream targets. Without this property, a workaround would be needed; for example, unioning existing records with changed records could be one approach. But for most data modeling scenarios, enabling show_initial_rows will be exactly what you want to do.
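Here is a minimal sketch of that pattern, with hypothetical my_table and my_target names (the exclude list just keeps the stream metadata columns out of the target):
create or replace stream my_stream on table my_table
show_initial_rows = TRUE;

-- the first consumption builds the full target and advances the offset
create or replace table my_target as
select * exclude (metadata$action, metadata$isupdate, metadata$row_id)
from my_stream;

-- from here on, the stream only returns rows that change after that initial load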
We have already been using the Stream Metadata columns, but let’s take a moment to properly define each of them. For efficiency and completeness, I have copied this section directly from Snowflake’s documentation:
**METADATA$ACTION:** Indicates the DML operation (INSERT, DELETE) recorded.

**METADATA$ISUPDATE:** Indicates whether the operation was part of an UPDATE statement. Updates to rows in the source object are represented as a pair of DELETE and INSERT records in the stream with a metadata column METADATA$ISUPDATE values set to TRUE. Note that streams record the differences between two offsets. If a row is added and then updated in the current offset, the delta change is a new row. The METADATA$ISUPDATE row records a FALSE value.

**METADATA$ROW_ID:** Specifies the unique and immutable ID for the row, which can be used to track changes to specific rows over time.

A common use case is to schedule the consumption of a stream in a Snowflake task. To accomplish this, you can put the Snowflake MERGE statement right into the task definition:
CREATE TASK consume_the_stream
SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
AS
-- copy / paste the merge statement from above
;
alter task consume_the_stream resume;
The above task will run at the top of each hour from 9 AM to 5 PM, America/Los_Angeles time, on Sundays.
Alternatively, you can wrap the merge statement in a Snowflake stored procedure and have the task execute the stored procedure. This keeps your task definition tidy.
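A sketch of that pattern, using a hypothetical consume_sales_stream procedure with the merge statement from earlier as its body:
create or replace procedure consume_sales_stream()
returns varchar
language sql
as
$$
begin
    -- paste the merge statement from above here
    return 'stream consumed';
end;
$$;

create or replace task consume_the_stream
    schedule = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
    user_task_managed_initial_warehouse_size = 'XSMALL'
as
    call consume_sales_stream();

alter task consume_the_stream resume;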
Snowflake has 3 types of streams:
1. Standard: The default type, and what we have used so far in this post. It tracks all DML changes to the source object: inserts, updates, and deletes (including table truncates).
2. Append Only: Tracks inserts only; updates and deletes are not captured. Example usage:
create or replace stream my_stream
on table my_table
append_only=true;
3. Insert Only: This is similar to Append Only, but it is for files in cloud storage that are behind External Tables, Iceberg tables, or Dynamic External Tables. Data in new files added to cloud storage will appear in the stream. Removing a file from cloud storage will not affect the stream. Example usage:
create or replace stream my_stream
on external table my_external_table
insert_only=TRUE;
We’ve already seen that you can check if a stream has data by doing something like select count(*) from sales_stream or select * from sales_stream. But there is a useful system function to be aware of called SYSTEM$STREAM_HAS_DATA. You can use it in a select statement and it will return TRUE or FALSE depending on whether the stream has data.
select
SYSTEM$STREAM_HAS_DATA('SALES_STREAM')
as stream_has_data;
-- returns FALSE since we just reset the offset by consuming the stream in DML.
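One handy pattern is to gate a task on this function so that scheduled runs are skipped while the stream is empty; something like this, assuming the consume_the_stream task from earlier (a task must be suspended before it can be altered):
alter task consume_the_stream suspend;
alter task consume_the_stream modify when system$stream_has_data('SALES_STREAM');
alter task consume_the_stream resume;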
The main cost you will incur with Streams is the compute cost associated with querying the stream or using it in a transformation step.
A stream itself doesn’t contain any data, although you can query it like it does. Because the changed data is not replicated, streams have very little storage cost. When you enable change tracking on a table, the three metadata columns are added to the source table. The only additional storage cost you incur is for the small amount of storage needed for those extra columns.
Please check here for caveats with time travel and data retention time. For a refresher on Snowflake’s costs, check out SELECT’s Snowflake Pricing & Billing Guide.
You can create as many streams on a table as you’d like. It is best practice that each consumer or target table has a unique stream associated with it. Do not use a single stream to write to two different tables, as the offset will get cleared and your data will get out of sync. If you want to transform one source table in 5 different ways, you can create 5 streams, no problem.
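For example (the stream names here are hypothetical, on the view from earlier):
create or replace stream sales_stream_for_reporting on view raw_sales_orders;
create or replace stream sales_stream_for_alerting on view raw_sales_orders;
-- each consumer reads from, and resets, only its own stream's offset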
As we’ve seen in the example above, you can enable a stream on a view just like you can on a table. But there are a few limitations: for example, union without all and left outer join are not supported. For more details, check the docs.
You can find all streams in your account, database, or schema with any of these commands:
show streams in account;
show streams in database <db_name>;
show streams in schema <qualified_schema_name>;
show streams; -- uses your worksheet or connection context / default schema.
In the UI, you can find streams within a schema:
Streams can be dropped using the drop command: drop stream db.schema.stream_name;
When change tracking is turned on, either with alter table <table_name> set change_tracking = TRUE; or create stream <stream_name> on table <table_name>, you can query the changes from a particular point in time, even if the offset has been reset.
By adding the changes keyword to the from clause and passing an at (timestamp => <timestamp>) clause, you can see all of the changes that have happened since that timestamp, without resetting the offset.
set ts = dateadd('hours', -3, current_timestamp);
SELECT *
FROM fake_sales_orders
CHANGES(INFORMATION => DEFAULT)
AT(TIMESTAMP => $ts);
The query above shows all of the changes on the table in the last 3 hours. The metadata columns are returned in the query results, so you can use them in your transformation process.
It could be a good idea to use changes in some scenarios, for example when you want to inspect change data without resetting a stream’s offset. The topic of “selecting from changes” is big enough for its own blog post, as the examples and use cases are many; however, I’m just briefly mentioning it here for completeness.
Before creating a new transformation process using streams, it may be worthwhile to consider whether Snowflake’s native Dynamic Tables feature offers a simpler solution to your use case. Oftentimes you can accomplish something similar using a dynamic table in Snowflake, but it all depends on what you are trying to do. If you want to isolate new or changed records, Streams and Changes are great tools to have!
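For comparison, a rough dynamic table version of the earlier transformation might look something like this (the warehouse name and target lag are placeholders):
create or replace dynamic table sales_orders_dt
    target_lag = '1 hour'
    warehouse = my_wh
as
select
    sales_order_id as order_id,
    name as customer_name,
    email as customer_email,
    (ordered_at_utc / 1e3)::timestamptz as ordered_at_utc
from raw_sales_orders;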
Hopefully you now feel confident to use streams in production! Streams are a great way to capture new or changed data, and they are most useful when the source table is quite large and you want a reliable way to incrementally capture new or changed records.
Jeff Skoldberg is a Sales Engineer at SELECT, helping customers get maximum value out of the SELECT app to reduce their Snowflake spend. Prior to joining SELECT, Jeff was a Data and Analytics Consultant with 15+ years experience in automating insights and using data to control business processes. From a technology standpoint, he specializes in Snowflake + dbt + Tableau. From a business topic standpoint, he has experience in Public Utility, Clinical Trials, Publishing, CPG, and Manufacturing.