All posts

Using Streams in Snowflake for Change Data Capture

Date
  • Jeff Skoldberg
    Principal Consultant at Green Mountain Data Solutions

Intro to Snowflake Streams and Change Data Capture

A Snowflake Stream is responsible for Change Data Capture (CDC), helping users understand what has changed in a table since a certain time. But what is CDC?

Simply put, CDC is the process of answering “what has changed in my data source since last time I loaded or processed the data?”

Putting it more technically, CDC is a method of identifying and capturing changes made to data in a data source. It does this by keeping track of all insertions, updates, and deletions that happen for a given database table. This form of tracking changes is much more flexible, compared to snapshot based approaches used in data loading tools like Fivetran, since it allows you to re-produce the exact state of your table at any given point in time.

Streams are Snowflake’s answer to internal CDC

Quoting Snowflake’s documentation:

A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data.

Let’s unpack this some more.

In Snowflake, a Stream Table, referred to as a Stream, is a set of records from a source table that have changed since the last time the stream’s data was used in a DML transaction. Snowflake accomplishes this by tracking new micro-partitions that have been added since the last time the stream was consumed.

The first important concept to understand about a stream is that it can be queried in the from clause of a SQL statement and acts very much like a view. What makes it special is that it returns rows from a source table that have changed since a point in time, called an offset. The offset is reset by using the stream data in a DML query (insert, update, delete, CTAS).

Understanding the Stream Offset

When you create a stream and select from it right away, the default behavior is the stream will be empty when it is initialized. If you then insert or change 5 records in the source table, querying stream will return 5 records. You can select from the stream without changing the offset. The offset is only reset when you query the stream with a DML statement, insert, update, merge, or CTAS.

Example Stream Usage

Source data

In this example, I’m going to create a stream on a view, because the source table has too many columns and the columns are in a strange order. You’ll see that creating streams on views is easy; it is much the same process as creating streams on tables.

I have a view called raw_sales_data defined as:

create view raw_sales_data as
select
sales_order_id,
name, -- the customer's Full Name
email, -- the customer's email
ordered_at_utc -- unix timestamp in milliseconds
from fake_sales_orders -- the table where data is added
;

Create and Query the Stream

Let’s create a stream on this source view:

create or replace stream sales_stream
on view raw_sales_orders;

Let’s select from the stream and apply a light transformation:

select
sales_order_id as order_id,
name as customer_name,
email as customer_email,
(ordered_at_utc / 1e9)::timestamptz as ordered_at_utc
from
sales_stream;

Since we have not manipulated or added data in the source table, the stream table will be empty:

Add data to the source, query the stream

I’m going to add 1000 rows to the table by running a Python Extract/Load script, then select from the stream again:

The stream now has 1000 rows.

Use the stream data to reset the offset / clear the stream

In order to clear the stream, or reset the offset, I need to use the stream data in a DML transaction. For this simple example, I’m going to insert the data into a temporary table , which will be the simplest way I can clear the stream. But likely you will merge this data into a permanent target table.

create temp table
clear_the_stream as
(select * from sales_stream);
select count(*) from sales_stream; --now returns 0 since the offset has been reset.

After “consuming” the stream data, the stream has been cleared. A new offset is created, so we can repeat the process of manipulating the source data, querying the stream, and consuming the stream.

Update and delete source data

We looked at adding new records to source data, but what about updating the source data? Let’s update a single record, and delete a single record.

update fake_sales_orders
-- note: table `fake_sales_orders` is the basis for the view `raw_sales_orders`
set name = 'Jeff Skoldberg'
where sales_order_id = '59472696-660a-4935-bc30-2078ed35f044'
;
delete from fake_sales_orders
where sales_order_id = '4dfc5e0f-4268-4a46-9dbf-816acf48588e'
;

Now that I have changed two rows in my data, how many rows do you think the stream will return?

If you thought “two”… you would be incorrect 🤔😀. It was a trick question.

The correct answer is three rows:

  • The updated row will have two rows in the stream. One row will be flagged with METADATA$ACTION = DELETE and a second row will have METADATA$ACTION = INSERT.
  • The deleted row will show a single record in the stream with METADATA$ACTION = DELETE
select
sales_order_id,
name,
metadata$action,
metadata$isupdate,
metadata$row_id
from
sales_stream;

Updating the target table

Here is the SQL I would use to handle deleting the deleted record from the target and updating the updated record:

