For data-driven decision making, keeping your analytics up to date is crucial. But how do you efficiently track and process the constant flow of changes in your data? Change Data Capture (CDC) is a common answer in modern data architectures. In this post, we'll explain how to implement robust CDC workflows in Snowflake using streams and tasks, with a sample use case to illustrate the concept.
Why CDC Matters
Snowflake's cloud-native architecture makes it an ideal platform for implementing CDC workflows. Its separation of storage and compute allows for efficient processing of incremental changes without the need to scan entire datasets. Two features make this possible:
- Streams: These act as change-tracking mechanisms for tables.
- Tasks: Automated, scheduled jobs that can process data from streams.
Let's dive into how we can use these to build a robust CDC pipeline.
Our Example Scenario
For this tutorial, we'll use a simplified e-commerce dataset with three main tables: customers, orders, and products.
Our goal is to maintain an up-to-date analytics layer that reflects changes in these source tables.
-- Create our source tables
CREATE OR REPLACE TABLE customers (
customer_id INT,
name STRING,
email STRING,
last_updated TIMESTAMP_NTZ
);
CREATE OR REPLACE TABLE orders (
order_id INT,
customer_id INT,
order_date DATE,
total_amount DECIMAL(10,2),
status STRING
);
CREATE OR REPLACE TABLE products (
product_id INT,
name STRING,
category STRING,
price DECIMAL(10,2),
inventory_count INT
);
-- Insert some sample data
INSERT INTO customers VALUES
(1, 'Alice Johnson', 'alice@example.com', CURRENT_TIMESTAMP()),
(2, 'Bob Smith', 'bob@example.com', CURRENT_TIMESTAMP());
INSERT INTO orders VALUES
(101, 1, CURRENT_DATE(), 99.99, 'Shipped'),
(102, 2, CURRENT_DATE(), 149.99, 'Processing');
INSERT INTO products VALUES
(1001, 'Ergonomic Keyboard', 'Electronics', 79.99, 100),
(1002, 'Wireless Mouse', 'Electronics', 29.99, 200);
With our tables set up, let's start building our CDC workflow.
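The tasks in Step 2 merge into three tables in an analytics schema (analytics.customer_summary, analytics.order_summary, analytics.product_summary) that this post never defines. Here is a minimal sketch, assuming each target simply mirrors the columns its MERGE statement references. It also seeds the targets from the current source rows, since a stream only reports changes made after it is created:

```sql
-- Assumed target layer: the column lists are inferred from the MERGE
-- statements in Step 2, not taken from a published schema.
CREATE SCHEMA IF NOT EXISTS analytics;

-- CREATE TABLE ... AS SELECT both defines each target and backfills it
-- with the rows that already exist in the source table.
CREATE OR REPLACE TABLE analytics.customer_summary AS
SELECT customer_id, name, email, last_updated
FROM customers;

CREATE OR REPLACE TABLE analytics.order_summary AS
SELECT order_id, customer_id, order_date, total_amount, status
FROM orders;

CREATE OR REPLACE TABLE analytics.product_summary AS
SELECT product_id, name, category, price, inventory_count
FROM products;
```

In a real pipeline the summary tables would likely carry derived or aggregated columns; mirroring the sources just keeps the example easy to follow.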
Step 1: Creating Streams
Streams in Snowflake are the foundation of our CDC process. They keep track of DML changes (inserts, updates, deletes) on the source tables. Here's how we create streams for each of our tables:
-- Create streams on our source tables
CREATE OR REPLACE STREAM customer_changes ON TABLE customers;
CREATE OR REPLACE STREAM order_changes ON TABLE orders;
CREATE OR REPLACE STREAM product_changes ON TABLE products;
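These streams now record the changes made to their respective tables from the moment each stream was created; rows that already existed, like our sample data, are not included. But how do we process these changes? That's where tasks come in.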
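To see what a stream actually records, it can help to change a few rows and query the stream directly. The sketch below uses the sample data from earlier (customer 3 is a made-up row for illustration). A plain SELECT does not consume a stream, so this is safe to run before any task exists:

```sql
-- Change one existing row and add a new one
UPDATE customers
SET email = 'alice.johnson@example.com',
    last_updated = CURRENT_TIMESTAMP()
WHERE customer_id = 1;

INSERT INTO customers VALUES
    (3, 'Carol Lee', 'carol@example.com', CURRENT_TIMESTAMP());

-- Each stream row carries the table's columns plus three metadata columns
SELECT customer_id,
       email,
       METADATA$ACTION,    -- 'INSERT' or 'DELETE'
       METADATA$ISUPDATE,  -- TRUE when the row is one half of an update
       METADATA$ROW_ID     -- stable identifier for the underlying row
FROM customer_changes
ORDER BY customer_id, METADATA$ACTION;
```

The update to customer 1 shows up as a pair of rows, a DELETE carrying the old values and an INSERT carrying the new ones, both with METADATA$ISUPDATE set to TRUE. The new customer appears as a single INSERT row. The stream's offset only advances when the stream is read inside a DML statement that commits.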
Step 2: Creating Tasks
Tasks allow us to automatically process the data from our streams at regular intervals. Let's create a task for each of our streams:
-- Create a task to process customer changes
CREATE OR REPLACE TASK process_customer_changes
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = '1 minute'
AS
MERGE INTO analytics.customer_summary tgt
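USING (SELECT customer_id, name, email, last_updated
FROM customer_changes
-- An update appears in the stream as a DELETE + INSERT pair;
-- keep only the new row image so each key matches once
WHERE METADATA$ACTION = 'INSERT') src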
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN UPDATE
SET tgt.name = src.name,
tgt.email = src.email,
tgt.last_updated = src.last_updated
WHEN NOT MATCHED THEN INSERT
(customer_id, name, email, last_updated)
VALUES
(src.customer_id, src.name, src.email, src.last_updated);
-- Create a task to process order changes
CREATE OR REPLACE TASK process_order_changes
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = '1 minute'
AS
MERGE INTO analytics.order_summary tgt
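USING (SELECT order_id, customer_id, order_date, total_amount, status
FROM order_changes
-- An update appears in the stream as a DELETE + INSERT pair;
-- keep only the new row image so each key matches once
WHERE METADATA$ACTION = 'INSERT') src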
ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE
SET tgt.customer_id = src.customer_id,
tgt.order_date = src.order_date,
tgt.total_amount = src.total_amount,
tgt.status = src.status
WHEN NOT MATCHED THEN INSERT
(order_id, customer_id, order_date, total_amount, status)
VALUES
(src.order_id, src.customer_id, src.order_date, src.total_amount, src.status);
-- Create a task to process product changes
CREATE OR REPLACE TASK process_product_changes
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = '1 minute'
AS
MERGE INTO analytics.product_summary tgt
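USING (SELECT product_id, name, category, price, inventory_count
FROM product_changes
-- An update appears in the stream as a DELETE + INSERT pair;
-- keep only the new row image so each key matches once
WHERE METADATA$ACTION = 'INSERT') src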
ON tgt.product_id = src.product_id
WHEN MATCHED THEN UPDATE
SET tgt.name = src.name,
tgt.category = src.category,
tgt.price = src.price,
tgt.inventory_count = src.inventory_count
WHEN NOT MATCHED THEN INSERT
(product_id, name, category, price, inventory_count)
VALUES
(src.product_id, src.name, src.category, src.price, src.inventory_count);
Let's break down what's happening in these tasks:
- We're using MERGE statements to upsert data into our analytics tables.
- The source for each MERGE is the corresponding stream.
- We're scheduling these tasks to run every minute, but you can adjust this based on your needs.
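One refinement worth considering: as written, each task runs on its warehouse every minute even when nothing has changed. Here is a sketch, assuming the stream and task names from the steps above, that adds a WHEN condition so a scheduled run is skipped while the stream is empty. Tasks are created suspended, so they can be altered at this point:

```sql
-- Skip a scheduled run unless the matching stream has pending changes
ALTER TASK process_customer_changes
    MODIFY WHEN SYSTEM$STREAM_HAS_DATA('customer_changes');

ALTER TASK process_order_changes
    MODIFY WHEN SYSTEM$STREAM_HAS_DATA('order_changes');

ALTER TASK process_product_changes
    MODIFY WHEN SYSTEM$STREAM_HAS_DATA('product_changes');
```

The same condition can instead be written as a WHEN clause directly in CREATE TASK, placed just before AS.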
Step 3: Activating the CDC Workflow
With our streams and tasks in place, we just need to activate our tasks. Snowflake creates tasks in a suspended state, so each one has to be resumed:
ALTER TASK process_customer_changes RESUME;
ALTER TASK process_order_changes RESUME;
ALTER TASK process_product_changes RESUME;
Now our CDC workflow is up and running! Any changes to the source tables will be captured by the streams and processed by the tasks, keeping our analytics layer up-to-date.
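A quick way to confirm the pipeline works end to end is to add a row, trigger a run by hand, and look at the target. This is a sketch using the orders table; order 103 is a made-up row for illustration:

```sql
-- Simulate new source activity
INSERT INTO orders VALUES
    (103, 1, CURRENT_DATE(), 59.50, 'Processing');

-- Optionally trigger a run now instead of waiting for the schedule.
-- EXECUTE TASK only queues the run; it does not wait for it to finish.
EXECUTE TASK process_order_changes;

-- Once the run has completed, the stream should report no pending
-- changes and the new order should be visible in the analytics table
SELECT SYSTEM$STREAM_HAS_DATA('order_changes') AS has_pending_changes;

SELECT order_id, status, total_amount
FROM analytics.order_summary
ORDER BY order_id;
```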
Monitoring and Optimization
While our basic CDC workflow is functional, there's always room for improvement. Here are some tips to keep your CDC process running smoothly:
- Monitor task history: Regularly query the TASK_HISTORY table function to ensure your tasks are running successfully.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;
- Optimize task frequency: If you're processing a high volume of changes, you might need to run your tasks more frequently. Conversely, for low-volume tables, you could reduce the frequency to save on compute costs.
- Use task dependencies: For complex workflows, you can create dependencies between tasks. For example, you might want to process customer changes before order changes (both tasks must be suspended first, and the dependent task can't keep its own schedule):
ALTER TASK process_order_changes
ADD AFTER process_customer_changes;
- Implement error handling: Add error handling logic to your tasks to manage unexpected data or failures gracefully.
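The dependency and error-handling tips above take a few extra statements in practice: the root task has to be suspended before tasks in a graph are modified, and a child task can't have its own schedule. Here is a sketch of the full sequence, using the task names from Step 2; the failure threshold of 3 is an arbitrary illustrative value:

```sql
-- Suspend both tasks before changing them
ALTER TASK process_customer_changes SUSPEND;
ALTER TASK process_order_changes SUSPEND;

-- The child now runs after its parent, so it gives up its own schedule
ALTER TASK process_order_changes UNSET SCHEDULE;
ALTER TASK process_order_changes ADD AFTER process_customer_changes;

-- Basic failure handling: suspend automatically after 3 consecutive failed runs
ALTER TASK process_customer_changes
    SET SUSPEND_TASK_AFTER_NUM_FAILURES = 3;

-- Resume the child first, then the root
ALTER TASK process_order_changes RESUME;
ALTER TASK process_customer_changes RESUME;
```

One design caveat: if the parent task has a WHEN condition, a skipped parent run also skips its children. A child that reads its own stream is usually better left as an independent scheduled task than attached to a conditionally run parent.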
Real-world Impact: A Case Study
We worked with a large online retailer that struggled with inventory management. They were running batch updates every 6 hours, which led to overselling popular items during flash sales. By implementing a CDC workflow similar to what we've discussed, they were able to update their inventory analytics every 5 minutes.
The result? A 30% reduction in stockouts and a 15% increase in customer satisfaction scores. The near real-time inventory updates allowed them to trigger restock alerts faster and adjust promotional activities on the fly.
Advanced Techniques: Handling Complex Scenarios
As your CDC pipeline grows, you might encounter more complex scenarios. Here are a few advanced techniques to consider:
1. Handling Deletes
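Our current setup doesn't act on deletes. The stream already records them, but the MERGE never looks at the stream's metadata columns, so a deleted source row stays in the analytics table. To propagate deletes, have the task branch on METADATA$ACTION and METADATA$ISUPDATE:
-- Creating a stream normally enables change tracking already;
-- setting it explicitly is harmless
ALTER TABLE products SET CHANGE_TRACKING = TRUE;
-- Optional: recreate the stream so its first read also returns the rows
-- already in the table. Recreating a stream discards any pending changes.
CREATE OR REPLACE STREAM product_changes ON TABLE products
SHOW_INITIAL_ROWS = TRUE;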
-- Update the process_product_changes task
CREATE OR REPLACE TASK process_product_changes
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = '1 minute'
AS
MERGE INTO analytics.product_summary tgt
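USING (
SELECT product_id, name, category, price, inventory_count,
METADATA$ACTION, METADATA$ISUPDATE
FROM product_changes
-- An update arrives as a DELETE + INSERT pair; drop the DELETE half
-- so it doesn't collide with the new row image for the same key
WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) src
ON tgt.product_id = src.product_id
WHEN MATCHED AND src.METADATA$ACTION = 'DELETE' THEN DELETE
WHEN MATCHED AND src.METADATA$ACTION = 'INSERT' THEN UPDATE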
SET tgt.name = src.name,
tgt.category = src.category,
tgt.price = src.price,
tgt.inventory_count = src.inventory_count
WHEN NOT MATCHED AND src.METADATA$ACTION = 'INSERT' THEN INSERT
(product_id, name, category, price, inventory_count)
VALUES
(src.product_id, src.name, src.category, src.price, src.inventory_count);
2. Handling Schema Changes
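Schema changes can break your CDC pipeline. One way to make your pipeline more resilient is to build the MERGE with dynamic SQL:
CREATE OR REPLACE TASK process_customer_changes
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = '1 minute'
AS
DECLARE
v_columns STRING;
v_sql STRING;
BEGIN
-- Collect the source table's current columns in a stable order
-- (stream metadata columns are not listed in information_schema)
SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY ordinal_position)
INTO :v_columns
FROM information_schema.columns
WHERE table_name = 'CUSTOMERS'
AND table_schema = 'PUBLIC';
-- Build the MERGE text; the regex matches each column name without
-- its separator so the prefixes land directly on the name
v_sql := 'MERGE INTO analytics.customer_summary tgt
USING (SELECT ' || v_columns || '
FROM customer_changes
WHERE METADATA$ACTION = ''INSERT'') src
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN UPDATE
SET ' || REGEXP_REPLACE(v_columns, '([^, ]+)', 'tgt.\\1 = src.\\1') || '
WHEN NOT MATCHED THEN INSERT
(' || v_columns || ')
VALUES
(' || REGEXP_REPLACE(v_columns, '([^, ]+)', 'src.\\1') || ')';
-- EXECUTE IMMEDIATE takes a string literal or a variable, so run the built text from v_sql
EXECUTE IMMEDIATE :v_sql;
END;
This approach dynamically builds the SQL statement based on the current schema of the source table, making it more adaptable to changes. Note that the target table still needs to gain a new column before the MERGE can reference it.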
3. Managing Large Volumes of Changes
For tables with a high volume of changes, processing all changes in a single task run might take too long. A stream can't be consumed piecemeal, though: any DML statement that reads it advances the offset past all pending changes, including rows excluded by a WHERE or LIMIT, and streams expose no sequence column to batch on. One workaround is to drain the stream into a staging table and process that table in batches:
CREATE OR REPLACE TASK process_high_volume_changes
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = '1 minute'
AS
DECLARE
v_batch_max NUMBER;
BEGIN
-- Drain the stream into a staging table; this advances the stream offset.
-- Assumption: analytics.high_volume_staging has the source columns plus an
-- ordered, auto-incrementing change_seq column. Deletes are not propagated here.
INSERT INTO analytics.high_volume_staging (id, column1, column2)
SELECT id, column1, column2
FROM high_volume_changes
WHERE METADATA$ACTION = 'INSERT';
-- Find the upper bound of the next batch
SELECT MAX(change_seq)
INTO :v_batch_max
FROM (
SELECT change_seq
FROM analytics.high_volume_staging
ORDER BY change_seq
LIMIT 10000 -- Adjust batch size as needed
);
-- Merge the batch, keeping only the newest staged version of each id
MERGE INTO analytics.high_volume_summary tgt
USING (
SELECT id, column1, column2
FROM analytics.high_volume_staging
WHERE change_seq <= :v_batch_max
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_seq DESC) = 1
) src
ON tgt.id = src.id
WHEN MATCHED THEN UPDATE
SET tgt.column1 = src.column1,
tgt.column2 = src.column2
-- ... other columns ...
WHEN NOT MATCHED THEN INSERT
(id, column1, column2)
VALUES
(src.id, src.column1, src.column2);
-- Remove the batch that was just merged
DELETE FROM analytics.high_volume_staging
WHERE change_seq <= :v_batch_max;
END;
This approach processes changes in batches, while the staging table holds everything that has not been merged yet, so no data is missed.
Conclusion: Embracing the Future of Data Processing
Implementing CDC workflows in Snowflake using streams and tasks opens up a world of possibilities for near real-time data processing. From improving inventory management to enabling instant analytics updates, CDC can be a game-changer for your data strategy.
The key to successful CDC implementation lies in:
- Designing your streams to capture the right level of detail
- Optimizing your tasks for performance and reliability
- Monitoring and fine-tuning your workflow as your data needs evolve
As you move forward on your CDC journey, keep experimenting and adapting these techniques to your specific use cases. The data engineering landscape is ever-changing, and with tools like Snowflake's streams and tasks, you're well-equipped to ride the wave of continuous data updates.