The Night Everything Broke (And How Streams Saved Me)

It was 2 AM on a Tuesday. My phone was buzzing non-stop. Our nightly ETL job had failed—again. This time, it crashed after processing 6 hours of data, and we had to start over from scratch. The business needed fresh customer data by 8 AM for the morning reports.

I was running a classic batch process: every night at midnight, truncate the target table, reload everything from source, rebuild all the aggregations. It worked fine when we had 100,000 customers. But we’d grown to 5 million customers, and the full reload was taking 8 hours.

That’s when I discovered Streams and Tasks in Snowflake. Within a week, I rebuilt the entire pipeline:

  • No more full reloads (only process changes)
  • No more manual scheduling (Tasks handled it)
  • No more 8-hour batch windows (incremental updates took 10 minutes)
  • No more 2 AM phone calls (built-in retry logic)

This guide is everything I wish someone had shown me that night. We’ll build real pipelines—not toy examples—starting from simple automation and working up to complex patterns like SCD Type 2.

What Are Streams and Tasks? (The Simple Explanation)

Before diving into code, let’s understand what these actually do:

Streams are like security cameras for your tables. They watch what changes (inserts, updates, deletes) and create a “change log” you can query. Think of it as automatic change data capture (CDC) built into Snowflake.

Tasks are scheduled SQL jobs. They’re like cron jobs but smarter: they can depend on other tasks, run only when data is available, and be configured to retry on failure.

Together? Magic. Streams detect changes, Tasks process them automatically.

The old way:

-- Run this manually or via cron at 2 AM
TRUNCATE TABLE customer_summary;
INSERT INTO customer_summary 
SELECT customer_id, COUNT(*) as order_count, SUM(amount) as total_spent
FROM orders
GROUP BY customer_id;
-- Takes hours, processes everything, fails if interrupted

The new way:

-- Stream watches for changes
CREATE STREAM order_changes ON TABLE orders;
-- Task processes only changes, runs automatically
CREATE TASK update_customer_summary
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('order_changes')
AS
    -- Process only changed orders (10 seconds instead of 8 hours!)
    MERGE INTO customer_summary ...

Let’s build this properly.

Part 1: Understanding Streams (Change Data Capture)

Creating Your First Stream

-- Setup: Create sample source table
CREATE OR REPLACE TABLE customers (
    customer_id INTEGER,
    customer_name STRING,
    email STRING,
    status STRING,
    created_date DATE,
    updated_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Insert initial data
INSERT INTO customers VALUES
(1, 'John Doe', '[email protected]', 'ACTIVE', '2024-01-15', CURRENT_TIMESTAMP()),
(2, 'Jane Smith', '[email protected]', 'ACTIVE', '2024-02-20', CURRENT_TIMESTAMP()),
(3, 'Bob Johnson', '[email protected]', 'ACTIVE', '2024-03-10', CURRENT_TIMESTAMP());
-- Create stream to track changes
CREATE OR REPLACE STREAM customer_changes ON TABLE customers;
-- At this point, stream is empty (no changes yet)
SELECT * FROM customer_changes;
-- Returns 0 rows

How Streams Capture Changes

Now let’s make some changes and see what the stream captures:

-- Make various changes
UPDATE customers SET status = 'INACTIVE' WHERE customer_id = 1;
INSERT INTO customers VALUES (4, 'Alice Williams', '[email protected]', 'ACTIVE', '2024-04-05', CURRENT_TIMESTAMP());
DELETE FROM customers WHERE customer_id = 3;
-- Query the stream
SELECT 
    customer_id,
    customer_name,
    email,
    status,
    METADATA$ACTION,      -- INSERT or DELETE (an update shows up as one of each)
    METADATA$ISUPDATE,    -- TRUE when the row is one half of an update
    METADATA$ROW_ID       -- Stable ID of the underlying table row, not of the change
FROM customer_changes;

Output:

customer_id | customer_name    | status   | METADATA$ACTION | METADATA$ISUPDATE
1           | John Doe         | INACTIVE | INSERT          | TRUE
1           | John Doe         | ACTIVE   | DELETE          | TRUE
4           | Alice Williams   | ACTIVE   | INSERT          | FALSE
3           | Bob Johnson      | ACTIVE   | DELETE          | FALSE

Understanding the output:

  1. UPDATE appears as DELETE (old value) + INSERT (new value) with METADATA$ISUPDATE = TRUE
  2. INSERT appears as INSERT with METADATA$ISUPDATE = FALSE
  3. DELETE appears as DELETE with METADATA$ISUPDATE = FALSE
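
To make those three rules concrete, here is a small sketch that labels each stream row with the kind of change it represents. It runs against the customer_changes stream from above; the change_kind column name is my own invention, not something Snowflake provides.

```sql
-- Label every row in the stream by combining the two metadata columns.
-- A plain SELECT like this does not consume the stream.
SELECT
    customer_id,
    status,
    CASE
        WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE THEN 'UPDATE (new values)'
        WHEN METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE THEN 'UPDATE (old values)'
        WHEN METADATA$ACTION = 'INSERT' THEN 'INSERT'
        ELSE 'DELETE'
    END AS change_kind   -- hypothetical column name
FROM customer_changes
ORDER BY customer_id, change_kind;
```

For most pipelines only the 'UPDATE (new values)', 'INSERT' and 'DELETE' rows matter; the old-values half of an update is mainly useful for auditing.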

Stream Consumption (Critical Concept)

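Here’s something that confused me for weeks: streams are consumed when you read from them in a DML operation. More precisely, the stream’s offset moves forward when the transaction containing that DML statement commits. A plain SELECT never moves it.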

-- Query stream (doesn't consume)
SELECT * FROM customer_changes;
-- Stream still has data
-- Use stream in INSERT (consumes!)
INSERT INTO customer_backup
SELECT * FROM customer_changes;
-- Query stream again
SELECT * FROM customer_changes;
-- Returns 0 rows - stream was consumed!

Important patterns:

-- If you need the data multiple times, capture it first
-- (this CREATE TABLE ... AS SELECT is the statement that consumes the stream)
CREATE TEMPORARY TABLE changes_snapshot AS
SELECT * FROM customer_changes;
-- Now use snapshot multiple times
INSERT INTO target1 SELECT * FROM changes_snapshot;
INSERT INTO target2 SELECT * FROM changes_snapshot;
-- Stream is only consumed once, by the snapshot statement
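
Another option, sketched here under the assumption that target1 and target2 have columns matching the stream output, is to wrap the statements in an explicit transaction. Inside one transaction every statement sees the same stream contents, and the offset only advances at COMMIT.

```sql
-- Both INSERTs read the same set of changes because they share a transaction.
BEGIN TRANSACTION;

INSERT INTO target1 SELECT * FROM customer_changes;
INSERT INTO target2 SELECT * FROM customer_changes;

-- The stream is consumed here, once, for both statements.
COMMIT;
```

I tend to reach for the transaction when there are two or three statements, and for the temp table when the change set needs heavier reshaping first.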

Part 2: Understanding Tasks (Automation)

Creating Your First Task

-- Simple task that runs on schedule
CREATE OR REPLACE TASK hello_world_task
    WAREHOUSE = my_wh
    SCHEDULE = '5 MINUTE'
AS
    INSERT INTO task_logs 
    VALUES ('Hello from task!', CURRENT_TIMESTAMP());
-- Tasks are created in SUSPENDED state
-- You must explicitly start them
ALTER TASK hello_world_task RESUME;
-- Check task status
SHOW TASKS LIKE 'hello_world_task';
-- View task runs
SELECT 
    name,
    state,
    scheduled_time,
    completed_time,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE name = 'HELLO_WORLD_TASK'
ORDER BY scheduled_time DESC
LIMIT 10;
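
While developing, waiting five minutes for every run gets old fast. A minimal sketch of the commands I would use to trigger, pause, and clean up the task from above (these use the hello_world_task name from this section and nothing else):

```sql
-- Trigger a single run right now instead of waiting for the schedule
EXECUTE TASK hello_world_task;

-- Pause the schedule while you change things
ALTER TASK hello_world_task SUSPEND;

-- Remove the task entirely once you are done experimenting
DROP TASK IF EXISTS hello_world_task;
```

Running EXECUTE TASK typically needs the same privileges as owning or operating the task, so check your role if it is rejected.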

Conditional Execution (Run Only When Needed)

-- Create task that only runs when stream has data
CREATE OR REPLACE TASK process_customer_changes
    WAREHOUSE = etl_wh
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('customer_changes')
AS
    INSERT INTO customer_history
    SELECT 
        customer_id,
        customer_name,
        email,
        status,
        METADATA$ACTION as change_type,
        CURRENT_TIMESTAMP() as processed_at
    FROM customer_changes;
ALTER TASK process_customer_changes RESUME;

Why this is powerful:

  • Task checks every 5 minutes
  • Only runs if stream has data
  • Warehouse only spins up when needed
  • Near-zero cost if no changes (the WHEN check runs in the cloud services layer, not on your warehouse)
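
To see the condition at work, you can evaluate it by hand and then look for skipped runs in the task history. This is a sketch using the stream and task names from this section; the expectation that runs with an empty stream show up with state SKIPPED is based on my reading of the TASK_HISTORY documentation.

```sql
-- The same check the task performs before each scheduled run
SELECT SYSTEM$STREAM_HAS_DATA('customer_changes') AS has_data;

-- Recent runs of the task, including the ones that were skipped
SELECT name, state, scheduled_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    TASK_NAME => 'PROCESS_CUSTOMER_CHANGES'
))
ORDER BY scheduled_time DESC
LIMIT 20;
```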

Task Dependencies (Building Pipelines)

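-- Create a pipeline: raw → staging → production
-- Assumes streams raw_orders_stream (on raw_orders) and
-- staged_orders_stream (on staged_orders) already exist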
-- Task 1: Load raw data
CREATE OR REPLACE TASK load_raw_data
    WAREHOUSE = etl_wh
    SCHEDULE = '10 MINUTE'
AS
    COPY INTO raw_orders
    FROM @s3_stage/orders/
    FILE_FORMAT = (TYPE = 'CSV');
-- Task 2: Clean and stage (runs after Task 1)
CREATE OR REPLACE TASK stage_data
    WAREHOUSE = etl_wh
    AFTER load_raw_data  -- Dependency!
AS
    INSERT INTO staged_orders
    SELECT 
        order_id,
        customer_id,
        UPPER(TRIM(product_name)) as product_name,
        amount,
        order_date
    FROM raw_orders_stream
    WHERE amount > 0;  -- Filter bad data
-- Task 3: Aggregate to production (runs after Task 2)
CREATE OR REPLACE TASK aggregate_to_prod
    WAREHOUSE = etl_wh
    AFTER stage_data  -- Another dependency!
AS
    MERGE INTO customer_summary target
    USING (
        SELECT 
            customer_id,
            COUNT(*) as new_orders,
            SUM(amount) as new_amount
        FROM staged_orders_stream
        GROUP BY customer_id
    ) source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN 
        UPDATE SET 
            total_orders = total_orders + source.new_orders,
            total_spent = total_spent + source.new_amount
    WHEN NOT MATCHED THEN
        INSERT (customer_id, total_orders, total_spent)
        VALUES (source.customer_id, source.new_orders, source.new_amount);
-- IMPORTANT: Resume tasks in reverse order (child first, parent last)
ALTER TASK aggregate_to_prod RESUME;
ALTER TASK stage_data RESUME;
ALTER TASK load_raw_data RESUME;  -- Root task last!

Task dependency diagram:

load_raw_data (every 10 min)
    ↓
stage_data (after load_raw_data completes)
    ↓
aggregate_to_prod (after stage_data completes)
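
Resuming every task by hand is easy to get wrong as the graph grows. As a sketch, and going by my understanding of the Snowflake docs, SYSTEM$TASK_DEPENDENTS_ENABLE resumes a root task together with everything that depends on it. Suspending the root is the usual first step before editing any task in the graph.

```sql
-- Resume the whole graph in one call, starting from the root task
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('load_raw_data');

-- Before changing any task in the graph, pause the root first
ALTER TASK load_raw_data SUSPEND;
```

After editing, the same SYSTEM$TASK_DEPENDENTS_ENABLE call brings the graph back up.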

Part 3: Real Use Case #1 – Simple Incremental Load

Scenario: Load customer orders from source system, keep only net changes.

-- Source table (simulates external system)
CREATE OR REPLACE TABLE source_orders (
    order_id INTEGER,
    customer_id INTEGER,
    product_name STRING,
    amount DECIMAL(10,2),
    order_date DATE,
    updated_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Target table (your data warehouse)
CREATE OR REPLACE TABLE dwh_orders (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    product_name STRING,
    amount DECIMAL(10,2),
    order_date DATE,
    loaded_at TIMESTAMP_LTZ
);
-- Create stream on source
CREATE OR REPLACE STREAM source_orders_stream ON TABLE source_orders;
-- Create incremental load task
CREATE OR REPLACE TASK incremental_load_orders
    WAREHOUSE = etl_wh
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('source_orders_stream')
AS
    MERGE INTO dwh_orders target
    USING (
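        -- Get net changes from stream: new rows plus the new values of updated rows.
        -- An update arrives as DELETE + INSERT with METADATA$ISUPDATE = TRUE,
        -- so also filtering on ISUPDATE = FALSE would silently drop every update.
        SELECT 
            order_id,
            customer_id,
            product_name,
            amount,
            order_date
        FROM source_orders_stream
        WHERE METADATA$ACTION = 'INSERT'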
    ) source
    ON target.order_id = source.order_id
    WHEN MATCHED THEN 
        UPDATE SET
            target.customer_id = source.customer_id,
            target.product_name = source.product_name,
            target.amount = source.amount,
            target.order_date = source.order_date,
            target.loaded_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (order_id, customer_id, product_name, amount, order_date, loaded_at)
        VALUES (source.order_id, source.customer_id, source.product_name, 
                source.amount, source.order_date, CURRENT_TIMESTAMP());
ALTER TASK incremental_load_orders RESUME;
-- Test it!
INSERT INTO source_orders VALUES
(1, 101, 'Widget A', 29.99, '2026-01-15', CURRENT_TIMESTAMP()),
(2, 102, 'Widget B', 49.99, '2026-01-16', CURRENT_TIMESTAMP());
-- Wait 5 minutes, check target
SELECT * FROM dwh_orders;
-- Make updates
UPDATE source_orders SET amount = 39.99 WHERE order_id = 1;
-- Wait 5 minutes, verify update applied
SELECT * FROM dwh_orders WHERE order_id = 1;

Why this works:

  • Stream captures all changes
  • Task runs only when changes exist
  • MERGE handles both new and updated records (rows deleted from the source are ignored here)
  • Fully automated, zero manual intervention
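
If deletes in the source should also remove rows from the warehouse, the MERGE can carry the action through and branch on it. This is a sketch rather than a drop-in replacement: change_action is a name I made up, and it assumes an order_id is never deleted and re-inserted within the same batch (that would give the MERGE two source rows for one target row).

```sql
MERGE INTO dwh_orders target
USING (
    SELECT
        order_id, customer_id, product_name, amount, order_date,
        METADATA$ACTION AS change_action   -- hypothetical alias
    FROM source_orders_stream
    -- Drop the old-values half of each update; keep inserts, new values, and true deletes
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) source
ON target.order_id = source.order_id
WHEN MATCHED AND source.change_action = 'DELETE' THEN
    DELETE
WHEN MATCHED AND source.change_action = 'INSERT' THEN
    UPDATE SET
        target.customer_id = source.customer_id,
        target.product_name = source.product_name,
        target.amount = source.amount,
        target.order_date = source.order_date,
        target.loaded_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND source.change_action = 'INSERT' THEN
    INSERT (order_id, customer_id, product_name, amount, order_date, loaded_at)
    VALUES (source.order_id, source.customer_id, source.product_name,
            source.amount, source.order_date, CURRENT_TIMESTAMP());
```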

Part 4: Real Use Case #2 – SCD Type 2 (The Big One)

This is the use case everyone asks about. Slowly Changing Dimensions Type 2 tracks full history of changes.

Business requirement: Track complete history of customer data changes over time.

Step 1: Create SCD2 Table Structure

-- Dimension table with SCD2 pattern
CREATE OR REPLACE TABLE dim_customer_scd2 (
    customer_key INTEGER AUTOINCREMENT,        -- Surrogate key
    customer_id INTEGER,                       -- Natural key
    customer_name STRING,
    email STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    status STRING,
    effective_start_date TIMESTAMP_LTZ,       -- When this version became active
    effective_end_date TIMESTAMP_LTZ,         -- When this version became inactive
    is_current BOOLEAN,                       -- Is this the current version?
    inserted_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Source table
CREATE OR REPLACE TABLE source_customers (
    customer_id INTEGER PRIMARY KEY,
    customer_name STRING,
    email STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    status STRING,
    updated_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load initial data
INSERT INTO source_customers VALUES
(1, 'John Doe', '[email protected]', '555-0001', '123 Main St', 'Seattle', 'WA', 'ACTIVE', CURRENT_TIMESTAMP()),
(2, 'Jane Smith', '[email protected]', '555-0002', '456 Oak Ave', 'Portland', 'OR', 'ACTIVE', CURRENT_TIMESTAMP()),
(3, 'Bob Johnson', '[email protected]', '555-0003', '789 Pine Rd', 'Boston', 'MA', 'ACTIVE', CURRENT_TIMESTAMP());
-- Initial load to dimension
INSERT INTO dim_customer_scd2 
(customer_id, customer_name, email, phone, address, city, state, status, 
 effective_start_date, effective_end_date, is_current)
SELECT 
    customer_id,
    customer_name,
    email,
    phone,
    address,
    city,
    state,
    status,
    CURRENT_TIMESTAMP() as effective_start_date,
    '9999-12-31'::TIMESTAMP_LTZ as effective_end_date,
    TRUE as is_current
FROM source_customers;
-- Verify
SELECT * FROM dim_customer_scd2;

Step 2: Create Stream on Source

CREATE OR REPLACE STREAM source_customers_stream ON TABLE source_customers;

Step 3: Build SCD2 Processing Logic

This is the complex part. We need to:

  1. Identify what changed
  2. Expire old records (set end date, is_current = FALSE)
  3. Insert new versions

CREATE OR REPLACE TASK process_customer_scd2
    WAREHOUSE = etl_wh
    SCHEDULE = '10 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('source_customers_stream')
AS
BEGIN
    -- One timestamp for the whole run, so an expired version's end date
    -- exactly equals the start date of the version that replaces it
    LET run_ts TIMESTAMP_LTZ := CURRENT_TIMESTAMP();

    -- Run all three statements in one transaction. Without it, the first
    -- statement would consume the stream when it commits and the two
    -- INSERTs below would find the stream empty.
    BEGIN TRANSACTION;

    -- Step 1: Expire old records for changed customers
    -- (rows deleted from the source are not handled here)
    UPDATE dim_customer_scd2
    SET 
        effective_end_date = :run_ts,
        is_current = FALSE
    WHERE customer_id IN (
        SELECT customer_id 
        FROM source_customers_stream
        WHERE METADATA$ACTION = 'INSERT'  -- Updates appear as INSERT in stream
          AND METADATA$ISUPDATE = TRUE
    )
    AND is_current = TRUE;
    
    -- Step 2: Insert new versions for changed customers
    INSERT INTO dim_customer_scd2
    (customer_id, customer_name, email, phone, address, city, state, status,
     effective_start_date, effective_end_date, is_current)
    SELECT 
        customer_id,
        customer_name,
        email,
        phone,
        address,
        city,
        state,
        status,
        :run_ts as effective_start_date,
        '9999-12-31'::TIMESTAMP_LTZ as effective_end_date,
        TRUE as is_current
    FROM source_customers_stream
    WHERE METADATA$ACTION = 'INSERT'
      AND METADATA$ISUPDATE = TRUE;
    
    -- Step 3: Insert brand new customers
    INSERT INTO dim_customer_scd2
    (customer_id, customer_name, email, phone, address, city, state, status,
     effective_start_date, effective_end_date, is_current)
    SELECT 
        customer_id,
        customer_name,
        email,
        phone,
        address,
        city,
        state,
        status,
        :run_ts as effective_start_date,
        '9999-12-31'::TIMESTAMP_LTZ as effective_end_date,
        TRUE as is_current
    FROM source_customers_stream
    WHERE METADATA$ACTION = 'INSERT'
      AND METADATA$ISUPDATE = FALSE;

    -- The stream offset advances here, once, for all three statements
    COMMIT;
END;
ALTER TASK process_customer_scd2 RESUME;

Step 4: Test SCD2 Processing

-- Test 1: Update customer address (should create new version)
UPDATE source_customers 
SET address = '999 New Street', city = 'San Francisco', state = 'CA'
WHERE customer_id = 1;
-- Wait 10 minutes (or manually run: EXECUTE TASK process_customer_scd2;)
-- Check results - should see 2 versions of customer 1
SELECT 
    customer_key,
    customer_id,
    customer_name,
    address,
    city,
    state,
    effective_start_date,
    effective_end_date,
    is_current
FROM dim_customer_scd2
WHERE customer_id = 1
ORDER BY effective_start_date;
-- Output:
-- customer_key | customer_id | address        | city          | is_current | effective_start_date | effective_end_date
-- 1            | 1           | 123 Main St    | Seattle       | FALSE      | 2026-01-15 10:00     | 2026-01-15 15:30
-- 4            | 1           | 999 New Street | San Francisco | TRUE       | 2026-01-15 15:30     | 9999-12-31 00:00
-- Test 2: Update multiple attributes
UPDATE source_customers 
SET 
    email = '[email protected]',
    phone = '555-9999',
    status = 'INACTIVE'
WHERE customer_id = 2;
-- Check history
SELECT 
    customer_key,
    customer_id,
    customer_name,
    email,
    phone,
    status,
    effective_start_date,
    is_current
FROM dim_customer_scd2
WHERE customer_id = 2
ORDER BY effective_start_date;
-- Test 3: Insert new customer
INSERT INTO source_customers VALUES
(4, 'Alice Williams', '[email protected]', '555-0004', '321 Elm St', 'Austin', 'TX', 'ACTIVE', CURRENT_TIMESTAMP());
-- Verify new customer appears in dimension
SELECT * FROM dim_customer_scd2 WHERE customer_id = 4;
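
Beyond eyeballing individual customers, two sanity checks catch most SCD2 bugs. This is a sketch of the checks I would run after each test; both should return zero rows if the dimension is healthy. The column aliases are my own.

```sql
-- Check 1: no customer should have more than one current version
SELECT customer_id, COUNT(*) AS current_versions
FROM dim_customer_scd2
WHERE is_current = TRUE
GROUP BY customer_id
HAVING COUNT(*) > 1;

-- Check 2: no two versions of the same customer should overlap in time
SELECT a.customer_id,
       a.customer_key,
       b.customer_key AS overlapping_key
FROM dim_customer_scd2 a
JOIN dim_customer_scd2 b
    ON a.customer_id = b.customer_id
    AND a.customer_key < b.customer_key
    AND a.effective_start_date < b.effective_end_date
    AND b.effective_start_date < a.effective_end_date;
```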

Step 5: Query Historical Data

Now the payoff—querying data as it was at any point in time:

-- Current state (easy)
SELECT * FROM dim_customer_scd2 WHERE is_current = TRUE;
-- State as of specific date
SELECT 
    customer_id,
    customer_name,
    address,
    city,
    state,
    status
FROM dim_customer_scd2
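WHERE effective_start_date <= '2026-01-15 12:00:00'::TIMESTAMP_LTZ
  AND effective_end_date   >  '2026-01-15 12:00:00'::TIMESTAMP_LTZ;  -- half-open window: a boundary timestamp matches one version, not two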
-- Find all changes for a customer
SELECT 
    customer_id,
    customer_name,
    address || ', ' || city || ', ' || state as full_address,
    status,
    effective_start_date,
    effective_end_date,
    DATEDIFF(day, effective_start_date, LEAST(effective_end_date, CURRENT_TIMESTAMP())) as days_active  -- cap the open-ended 9999 end date at today
FROM dim_customer_scd2
WHERE customer_id = 1
ORDER BY effective_start_date;
-- Customers who changed status in last 30 days
SELECT DISTINCT
    current_version.customer_id,
    current_version.customer_name,
    previous_version.status as old_status,
    current_version.status as new_status,
    current_version.effective_start_date as changed_on
FROM dim_customer_scd2 current_version
JOIN dim_customer_scd2 previous_version
    ON current_version.customer_id = previous_version.customer_id
    AND previous_version.effective_end_date = current_version.effective_start_date
WHERE current_version.is_current = TRUE
  AND current_version.status != previous_version.status
  AND current_version.effective_start_date >= DATEADD(day, -30, CURRENT_TIMESTAMP());
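
The most common use of all this history is joining facts to the dimension version that was valid when the fact happened. Here is a sketch that reuses dwh_orders from Part 3 as the fact table. It assumes the customer_id values in both tables refer to the same customers, which is not true of the sample data in this guide, so treat it as a pattern rather than something to run as-is.

```sql
-- For each order, pick the customer version that was active on the order date
SELECT
    o.order_id,
    o.order_date,
    o.amount,
    d.customer_key,      -- surrogate key of the matching version
    d.city,
    d.state
FROM dwh_orders o
JOIN dim_customer_scd2 d
    ON d.customer_id = o.customer_id
    AND o.order_date >= d.effective_start_date
    AND o.order_date <  d.effective_end_date;
```

Storing customer_key on the fact row at load time avoids repeating this range join in every report.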

Part 5: Real Use Case #3 – Multi-Table Pipeline

Scenario: Process orders → update customer metrics → trigger alerts

-- Source tables
CREATE OR REPLACE TABLE raw_orders (
    order_id INTEGER,
    customer_id INTEGER,
    amount DECIMAL(10,2),
    order_date DATE,
    status STRING,
    inserted_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
CREATE OR REPLACE TABLE customer_metrics (
    customer_id INTEGER PRIMARY KEY,
    total_orders INTEGER DEFAULT 0,
    total_spent DECIMAL(15,2) DEFAULT 0,
    avg_order_value DECIMAL(10,2) DEFAULT 0,
    last_order_date DATE,
    customer_segment STRING,  -- 'VIP', 'Regular', 'At Risk'
    updated_at TIMESTAMP_LTZ
);
CREATE OR REPLACE TABLE vip_alerts (
    alert_id INTEGER AUTOINCREMENT,
    customer_id INTEGER,
    alert_type STRING,
    message STRING,
    created_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Create streams
CREATE OR REPLACE STREAM raw_orders_stream ON TABLE raw_orders;
-- Task 1: Process new orders
CREATE OR REPLACE TASK process_new_orders
    WAREHOUSE = etl_wh
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('raw_orders_stream')
AS
    MERGE INTO customer_metrics target
    USING (
        SELECT 
            customer_id,
            COUNT(*) as new_order_count,
            SUM(amount) as new_order_amount,
            MAX(order_date) as latest_order_date
        FROM raw_orders_stream
        WHERE METADATA$ACTION = 'INSERT'
          AND status = 'COMPLETED'
        GROUP BY customer_id
    ) source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET
        total_orders = total_orders + source.new_order_count,
        total_spent = total_spent + source.new_order_amount,
        avg_order_value = (total_spent + source.new_order_amount) / (total_orders + source.new_order_count),
        last_order_date = GREATEST(target.last_order_date, source.latest_order_date),
        updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN INSERT
        (customer_id, total_orders, total_spent, avg_order_value, last_order_date, updated_at)
    VALUES 
        (source.customer_id, source.new_order_count, source.new_order_amount,
         source.new_order_amount / source.new_order_count, source.latest_order_date, CURRENT_TIMESTAMP());
-- Task 2: Update customer segments (runs after Task 1)
CREATE OR REPLACE TASK update_customer_segments
    WAREHOUSE = etl_wh
    AFTER process_new_orders
AS
    UPDATE customer_metrics
    SET 
        customer_segment = CASE
            WHEN total_spent >= 10000 THEN 'VIP'
            WHEN total_spent >= 1000 THEN 'Regular'
            WHEN DATEDIFF(day, last_order_date, CURRENT_DATE()) > 180 THEN 'At Risk'
            ELSE 'Regular'
        END,
        updated_at = CURRENT_TIMESTAMP()
    WHERE updated_at >= DATEADD(minute, -10, CURRENT_TIMESTAMP());  -- Only recently updated
-- Task 3: Generate VIP alerts (runs after Task 2)
CREATE OR REPLACE TASK generate_vip_alerts
    WAREHOUSE = etl_wh
    AFTER update_customer_segments
AS
    INSERT INTO vip_alerts (customer_id, alert_type, message)
    SELECT 
        customer_id,
        'NEW_VIP' as alert_type,
        'Customer ' || customer_id || ' just became VIP with $' || total_spent || ' total spent!'
    FROM customer_metrics
    WHERE customer_segment = 'VIP'
      AND updated_at >= DATEADD(minute, -10, CURRENT_TIMESTAMP())
      AND total_spent >= 10000
      AND total_spent < 10500;  -- Likely just crossed threshold
-- Resume tasks (child first!)
ALTER TASK generate_vip_alerts RESUME;
ALTER TASK update_customer_segments RESUME;
ALTER TASK process_new_orders RESUME;
-- Test the pipeline
INSERT INTO raw_orders VALUES
(1, 101, 500.00, '2026-01-15', 'COMPLETED', CURRENT_TIMESTAMP()),
(2, 101, 9600.00, '2026-01-16', 'COMPLETED', CURRENT_TIMESTAMP());  -- Should trigger VIP!
-- Wait 5-10 minutes, check results
SELECT * FROM customer_metrics WHERE customer_id = 101;
SELECT * FROM vip_alerts WHERE customer_id = 101;
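
One weakness of the spend-range heuristic in Task 3 is that a customer sitting between $10,000 and $10,500 gets a fresh alert every time they order. A possible alternative, sketched below, is to alert only when no NEW_VIP alert exists yet for that customer. It uses only the tables defined above; the message wording is mine.

```sql
-- Body for generate_vip_alerts that fires at most once per customer
INSERT INTO vip_alerts (customer_id, alert_type, message)
SELECT
    m.customer_id,
    'NEW_VIP' AS alert_type,
    'Customer ' || m.customer_id || ' reached VIP with $' || m.total_spent || ' total spent'
FROM customer_metrics m
WHERE m.customer_segment = 'VIP'
  AND NOT EXISTS (
      SELECT 1
      FROM vip_alerts a
      WHERE a.customer_id = m.customer_id
        AND a.alert_type = 'NEW_VIP'
  );
```

This also drops the dependency on the 10-minute updated_at window, so a delayed run does not lose alerts.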

Part 6: Error Handling and Monitoring

Handling Task Failures

-- Task with error handling
CREATE OR REPLACE TASK robust_processing
    WAREHOUSE = etl_wh
    SCHEDULE = '10 MINUTE'
AS
BEGIN
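    -- Snowflake Scripting's version of try/catch: a nested block with an EXCEPTION section
    INSERT INTO processing_log VALUES ('Starting process', CURRENT_TIMESTAMP(), NULL);
    
    BEGIN
        -- Your processing logic
        MERGE INTO target_table ...;
        
        INSERT INTO processing_log VALUES ('Success', CURRENT_TIMESTAMP(), NULL);
    EXCEPTION
        WHEN OTHER THEN
            -- Built-in variables need the colon prefix inside a SQL statement
            INSERT INTO error_log VALUES (:SQLERRM, CURRENT_TIMESTAMP());
            -- Re-raise so the run is recorded as FAILED in TASK_HISTORY
            RAISE;
    END;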
END;
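
Logging is only half of failure handling; the other half is deciding what the scheduler does next. A minimal sketch using two task parameters that I believe apply here: TASK_AUTO_RETRY_ATTEMPTS (set on a root or standalone task) and SUSPEND_TASK_AFTER_NUM_FAILURES. The values 2 and 5 are arbitrary examples.

```sql
-- Parameters can only be changed while the task is suspended
ALTER TASK robust_processing SUSPEND;

ALTER TASK robust_processing SET
    TASK_AUTO_RETRY_ATTEMPTS = 2,          -- retry a failed run up to twice
    SUSPEND_TASK_AFTER_NUM_FAILURES = 5;   -- stop scheduling after 5 consecutive failures

ALTER TASK robust_processing RESUME;
```

Retries only trigger when the run actually fails, so a handler that swallows the exception will hide the failure from both settings.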

Monitoring Task Execution

-- Create monitoring view
-- TASK_HISTORY has no schedule or warehouse columns; use SHOW TASKS for those
CREATE OR REPLACE VIEW task_monitoring AS
SELECT 
    name as task_name,
    state,
    error_code,
    error_message,
    scheduled_time,
    query_start_time,
    completed_time,
    DATEDIFF(second, query_start_time, completed_time) as execution_seconds
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    -- Cover the 7 days queried below (the most this function retains)
    scheduled_time_range_start => DATEADD(day, -7, CURRENT_TIMESTAMP()),
    -- The default is only 100 rows
    result_limit => 10000
))
ORDER BY scheduled_time DESC;
-- Check for failures
SELECT *
FROM task_monitoring
WHERE state = 'FAILED'
ORDER BY scheduled_time DESC;
-- Check average execution time
SELECT 
    task_name,
    COUNT(*) as runs,
    AVG(execution_seconds) as avg_seconds,
    MAX(execution_seconds) as max_seconds,
    COUNT(CASE WHEN state = 'FAILED' THEN 1 END) as failure_count
FROM task_monitoring
WHERE scheduled_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
GROUP BY task_name
ORDER BY failure_count DESC, avg_seconds DESC;
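
The INFORMATION_SCHEMA function only reaches back a few days. For longer trends, a sketch against the account-level view SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY might look like this. It assumes your role has access to the SNOWFLAKE database, and note that this view lags behind real time, so it suits trend reports better than live alerting.

```sql
-- Which tasks failed most often over the last 30 days?
SELECT
    name AS task_name,
    COUNT(*) AS failed_runs,
    MAX(scheduled_time) AS last_failure
FROM SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY
WHERE state = 'FAILED'
  AND scheduled_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY name
ORDER BY failed_runs DESC;
```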

Stream Lag Monitoring

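-- Check how far behind a single stream is.
-- SYSTEM$STREAM_GET_TABLE_TIMESTAMP returns the stream's offset as epoch
-- nanoseconds, so convert it before comparing with the current time.
SELECT 
    stream_name,
    stream_position,
    DATEDIFF(minute, stream_position, CURRENT_TIMESTAMP()) as lag_minutes
FROM (
    SELECT 
        'customer_changes' as stream_name,
        TO_TIMESTAMP_LTZ(SYSTEM$STREAM_GET_TABLE_TIMESTAMP('customer_changes')::NUMBER, 9) as stream_position
);
-- Alert if unconsumed changes have been waiting > 1 hour
SELECT 
    stream_name,
    lag_minutes,
    'WARNING: Stream falling behind!' as alert
FROM (
    SELECT 
        'customer_changes' as stream_name,
        SYSTEM$STREAM_HAS_DATA('customer_changes') as has_data,
        DATEDIFF(minute, 
            TO_TIMESTAMP_LTZ(SYSTEM$STREAM_GET_TABLE_TIMESTAMP('customer_changes')::NUMBER, 9), 
            CURRENT_TIMESTAMP()
        ) as lag_minutes
)
WHERE has_data AND lag_minutes > 60;
-- Across all streams in the schema: INFORMATION_SCHEMA has no streams view,
-- so list them with SHOW STREAMS and read the result back
SHOW STREAMS IN SCHEMA;
SELECT "name" as stream_name, "table_name", "stale_after"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "stale_after" < DATEADD(day, 2, CURRENT_TIMESTAMP());  -- stale, or about to be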
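
Lag matters because a stream that is not consumed for long enough goes stale and has to be recreated, which means a full reload. As a sketch, assuming the customers table from Part 1, the MAX_DATA_EXTENSION_TIME_IN_DAYS table parameter controls how long Snowflake will extend data retention to keep an unconsumed stream usable. The value 30 is just an example.

```sql
-- See the current setting for the source table
SHOW PARAMETERS LIKE 'MAX_DATA_EXTENSION_TIME_IN_DAYS' IN TABLE customers;

-- Give slow or paused consumers more headroom before the stream goes stale
ALTER TABLE customers SET MAX_DATA_EXTENSION_TIME_IN_DAYS = 30;
```

Longer extension means more historical data kept around, so this trades some storage cost for safety.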

Part 7: Performance Optimization

Tip 1: Minimize Stream Scans

-- Bad: Reading the stream in two separate statements
CREATE TASK inefficient_task AS
BEGIN
    INSERT INTO table1 SELECT * FROM my_stream WHERE condition1;
    INSERT INTO table2 SELECT * FROM my_stream WHERE condition2;
END;
-- Problem: the first INSERT commits and consumes the stream, so the
-- second INSERT finds nothing (and the stream gets scanned twice)
-- Good: Scan once, use temp table
CREATE TASK efficient_task AS
BEGIN
    CREATE OR REPLACE TEMPORARY TABLE stream_data AS SELECT * FROM my_stream;
    
    INSERT INTO table1 SELECT * FROM stream_data WHERE condition1;
    INSERT INTO table2 SELECT * FROM stream_data WHERE condition2;
    
    DROP TABLE stream_data;
END;
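
When the goal is simply to route rows to different tables, a conditional multi-table insert reads the stream once in a single statement and needs no temp table. A sketch against the customer_changes stream from Part 1; the target tables customer_upserts and customer_deletes and the aliases change_action and is_update are hypothetical.

```sql
-- One statement, one scan of the stream, rows routed by change type
INSERT ALL
    WHEN change_action = 'INSERT' THEN
        INTO customer_upserts (customer_id, customer_name, status)
        VALUES (customer_id, customer_name, status)
    WHEN change_action = 'DELETE' AND NOT is_update THEN
        INTO customer_deletes (customer_id)
        VALUES (customer_id)
SELECT
    customer_id,
    customer_name,
    status,
    METADATA$ACTION   AS change_action,
    METADATA$ISUPDATE AS is_update
FROM customer_changes;
```

Rows that match neither condition (the old-values half of an update) are simply not inserted anywhere.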

Tip 2: Use Clustering for Large Streams

-- If downstream queries filter the large source table by date, clustering it may help
ALTER TABLE large_source_table CLUSTER BY (date_column);
-- Measure before and after: automatic reclustering consumes credits of its own

Tip 3: Right-Size Warehouses

-- Use smaller warehouses for simple tasks
CREATE TASK simple_aggregation
    WAREHOUSE = X_SMALL_WH  -- Don't waste credits
    SCHEDULE = '5 MINUTE'
AS ...;
-- Use larger for complex processing
CREATE TASK complex_transformations
    WAREHOUSE = LARGE_WH
    SCHEDULE = '1 HOUR'
AS ...;
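
For small, frequent tasks there is also the option of not picking a warehouse at all. A minimal sketch of a serverless task, reusing the task_logs table from Part 2: leaving out WAREHOUSE and setting USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE lets Snowflake manage the compute. The task name is made up for this example.

```sql
-- No WAREHOUSE clause: Snowflake sizes and manages the compute for this task
CREATE OR REPLACE TASK hello_serverless_task
    USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'  -- starting size only
    SCHEDULE = '5 MINUTE'
AS
    INSERT INTO task_logs
    VALUES ('Hello from a serverless task!', CURRENT_TIMESTAMP());

ALTER TASK hello_serverless_task RESUME;
```

Whether this is cheaper than a shared X-Small warehouse depends on how busy that warehouse already is, so compare the billing for both before switching.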

Tip 4: Task Scheduling Strategy

-- Stagger tasks to avoid warehouse contention
CREATE TASK task_1 SCHEDULE = 'USING CRON 0 * * * * UTC' AS ...;   -- Every hour at :00
CREATE TASK task_2 SCHEDULE = 'USING CRON 15 * * *