The Night Everything Broke (And How Streams Saved Me)
It was 2 AM on a Tuesday. My phone was buzzing non-stop. Our nightly ETL job had failed, again. This time it crashed after processing 6 hours of data, and we had to start over from scratch. The business needed fresh customer data by 8 AM for the morning reports.
I was running a classic batch process: every night at midnight, truncate the target table, reload everything from source, rebuild all the aggregations. It worked fine when we had 100,000 customers. But we’d grown to 5 million customers, and the full reload was taking 8 hours.
That’s when I discovered Streams and Tasks in Snowflake. Within a week, I rebuilt the entire pipeline:
- No more full reloads (only process changes)
- No more manual scheduling (Tasks handled it)
- No more 8-hour batch windows (incremental updates took 10 minutes)
- No more 2 AM phone calls (built-in retry logic)
This guide is everything I wish someone had shown me that night. We’ll build real pipelines, not toy examples, starting from simple automation and working up to complex patterns like SCD Type 2.
What Are Streams and Tasks? (The Simple Explanation)
Before diving into code, let’s understand what these actually do:
Streams are like security cameras for your tables. They watch what changes (inserts, updates, deletes) and create a “change log” you can query. Think of it as automatic change data capture (CDC) built into Snowflake.
Tasks are scheduled SQL jobs. They’re like cron jobs but smarter: they can depend on other tasks, run only when data is available, and auto-retry on failure.
Together? Magic. Streams detect changes, Tasks process them automatically.
The old way:
-- Run this manually or via cron at 2 AM
TRUNCATE TABLE customer_summary;
INSERT INTO customer_summary
SELECT customer_id, COUNT(*) as order_count, SUM(amount) as total_spent
FROM orders
GROUP BY customer_id;
-- Takes hours, processes everything, fails if interrupted
The new way:
-- Stream watches for changes
CREATE STREAM order_changes ON TABLE orders;
-- Task processes only changes, runs automatically
CREATE TASK update_customer_summary
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('order_changes')
AS
-- Process only changed orders (10 seconds instead of 8 hours!)
MERGE INTO customer_summary ...
Let’s build this properly.
Part 1: Understanding Streams (Change Data Capture)
Creating Your First Stream
-- Setup: Create sample source table
CREATE OR REPLACE TABLE customers (
customer_id INTEGER,
customer_name STRING,
email STRING,
status STRING,
created_date DATE,
updated_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Insert initial data
INSERT INTO customers VALUES
(1, 'John Doe', '[email protected]', 'ACTIVE', '2024-01-15', CURRENT_TIMESTAMP()),
(2, 'Jane Smith', '[email protected]', 'ACTIVE', '2024-02-20', CURRENT_TIMESTAMP()),
(3, 'Bob Johnson', '[email protected]', 'ACTIVE', '2024-03-10', CURRENT_TIMESTAMP());
-- Create stream to track changes
CREATE OR REPLACE STREAM customer_changes ON TABLE customers;
-- At this point, stream is empty (no changes yet)
SELECT * FROM customer_changes;
-- Returns 0 rows
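If you want to confirm what the stream object is tracking, SHOW STREAMS and DESCRIBE STREAM expose its metadata, including the source table and whether the stream has gone stale:

```sql
-- Inspect the stream object itself
SHOW STREAMS LIKE 'customer_changes';

-- Same metadata for a single stream: owner, source table, mode, stale flag
DESCRIBE STREAM customer_changes;
```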
How Streams Capture Changes
Now let’s make some changes and see what the stream captures:
-- Make various changes
UPDATE customers SET status = 'INACTIVE' WHERE customer_id = 1;
INSERT INTO customers VALUES (4, 'Alice Williams', '[email protected]', 'ACTIVE', '2024-04-05', CURRENT_TIMESTAMP());
DELETE FROM customers WHERE customer_id = 3;
-- Query the stream
SELECT
customer_id,
customer_name,
email,
status,
METADATA$ACTION, -- INSERT or DELETE (an UPDATE shows up as both)
METADATA$ISUPDATE, -- TRUE for updates
METADATA$ROW_ID -- Unique, immutable identifier for the row
FROM customer_changes;
Output (email and METADATA$ROW_ID omitted for brevity):
customer_id | customer_name  | status   | METADATA$ACTION | METADATA$ISUPDATE
1           | John Doe       | INACTIVE | INSERT          | TRUE
1           | John Doe       | ACTIVE   | DELETE          | TRUE
4           | Alice Williams | ACTIVE   | INSERT          | FALSE
3           | Bob Johnson    | ACTIVE   | DELETE          | FALSE
Understanding the output:
- An UPDATE appears as a DELETE (old values) plus an INSERT (new values), both with METADATA$ISUPDATE = TRUE
- An INSERT appears as an INSERT with METADATA$ISUPDATE = FALSE
- A DELETE appears as a DELETE with METADATA$ISUPDATE = FALSE
Stream Consumption (Critical Concept)
Here’s something that confused me for weeks: a stream is consumed when you use it in a DML statement (INSERT, MERGE, CREATE TABLE AS SELECT) and that transaction commits. Plain SELECT queries never consume it.
-- Query stream (doesn't consume)
SELECT * FROM customer_changes;
-- Stream still has data
-- Use stream in INSERT (consumes!)
INSERT INTO customer_backup
SELECT * FROM customer_changes;
-- Query stream again
SELECT * FROM customer_changes;
-- Returns 0 rows - stream was consumed!
Important patterns:
-- If you need the data multiple times, capture it first
CREATE TEMPORARY TABLE changes_snapshot AS
SELECT * FROM customer_changes;
-- Now use snapshot multiple times
INSERT INTO target1 SELECT * FROM changes_snapshot;
INSERT INTO target2 SELECT * FROM changes_snapshot;
-- The stream was consumed exactly once, by the CTAS; the snapshot can be reused freely
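An alternative to the temp-table pattern, when you control the session: run the DML statements inside one explicit transaction. Every statement in the transaction sees the same stream contents, and the offset advances once, at COMMIT. A sketch:

```sql
-- All statements in one transaction see identical stream contents;
-- the stream offset advances only when the transaction commits
BEGIN TRANSACTION;
INSERT INTO target1 SELECT * FROM customer_changes;
INSERT INTO target2 SELECT * FROM customer_changes;
COMMIT;
```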
Part 2: Understanding Tasks (Automation)
Creating Your First Task
-- The task needs somewhere to write; create a small log table first
CREATE OR REPLACE TABLE task_logs (message STRING, logged_at TIMESTAMP_LTZ);
-- Simple task that runs on a schedule
CREATE OR REPLACE TASK hello_world_task
WAREHOUSE = my_wh
SCHEDULE = '5 MINUTE'
AS
INSERT INTO task_logs
VALUES ('Hello from task!', CURRENT_TIMESTAMP());
-- Tasks are created in SUSPENDED state
-- You must explicitly start them
ALTER TASK hello_world_task RESUME;
-- Check task status
SHOW TASKS LIKE 'hello_world_task';
-- View task runs
SELECT
name,
state,
scheduled_time,
completed_time,
error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE name = 'HELLO_WORLD_TASK'
ORDER BY scheduled_time DESC
LIMIT 10;
Conditional Execution (Run Only When Needed)
-- Create task that only runs when stream has data
CREATE OR REPLACE TASK process_customer_changes
WAREHOUSE = etl_wh
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('customer_changes')
AS
INSERT INTO customer_history
SELECT
customer_id,
customer_name,
email,
status,
METADATA$ACTION as change_type,
CURRENT_TIMESTAMP() as processed_at
FROM customer_changes;
ALTER TASK process_customer_changes RESUME;
Why this is powerful:
- Task checks every 5 minutes
- Only runs if stream has data
- Warehouse only spins up when needed
- Zero cost if no changes
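You can run the same check manually; SYSTEM$STREAM_HAS_DATA is an ordinary function, which makes it handy for debugging why a task did or didn't fire:

```sql
-- Returns TRUE if the stream currently contains change records
SELECT SYSTEM$STREAM_HAS_DATA('customer_changes');
```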
Task Dependencies (Building Pipelines)
-- Create a pipeline: raw -> staging -> production
-- Task 1: Load raw data
CREATE OR REPLACE TASK load_raw_data
WAREHOUSE = etl_wh
SCHEDULE = '10 MINUTE'
AS
COPY INTO raw_orders
FROM @s3_stage/orders/
FILE_FORMAT = (TYPE = 'CSV');
-- Stream on the raw table so staging only processes new rows
CREATE OR REPLACE STREAM raw_orders_stream ON TABLE raw_orders;
-- Task 2: Clean and stage (runs after Task 1)
CREATE OR REPLACE TASK stage_data
WAREHOUSE = etl_wh
AFTER load_raw_data -- Dependency!
AS
INSERT INTO staged_orders
SELECT
order_id,
customer_id,
UPPER(TRIM(product_name)) as product_name,
amount,
order_date
FROM raw_orders_stream
WHERE amount > 0; -- Filter bad data
-- Stream on the staged table feeds the final aggregation
CREATE OR REPLACE STREAM staged_orders_stream ON TABLE staged_orders;
-- Task 3: Aggregate to production (runs after Task 2)
CREATE OR REPLACE TASK aggregate_to_prod
WAREHOUSE = etl_wh
AFTER stage_data -- Another dependency!
AS
MERGE INTO customer_summary target
USING (
SELECT
customer_id,
COUNT(*) as new_orders,
SUM(amount) as new_amount
FROM staged_orders_stream
GROUP BY customer_id
) source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
UPDATE SET
total_orders = total_orders + source.new_orders,
total_spent = total_spent + source.new_amount
WHEN NOT MATCHED THEN
INSERT (customer_id, total_orders, total_spent)
VALUES (source.customer_id, source.new_orders, source.new_amount);
-- IMPORTANT: Resume tasks in reverse order (child first, parent last)
ALTER TASK aggregate_to_prod RESUME;
ALTER TASK stage_data RESUME;
ALTER TASK load_raw_data RESUME; -- Root task last!
Task dependency diagram:
load_raw_data (every 10 min)
    ↓
stage_data (after load_raw_data completes)
    ↓
aggregate_to_prod (after stage_data completes)
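Resuming each task by hand gets tedious for larger DAGs. Snowflake also provides SYSTEM$TASK_DEPENDENTS_ENABLE, which resumes a root task and all of its dependents in one call:

```sql
-- Resume the whole pipeline starting from the root task
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('load_raw_data');
```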
Part 3: Real Use Case #1 – Simple Incremental Load
Scenario: Load customer orders from source system, keep only net changes.
-- Source table (simulates external system)
CREATE OR REPLACE TABLE source_orders (
order_id INTEGER,
customer_id INTEGER,
product_name STRING,
amount DECIMAL(10,2),
order_date DATE,
updated_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Target table (your data warehouse)
CREATE OR REPLACE TABLE dwh_orders (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER,
product_name STRING,
amount DECIMAL(10,2),
order_date DATE,
loaded_at TIMESTAMP_LTZ
);
-- Create stream on source
CREATE OR REPLACE STREAM source_orders_stream ON TABLE source_orders;
-- Create incremental load task
CREATE OR REPLACE TASK incremental_load_orders
WAREHOUSE = etl_wh
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('source_orders_stream')
AS
MERGE INTO dwh_orders target
USING (
-- Get net changes from stream
SELECT
order_id,
customer_id,
product_name,
amount,
order_date
FROM source_orders_stream
WHERE METADATA$ACTION = 'INSERT'
-- No ISUPDATE filter: we want brand-new rows AND the new version of updated rows
) source
ON target.order_id = source.order_id
WHEN MATCHED THEN
UPDATE SET
target.customer_id = source.customer_id,
target.product_name = source.product_name,
target.amount = source.amount,
target.order_date = source.order_date,
target.loaded_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
INSERT (order_id, customer_id, product_name, amount, order_date, loaded_at)
VALUES (source.order_id, source.customer_id, source.product_name,
source.amount, source.order_date, CURRENT_TIMESTAMP());
ALTER TASK incremental_load_orders RESUME;
-- Test it!
INSERT INTO source_orders VALUES
(1, 101, 'Widget A', 29.99, '2026-01-15', CURRENT_TIMESTAMP()),
(2, 102, 'Widget B', 49.99, '2026-01-16', CURRENT_TIMESTAMP());
-- Wait 5 minutes, check target
SELECT * FROM dwh_orders;
-- Make updates
UPDATE source_orders SET amount = 39.99 WHERE order_id = 1;
-- Wait 5 minutes, verify update applied
SELECT * FROM dwh_orders WHERE order_id = 1;
Why this works:
- Stream captures all changes
- Task runs only when changes exist
- MERGE handles both new and updated records
- Fully automated, zero manual intervention
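When testing, you don't have to wait for the next scheduled run: EXECUTE TASK triggers a task immediately (it requires the EXECUTE TASK privilege):

```sql
-- Trigger the task right now instead of waiting for the schedule
EXECUTE TASK incremental_load_orders;

-- Then verify the load landed
SELECT * FROM dwh_orders ORDER BY loaded_at DESC;
```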
Part 4: Real Use Case #2 – SCD Type 2 (The Big One)
This is the use case everyone asks about. Slowly Changing Dimensions Type 2 tracks full history of changes.
Business requirement: Track complete history of customer data changes over time.
Step 1: Create SCD2 Table Structure
-- Dimension table with SCD2 pattern
CREATE OR REPLACE TABLE dim_customer_scd2 (
customer_key INTEGER AUTOINCREMENT, -- Surrogate key
customer_id INTEGER, -- Natural key
customer_name STRING,
email STRING,
phone STRING,
address STRING,
city STRING,
state STRING,
status STRING,
effective_start_date TIMESTAMP_LTZ, -- When this version became active
effective_end_date TIMESTAMP_LTZ, -- When this version became inactive
is_current BOOLEAN, -- Is this the current version?
inserted_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Source table
CREATE OR REPLACE TABLE source_customers (
customer_id INTEGER PRIMARY KEY,
customer_name STRING,
email STRING,
phone STRING,
address STRING,
city STRING,
state STRING,
status STRING,
updated_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load initial data
INSERT INTO source_customers VALUES
(1, 'John Doe', '[email protected]', '555-0001', '123 Main St', 'Seattle', 'WA', 'ACTIVE', CURRENT_TIMESTAMP()),
(2, 'Jane Smith', '[email protected]', '555-0002', '456 Oak Ave', 'Portland', 'OR', 'ACTIVE', CURRENT_TIMESTAMP()),
(3, 'Bob Johnson', '[email protected]', '555-0003', '789 Pine Rd', 'Boston', 'MA', 'ACTIVE', CURRENT_TIMESTAMP());
-- Initial load to dimension
INSERT INTO dim_customer_scd2
(customer_id, customer_name, email, phone, address, city, state, status,
effective_start_date, effective_end_date, is_current)
SELECT
customer_id,
customer_name,
email,
phone,
address,
city,
state,
status,
CURRENT_TIMESTAMP() as effective_start_date,
'9999-12-31'::TIMESTAMP_LTZ as effective_end_date,
TRUE as is_current
FROM source_customers;
-- Verify
SELECT * FROM dim_customer_scd2;
Step 2: Create Stream on Source
CREATE OR REPLACE STREAM source_customers_stream ON TABLE source_customers;
Step 3: Build SCD2 Processing Logic
This is the complex part. We need to:
- Identify what changed
- Expire old records (set end date, is_current = FALSE)
- Insert new versions
CREATE OR REPLACE TASK process_customer_scd2
WAREHOUSE = etl_wh
SCHEDULE = '10 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('source_customers_stream')
AS
BEGIN
-- All three statements read the stream, so they must run in ONE transaction:
-- otherwise the first DML commits and consumes the stream, leaving the
-- later statements with nothing to process
START TRANSACTION;
-- Step 1: Expire old records for changed customers
UPDATE dim_customer_scd2
SET
effective_end_date = CURRENT_TIMESTAMP(),
is_current = FALSE
WHERE customer_id IN (
SELECT customer_id
FROM source_customers_stream
WHERE METADATA$ACTION = 'INSERT' -- the new-value half of an update
AND METADATA$ISUPDATE = TRUE
)
AND is_current = TRUE;
-- Step 2: Insert new versions for changed customers
INSERT INTO dim_customer_scd2
(customer_id, customer_name, email, phone, address, city, state, status,
effective_start_date, effective_end_date, is_current)
SELECT
customer_id,
customer_name,
email,
phone,
address,
city,
state,
status,
CURRENT_TIMESTAMP() as effective_start_date,
'9999-12-31'::TIMESTAMP_LTZ as effective_end_date,
TRUE as is_current
FROM source_customers_stream
WHERE METADATA$ACTION = 'INSERT'
AND METADATA$ISUPDATE = TRUE;
-- Step 3: Insert brand new customers
INSERT INTO dim_customer_scd2
(customer_id, customer_name, email, phone, address, city, state, status,
effective_start_date, effective_end_date, is_current)
SELECT
customer_id,
customer_name,
email,
phone,
address,
city,
state,
status,
CURRENT_TIMESTAMP() as effective_start_date,
'9999-12-31'::TIMESTAMP_LTZ as effective_end_date,
TRUE as is_current
FROM source_customers_stream
WHERE METADATA$ACTION = 'INSERT'
AND METADATA$ISUPDATE = FALSE;
COMMIT;
END;
ALTER TASK process_customer_scd2 RESUME;
Step 4: Test SCD2 Processing
-- Test 1: Update customer address (should create new version)
UPDATE source_customers
SET address = '999 New Street', city = 'San Francisco', state = 'CA'
WHERE customer_id = 1;
-- Wait 10 minutes (or manually run: EXECUTE TASK process_customer_scd2;)
-- Check results - should see 2 versions of customer 1
SELECT
customer_key,
customer_id,
customer_name,
address,
city,
state,
effective_start_date,
effective_end_date,
is_current
FROM dim_customer_scd2
WHERE customer_id = 1
ORDER BY effective_start_date;
-- Output:
-- customer_key | customer_id | address        | city          | is_current | effective_start_date | effective_end_date
-- 1            | 1           | 123 Main St    | Seattle       | FALSE      | 2026-01-15 10:00     | 2026-01-15 15:30
-- 4            | 1           | 999 New Street | San Francisco | TRUE       | 2026-01-15 15:30     | 9999-12-31 23:59
-- Test 2: Update multiple attributes
UPDATE source_customers
SET
email = '[email protected]',
phone = '555-9999',
status = 'INACTIVE'
WHERE customer_id = 2;
-- Check history
SELECT
customer_key,
customer_id,
customer_name,
email,
phone,
status,
effective_start_date,
is_current
FROM dim_customer_scd2
WHERE customer_id = 2
ORDER BY effective_start_date;
-- Test 3: Insert new customer
INSERT INTO source_customers VALUES
(4, 'Alice Williams', '[email protected]', '555-0004', '321 Elm St', 'Austin', 'TX', 'ACTIVE', CURRENT_TIMESTAMP());
-- Verify new customer appears in dimension
SELECT * FROM dim_customer_scd2 WHERE customer_id = 4;
Step 5: Query Historical Data
Now the payoff: querying data as it was at any point in time:
-- Current state (easy)
SELECT * FROM dim_customer_scd2 WHERE is_current = TRUE;
-- State as of specific date
SELECT
customer_id,
customer_name,
address,
city,
state,
status
FROM dim_customer_scd2
WHERE '2026-01-15 12:00:00'::TIMESTAMP_LTZ BETWEEN effective_start_date AND effective_end_date;
-- Find all changes for a customer
SELECT
customer_id,
customer_name,
address || ', ' || city || ', ' || state as full_address,
status,
effective_start_date,
effective_end_date,
DATEDIFF(day, effective_start_date, effective_end_date) as days_active
FROM dim_customer_scd2
WHERE customer_id = 1
ORDER BY effective_start_date;
-- Customers who changed status in last 30 days
-- (LAG avoids joining on exact timestamps, which can differ by milliseconds
-- between the expire and insert statements)
SELECT
customer_id,
customer_name,
old_status,
new_status,
changed_on
FROM (
SELECT
customer_id,
customer_name,
status as new_status,
LAG(status) OVER (PARTITION BY customer_id ORDER BY effective_start_date) as old_status,
effective_start_date as changed_on,
is_current
FROM dim_customer_scd2
)
WHERE is_current = TRUE
AND old_status IS NOT NULL
AND new_status != old_status
AND changed_on >= DATEADD(day, -30, CURRENT_TIMESTAMP());
Part 5: Real Use Case #3 – Multi-Table Pipeline
Scenario: Process orders â update customer metrics â trigger alerts
-- Source tables
CREATE OR REPLACE TABLE raw_orders (
order_id INTEGER,
customer_id INTEGER,
amount DECIMAL(10,2),
order_date DATE,
status STRING,
inserted_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
CREATE OR REPLACE TABLE customer_metrics (
customer_id INTEGER PRIMARY KEY,
total_orders INTEGER DEFAULT 0,
total_spent DECIMAL(15,2) DEFAULT 0,
avg_order_value DECIMAL(10,2) DEFAULT 0,
last_order_date DATE,
customer_segment STRING, -- 'VIP', 'Regular', 'At Risk'
updated_at TIMESTAMP_LTZ
);
CREATE OR REPLACE TABLE vip_alerts (
alert_id INTEGER AUTOINCREMENT,
customer_id INTEGER,
alert_type STRING,
message STRING,
created_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Create streams
CREATE OR REPLACE STREAM raw_orders_stream ON TABLE raw_orders;
-- Task 1: Process new orders
CREATE OR REPLACE TASK process_new_orders
WAREHOUSE = etl_wh
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('raw_orders_stream')
AS
MERGE INTO customer_metrics target
USING (
SELECT
customer_id,
COUNT(*) as new_order_count,
SUM(amount) as new_order_amount,
MAX(order_date) as latest_order_date
FROM raw_orders_stream
WHERE METADATA$ACTION = 'INSERT'
AND status = 'COMPLETED'
GROUP BY customer_id
) source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
total_orders = total_orders + source.new_order_count,
total_spent = total_spent + source.new_order_amount,
avg_order_value = (total_spent + source.new_order_amount) / (total_orders + source.new_order_count),
last_order_date = GREATEST(target.last_order_date, source.latest_order_date),
updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT
(customer_id, total_orders, total_spent, avg_order_value, last_order_date, updated_at)
VALUES
(source.customer_id, source.new_order_count, source.new_order_amount,
source.new_order_amount / source.new_order_count, source.latest_order_date, CURRENT_TIMESTAMP());
-- Task 2: Update customer segments (runs after Task 1)
CREATE OR REPLACE TASK update_customer_segments
WAREHOUSE = etl_wh
AFTER process_new_orders
AS
UPDATE customer_metrics
SET
customer_segment = CASE
WHEN total_spent >= 10000 THEN 'VIP'
-- Check inactivity before the spend tier, or lapsed customers
-- would always land in 'Regular'
WHEN DATEDIFF(day, last_order_date, CURRENT_DATE()) > 180 THEN 'At Risk'
ELSE 'Regular'
END,
updated_at = CURRENT_TIMESTAMP()
WHERE updated_at >= DATEADD(minute, -10, CURRENT_TIMESTAMP()); -- Only recently updated
-- Task 3: Generate VIP alerts (runs after Task 2)
CREATE OR REPLACE TASK generate_vip_alerts
WAREHOUSE = etl_wh
AFTER update_customer_segments
AS
INSERT INTO vip_alerts (customer_id, alert_type, message)
SELECT
customer_id,
'NEW_VIP' as alert_type,
'Customer ' || customer_id || ' just became VIP with $' || total_spent || ' total spent!'
FROM customer_metrics
WHERE customer_segment = 'VIP'
AND updated_at >= DATEADD(minute, -10, CURRENT_TIMESTAMP())
AND total_spent >= 10000
AND total_spent < 10500; -- Likely just crossed threshold
-- Resume tasks (child first!)
ALTER TASK generate_vip_alerts RESUME;
ALTER TASK update_customer_segments RESUME;
ALTER TASK process_new_orders RESUME;
-- Test the pipeline
INSERT INTO raw_orders VALUES
(1, 101, 500.00, '2026-01-15', 'COMPLETED', CURRENT_TIMESTAMP()),
(2, 101, 9600.00, '2026-01-16', 'COMPLETED', CURRENT_TIMESTAMP()); -- Should trigger VIP!
-- Wait 5-10 minutes, check results
SELECT * FROM customer_metrics WHERE customer_id = 101;
SELECT * FROM vip_alerts WHERE customer_id = 101;
Part 6: Error Handling and Monitoring
Handling Task Failures
-- Task with error handling
CREATE OR REPLACE TASK robust_processing
WAREHOUSE = etl_wh
SCHEDULE = '10 MINUTE'
AS
BEGIN
-- Use TRY-CATCH pattern
INSERT INTO processing_log VALUES ('Starting process', CURRENT_TIMESTAMP(), NULL);
BEGIN
-- Your processing logic
MERGE INTO target_table ...;
INSERT INTO processing_log VALUES ('Success', CURRENT_TIMESTAMP(), NULL);
EXCEPTION
WHEN OTHER THEN
INSERT INTO error_log VALUES (SQLERRM, CURRENT_TIMESTAMP());
RETURN;
END;
END;
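Beyond in-task exception handling, tasks have parameters governing failure behavior. One I rely on is SUSPEND_TASK_AFTER_NUM_FAILURES, which stops a task from burning credits on endless retries of a permanently broken query:

```sql
-- Suspend the task automatically after 3 consecutive failed runs
ALTER TASK robust_processing SET SUSPEND_TASK_AFTER_NUM_FAILURES = 3;
```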
Monitoring Task Execution
-- Create monitoring view
-- Note: TASK_HISTORY doesn't expose the schedule or warehouse;
-- use SHOW TASKS for those
CREATE OR REPLACE VIEW task_monitoring AS
SELECT
name as task_name,
state,
query_text,
error_code,
error_message,
scheduled_time,
query_start_time,
completed_time,
DATEDIFF(second, query_start_time, completed_time) as execution_seconds
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
scheduled_time_range_start => DATEADD(hour, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
-- Check for failures
SELECT *
FROM task_monitoring
WHERE state = 'FAILED'
ORDER BY scheduled_time DESC;
-- Check average execution time
SELECT
task_name,
COUNT(*) as runs,
AVG(execution_seconds) as avg_seconds,
MAX(execution_seconds) as max_seconds,
COUNT(CASE WHEN state = 'FAILED' THEN 1 END) as failure_count
FROM task_monitoring
WHERE scheduled_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
GROUP BY task_name
ORDER BY failure_count DESC, avg_seconds DESC;
Stream Staleness Monitoring
-- A stream goes stale if it isn't consumed within the source table's
-- retention window. SHOW STREAMS exposes the stale_after timestamp.
SHOW STREAMS;
SELECT
"name" as stream_name,
"table_name" as source_table,
"stale_after",
DATEDIFF(hour, CURRENT_TIMESTAMP(), "stale_after") as hours_until_stale,
CASE
WHEN DATEDIFF(hour, CURRENT_TIMESTAMP(), "stale_after") < 24
THEN 'WARNING: Stream about to go stale!'
END as alert
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
ORDER BY hours_until_stale;
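A stale stream must be recreated, losing its offset. If a consumer may lag for long stretches, you can let Snowflake extend retention on the stream's behalf via the table-level MAX_DATA_EXTENSION_TIME_IN_DAYS parameter (the default is 14 days):

```sql
-- Allow Snowflake to retain change data for up to 30 days
-- so a slow consumer doesn't render the stream stale
ALTER TABLE source_orders SET MAX_DATA_EXTENSION_TIME_IN_DAYS = 30;
```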
Part 7: Performance Optimization
Tip 1: Minimize Stream Scans
-- Bad: referencing the stream in multiple DML statements
CREATE TASK inefficient_task
WAREHOUSE = etl_wh
SCHEDULE = '5 MINUTE'
AS
BEGIN
INSERT INTO table1 SELECT * FROM my_stream WHERE condition1;
INSERT INTO table2 SELECT * FROM my_stream WHERE condition2;
END;
-- Problem: each INSERT auto-commits, so the first one consumes the
-- stream and the second sees nothing
-- Good: scan once into a temp table
CREATE TASK efficient_task
WAREHOUSE = etl_wh
SCHEDULE = '5 MINUTE'
AS
BEGIN
CREATE OR REPLACE TEMPORARY TABLE stream_data AS SELECT * FROM my_stream;
INSERT INTO table1 SELECT * FROM stream_data WHERE condition1;
INSERT INTO table2 SELECT * FROM stream_data WHERE condition2;
DROP TABLE stream_data;
END;
Tip 2: Use Clustering for Large Streams
-- If stream source table is large, cluster it
ALTER TABLE large_source_table CLUSTER BY (date_column);
-- Improves stream performance significantly
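To verify that clustering is actually helping, SYSTEM$CLUSTERING_INFORMATION reports depth and overlap statistics for a table against a given key:

```sql
-- Check how well the table is clustered on date_column
SELECT SYSTEM$CLUSTERING_INFORMATION('large_source_table', '(date_column)');
```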
Tip 3: Right-Size Warehouses
-- Use smaller warehouses for simple tasks
CREATE TASK simple_aggregation
WAREHOUSE = X_SMALL_WH -- Don't waste credits
SCHEDULE = '5 MINUTE'
AS ...;
-- Use larger for complex processing
CREATE TASK complex_transformations
WAREHOUSE = LARGE_WH
SCHEDULE = '1 HOUR'
AS ...;
Tip 4: Task Scheduling Strategy
-- Stagger tasks to avoid warehouse contention
CREATE TASK task_1 SCHEDULE = 'USING CRON 0 * * * * UTC' AS ...; -- Every hour at :00
CREATE TASK task_2 SCHEDULE = 'USING CRON 15 * * * * UTC' AS ...; -- Every hour at :15