MERGE INTO clear_the_stream AS tgt -- the temp table from earlier
USING (
SELECT *
FROM sales_stream
) AS src
ON tgt.sales_order_id = src.sales_order_id
-- For rows marked as insert or update
WHEN MATCHED AND src.metadata$action = 'INSERT'
THEN UPDATE
SET
tgt.name = src.name
-- you would use all the columns
-- I'm only using the "NAME" field for simplicity,
-- because I know which field was updated
-- For rows marked as deleted
WHEN MATCHED AND src.metadata$action = 'DELETE' THEN
DELETE
-- If a row is new (not present in target), insert it
WHEN NOT MATCHED AND src.metadata$action = 'INSERT' THEN
INSERT (
sales_order_id,
name,
email,
ordered_at_utc,
metadata$action,
metadata$isupdate,
metadata$row_id
)
VALUES (
src.sales_order_id,
src.name,
src.email,
src.ordered_at_utc,
src.metadata$action,
src.metadata$isupdate
src.metadata$row_id
);

After running the query, let’s check the results:

select * from
clear_the_stream --the temp table acting as my target table
where sales_order_id in -- the two rows I manipulated in the source
(
'4dfc5e0f-4268-4a46-9dbf-816acf48588e',
'59472696-660a-4935-bc30-2078ed35f044'
);

It properly deleted the sales_order_id beginning with 4df... and it updated the 594... id so the name field now says Jeff Skoldberg. Sweet!

When to use SHOW_INITIAL_ROWS

So far we have created streams using the default behavior. There is an important optional parameter called show_initial_rows that can be set when creating a stream:

create or replace stream my_stream on table my_table
show_initial_rows = TRUE

In this case, the stream will not be empty when it is created. When you select from my_stream it will contain all of the unprocessed rows: the rows that existed before you created the stream and any data that has changed or arrived in the moments since then.

This property solves some challenges that would otherwise exist in building complete downstream targets. Without this property, a work-around would be needed; for example, unioning existing records with changed records could be one approach. But for most data modeling scenarios, enabling show_initial_rows will be exactly what you want to do.

Stream Metadata Columns

We have already been using the Stream Metadata columns, but let’s take a moment to properly define each of them. For efficiency and completeness, I have copied this section directly from Snowflake’s documentation:

  • **METADATA$ACTION:**** **Indicates the DML operation (INSERT, DELETE) recorded.
  • **METADATA$ISUPDATE:**** **Indicates whether the operation was part of an UPDATE statement. Updates to rows in the source object are represented as a pair of DELETE and INSERT records in the stream with a metadata column METADATA$ISUPDATE values set to TRUE. Note that streams record the differences between two offsets. If a row is added and then updated in the current offset, the delta change is a new row. The METADATA$ISUPDATE row records a FALSE value.
  • **METADATA$ROW_ID:**** **specifies the unique and immutable ID for the row, which can be used to track changes to specific rows over time.

Using streams in a task

A common use case is to schedule the consumption of a stream in a Snowflake task. To accomplish this, you can put the Snowflake MERGE statement right into the task definition:

CREATE TASK consume_the_stream
SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
AS
-- copy / paste the merge statement from above
;
alter task consume_the_stream resume;

The above task will run 9 AM to 5 PM every day.

Alternatively, you can wrap the merge statement in a Snowflake stored procedure and have the task execute the stored procedure. This keeps your task definition tidy.

Types of streams

Snowflake has 3 types of streams:

  1. Standard Streams: tracks all inserts updates and deletes on a source. This is what we used in the walk-through above.
  2. Append Only: tracks only inserted rows on the source. Example: Let’s say we have a fresh offset and no data in a stream. 5 records are added to the source table, and a different 5 records are updated, and a different 5 records were deleted. The resulting stream would only contain the 5 inserted records. Example usage:
create or replace stream my_stream
on table my_table
append_only=true;
  1. Insert Only: This is similar to Append Only, but it is for files in cloud storage that are behind External Tables, Iceberg tables, or Dynamic External Tables. Data in new files added to cloud storage will appear in the stream. Removing a file from cloud storage will not affect the stream. Example usage:
create or replace stream my_stream
on external table my_external_table
insert_only=TRUE;

Checking if a stream has data using STREAM_HAS_DATA

We’ve already seen that you can check if a stream has data by doing something like select count(*) from sales_stream or select * from sales_stream. But, there is a useful system function to be aware of called SYSTEM$STREAM_HAS_DATA. You can use it in a select statement and it will return TRUE or FALSE depending if the stream has data.

select
SYSTEM$STREAM_HAS_DATA('SALES_STREAM')
as stream_has_data;
-- returns FALSE since we just reset the offset by consuming the stream in DML.

FAQ

How much do streams cost?

The main cost you will incur with Streams are the compute costs associated with querying the stream or using it in a transformation step.

A stream table itself doesn’t contain any data, although you can query it like it does. Because the changed data is not replicated, streams have very little storage costs. When you enable change tracking on a table, the three metadata columns are added to the source table. The only additional storage costs you incur is for the small amount of storage needed for those extra columns.

Please check here for caveats with time travel and data retention time. For a refresher on Snowflake’s costs, check out SELECT’s Snowflake Pricing & Billing Guide.

Can you create multiple streams on the same table?

You can create as many streams on a table as you’d like. It is best practice that each consumer or target table has a unique stream associated with it. Do not use a single stream to write to two different tables, as the offset will get cleared and your data will get out of sync. If you want to transform one source table in 5 different ways, you can create 5 streams, no problem.

What are the limitations?

As we’ve seen in the example above, you can enable a stream on a view just like you can a table. But there are a few limitations:

  • The underlying tables must be native Snowflake tables.

  • Nested views, CTEs and Subqueries are supported. But the fully-expended SQL Query can only use these operations: Projections, filters, union all, inner or cross join. This means union without all and left outer join are not supported.

  • For more details, check the docs.

How can you manage streams?

You can find all streams in your account, database, or schema with any of these commands:

show streams in account;
show streams in database <db_name>;
show streams in schema <qualified_schema_name>;
show streams; -- uses your worksheet or connection context / default schema.

In the UI, you can find streams within a schema:

Streams can be dropped using the drop command: drop stream db.schema.stream_name;

CHANGES: a read-only alternative to streams

When change tracking is turned on, either with alter table <table_name> set change_tracking = TRUE; or create stream <stream_name> on table <table_name>, you can query the changes from a particular point in time, even if the offset has been reset.

By adding the keyword changes to the from clause and passing a at (timestamp => <timestamp>, you can see all of the changes that have happened since that timestamp, without resetting the offset.

set ts = dateadd('hours', -3, current_timestamp);
SELECT *
FROM fake_sales_orders
CHANGES(INFORMATION => DEFAULT)
AT(TIMESTAMP => $ts);

The query above shows all of the changes on the table in the last 3 hours. The metadata columns are returned in the query results, so you can use them in your transformation process.

When to use changes instead of streams

It could be a good idea to use changes if:

  1. You have multiple ELT consumers on the same source and don’t want to manage multiple streams.

  2. You are pulling data outside of Snowflake. For example, loading data from Snowflake to Mixpanel or Amplitude, it makes sense to store your own cursor timestamp and select data that has changed since that timestamp. This is because:

    • You may no have any DML action inside of Snowflake to clear a stream.
    • You want flexibility to re-load a a certain portion of the data.

The topic of “selecting from changes” is big enough for its own blog post, as the examples and use cases are many; however, I’m just briefly mentioning it here for completeness.

Using Dynamic Tables instead of Streams

Before creating a new transformation process using steams, it may be worth while to consider if Snowflake’s native Dynamic Tables feature offers a simpler solution to your use case. Often times you can accomplish something similar using a dynamic table in Snowflake, but it all depends on what you are trying to do. If you want to isolate new or changed records, Streams and Changes are great tools to have!

Wrapping up

Hopefully you now feel confident to use streams in production! Streams are a great way to capture new or changed data on large source tables. Streams are most often used when the source data is quite large, and you want a reliable way to incrementally capture new or changed records.

Jeff Skoldberg
Principal Consultant at Green Mountain Data Solutions
Jeff Skoldberg is a Data and Analytics Consultant with 15+ years experience in automating insights and using data to control business processes. From a technology standpoint, he specializes in Snowflake + dbt + Tableau. From a business topic standpoint, he has experience in Public Utility, Clinical Trials, Publishing, CPG, and Manufacturing. Jeff adds unique value for his supply chain clients, as he is well versed in all topics related to planning, forecasting, inventory and supply chain KPIs. Reach out any time! [email protected]

Get up and running with SELECT in 15 minutes.

Snowflake optimization & cost management platform

Gain visibility into Snowflake usage, optimize performance and automate savings with the click of a button.

SELECT web application screenshot

Want to hear about our latest Snowflake learnings? 
Subscribe to get notified.