Python for Data Engineers Cheat Sheet

Intermediate • Last updated: 2026-04-16 • 6 sections

Essential Python patterns for data engineering: pandas, itertools, generators, connecting to Snowflake, writing production-safe ETL scripts, and testing data pipelines.

Core Libraries Every Data Engineer Needs

Library                    | Use Case                                            | Install
pandas                     | In-memory tabular transforms under ~5GB             | pip install pandas
polars                     | Faster pandas alternative, lazy eval, Arrow-native  | pip install polars
pyarrow                    | Columnar memory format, Parquet read/write          | pip install pyarrow
snowflake-connector-python | Connect to Snowflake, fetch_pandas_all()            | pip install snowflake-connector-python[pandas]
snowflake-snowpark-python  | Push computation to Snowflake warehouse             | pip install snowflake-snowpark-python
sqlalchemy                 | Generic DB abstraction (Postgres, MySQL, Snowflake) | pip install sqlalchemy
boto3                      | AWS S3 / Secrets Manager / Glue                     | pip install boto3
pydantic                   | Runtime data validation, schema contracts           | pip install pydantic
great-expectations         | Data quality testing framework                      | pip install great-expectations
pytest                     | Unit + integration tests for pipelines              | pip install pytest

Production-Safe Snowflake Connection Pattern

import os
import snowflake.connector
from contextlib import contextmanager

@contextmanager
def snowflake_conn():
    """Always close connection, even on error."""
    conn = snowflake.connector.connect(
        account=os.environ['SF_ACCOUNT'],
        user=os.environ['SF_USER'],
        password=os.environ['SF_PASSWORD'],
        warehouse=os.environ['SF_WAREHOUSE'],
        database=os.environ['SF_DATABASE'],
        schema=os.environ['SF_SCHEMA'],
        role=os.environ['SF_ROLE'],
        client_session_keep_alive=False,
    )
    try:
        yield conn
    finally:
        conn.close()

def run_query(sql: str, params: dict | None = None):
    with snowflake_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params or {})
            return cur.fetchall()

# Parameterized query (SQL-injection safe)
rows = run_query(
    "SELECT * FROM orders WHERE customer_id = %(cid)s AND order_date >= %(d)s",
    {"cid": 12345, "d": "2026-01-01"},
)
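
To illustrate why the parameterized form matters, here is a minimal sketch that uses the standard-library sqlite3 module as a stand-in for Snowflake. This is an assumption made so the example runs anywhere: sqlite3 uses :name placeholders rather than %(name)s, and the table and the malicious value are made up.

```python
import sqlite3

# In-memory SQLite stands in for Snowflake here; it uses :name placeholders
# where the Snowflake connector's default style is %(name)s.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, customer_id TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "alice"), (2, "bob"), (3, "carol")],
)

malicious = "alice' OR '1'='1"

# Unsafe: the input is spliced into the SQL text and rewrites the WHERE clause.
unsafe_sql = f"SELECT order_id FROM orders WHERE customer_id = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the input is bound as a value, so it is only ever compared
# against customer_id as one literal string.
safe_rows = conn.execute(
    "SELECT order_id FROM orders WHERE customer_id = :cid",
    {"cid": malicious},
).fetchall()

print(len(unsafe_rows), len(safe_rows))
conn.close()
```

The f-string query returns every row in the table, while the bound query returns none, because no customer has that literal id.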

Efficient Pandas Patterns

import pandas as pd

# Read Parquet (columnar and typed; usually much faster to load than CSV)
# Reading s3:// paths requires the s3fs package
df = pd.read_parquet('s3://bucket/orders.parquet')

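# Specify dtypes (and parse dates) on read; narrower dtypes often cut memory by half or more
df = pd.read_csv('data.csv', dtype={
    'order_id': 'int32',
    'amount': 'float32',
    'status': 'category',
}, parse_dates=['order_date'])  # datetime dtype is needed for .dt below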

# Vectorize — NEVER iterate row-by-row
# BAD:   for idx, row in df.iterrows(): ...
# GOOD:  df['total'] = df['qty'] * df['price']

# Use assign() for readable chained transforms
clean = (
    df
    .query('amount > 0')
    .assign(
        order_month=lambda d: d['order_date'].dt.to_period('M'),
        is_big=lambda d: d['amount'] > 1000,
    )
    .groupby('order_month', as_index=False)
    .agg(revenue=('amount', 'sum'), orders=('order_id', 'count'))
)

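# Process giant files in chunks
# (transform and validate are your own DataFrame -> DataFrame functions;
#  the out/ directory must already exist)
for chunk in pd.read_csv('huge.csv', chunksize=100_000):
    processed = chunk.pipe(transform).pipe(validate)
    processed.to_parquet(f'out/{chunk.index[0]}.parquet')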

Generators & Memory-Efficient Iteration

# Stream rows from Snowflake without loading all into memory
def stream_query(sql: str, batch_size: int = 10_000):
    with snowflake_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            while True:
                rows = cur.fetchmany(batch_size)
                if not rows:
                    break
                yield from rows

# Consumer memory is bounded by batch_size, not by table size
for row in stream_query("SELECT * FROM huge_table"):
    process(row)

# Batch generator for bulk inserts
from itertools import islice
def batched(iterable, n):
    it = iter(iterable)
    while (batch := list(islice(it, n))):
        yield batch

# Bulk write 1000 rows at a time
for batch in batched(records, 1000):
    cursor.executemany(INSERT_SQL, batch)
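
The bulk-write loop above leaves records, cursor, and INSERT_SQL undefined. The sketch below fills them in with assumptions so it can run end to end: an in-memory sqlite3 database stands in for the warehouse, and generate_records is a hypothetical lazy source. On Python 3.12+, itertools.batched offers the same behavior but yields tuples instead of lists.

```python
import sqlite3
from itertools import islice


def batched(iterable, n):
    """Yield lists of up to n items (same helper as above)."""
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch


def generate_records(count):
    """Hypothetical source: yields (order_id, amount) tuples lazily."""
    for i in range(count):
        yield (i, i * 1.5)


conn = sqlite3.connect(":memory:")  # stand-in for a warehouse connection
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL)")

batch_sizes = []
for batch in batched(generate_records(2_500), 1_000):
    conn.executemany("INSERT INTO orders VALUES (?, ?)", batch)
    conn.commit()  # one commit per batch keeps each transaction bounded
    batch_sizes.append(len(batch))

total = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(batch_sizes, total)
conn.close()
```

Only one batch of rows is held in memory at a time, and the final batch is simply shorter than the rest.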

Testing Data Pipelines

# test_transforms.py
import pandas as pd
import pytest
from my_pipeline import clean_orders, is_valid_amount

def test_clean_orders_removes_negative_amounts():
    raw = pd.DataFrame({
        'order_id': [1, 2, 3],
        'amount': [100.0, -5.0, 250.0],
    })
    result = clean_orders(raw)
    assert len(result) == 2
    assert (result['amount'] > 0).all()

def test_clean_orders_preserves_schema():
    raw = pd.DataFrame({'order_id': [1], 'amount': [100.0]})
    result = clean_orders(raw)
    assert set(result.columns) == {'order_id', 'amount', 'order_month'}

@pytest.mark.parametrize("amount,expected", [
    (0.01, True), (999.99, True), (1000.01, False), (-1, False),
])
def test_valid_amount(amount, expected):
    assert is_valid_amount(amount) is expected

# Run (the --cov flags require the pytest-cov plugin):
#   pytest -v --cov=my_pipeline --cov-report=term-missing
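
The parametrized test above exercises is_valid_amount without showing it. One implementation that would satisfy those four cases might look like the following. The bounds are assumptions inferred from the test data, and whether exactly 1000 counts as valid is a guess.

```python
def is_valid_amount(amount: float, max_amount: float = 1000.0) -> bool:
    """Return True for amounts in the range (0, max_amount].

    The bounds are assumptions inferred from the test cases; in particular,
    treating exactly max_amount as valid is a guess.
    """
    return 0 < amount <= max_amount


# The same cases the parametrized test feeds in.
cases = [(0.01, True), (999.99, True), (1000.01, False), (-1, False)]
results = [is_valid_amount(amount) for amount, _ in cases]
print(results)
```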

Production Pipeline Best Practices

  • Never hardcode credentials — use environment variables, AWS Secrets Manager, or Snowflake key-pair auth
  • Always use parameterized queries (%(name)s or %s) — never f-string SQL (SQL injection risk)
  • Wrap connections in context managers (with statement) so they close even on exception
  • Use logging, not print() — configure root logger with JSON formatter for production
  • Validate input schemas with pydantic BaseModel — fail fast on bad data
  • Prefer Parquet over CSV for intermediate files: reads are often an order of magnitude faster, files are smaller, and dtypes are preserved
  • Use Snowpark DataFrames when possible — pushes compute to Snowflake, avoids data egress
  • Pin dependencies in requirements.txt or pyproject.toml — unpinned versions break builds silently
  • Add retries with exponential backoff for network calls (tenacity library)
  • Write tests before productionizing — pytest + moto (mock AWS) + testcontainers (real Postgres)
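
As one way to apply the logging bullet above, the following sketch emits each log record as a single JSON object using only the standard library. JsonFormatter, the logger name, and the chosen fields are illustrative assumptions. The handler is attached to a named logger and writes to an in-memory stream so the example is self-contained; production code would more likely attach it to the root logger and write to stdout.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical minimal formatter: one JSON object per log record."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


stream = io.StringIO()  # in production this would be stdout or a log shipper
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("pipeline.load")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

logger.info("loaded %d rows into %s", 1200, "orders")
log_line = json.loads(stream.getvalue())
print(log_line)
```

Structured lines like this can be filtered by field in a log aggregator, which is hard to do reliably with free-form print() output.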

Frequently Asked Questions

Should I use pandas or Snowpark for data engineering?

Use Snowpark when your data lives in Snowflake and transformations are SQL-expressible — compute runs on the Snowflake warehouse, no data egress, scales to billions of rows. Use pandas for local transforms under ~5GB, or when you need libraries like scikit-learn that require Python objects. Rule: if you find yourself pulling a large table out of Snowflake only to transform and push it back, switch to Snowpark.

Why is my pandas script slow on a 2GB CSV?

Common fixes: (1) Convert the CSV to Parquet once and read that instead; Parquet loads are often an order of magnitude faster. (2) Specify dtypes on read to avoid object columns. (3) Use category dtype for low-cardinality strings. (4) Vectorize operations: iterrows() is commonly 100x or more slower than column arithmetic. (5) If still slow, switch to polars (lazy evaluation, multi-core). (6) For over 10GB, push compute to Snowflake via Snowpark.

How do I handle secrets in Python data pipelines?

Load secrets at runtime from a secret manager such as AWS Secrets Manager or Azure Key Vault, and expose them to the process as environment variables. Never commit credentials to git. In CI/CD use GitHub Actions secrets or Vault. For local dev use a gitignored .env file with python-dotenv. For Snowflake specifically, prefer key-pair auth over passwords: the private key is never sent to the server and is not subject to password-expiry policies, though keys should still be rotated periodically.
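
A small sketch of reading those environment variables with a fail-fast check follows. SnowflakeSettings and from_env are hypothetical names, the variable names reuse the SF_* convention from the connection pattern, and the sample values are made up. Passing the environment in as a mapping is a design choice that lets tests avoid touching os.environ.

```python
import os
from dataclasses import dataclass, field
from typing import Mapping


@dataclass(frozen=True)
class SnowflakeSettings:
    account: str
    user: str
    password: str = field(repr=False)  # keep the secret out of repr() and logs

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "SnowflakeSettings":
        required = {
            "account": "SF_ACCOUNT",
            "user": "SF_USER",
            "password": "SF_PASSWORD",
        }
        missing = [var for var in required.values() if not env.get(var)]
        if missing:
            # Name the missing variables, but never echo secret values.
            raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
        return cls(**{name: env[var] for name, var in required.items()})


# Made-up values; a real run would call SnowflakeSettings.from_env() with no arguments.
settings = SnowflakeSettings.from_env(
    {"SF_ACCOUNT": "acme-xy12345", "SF_USER": "etl_bot", "SF_PASSWORD": "s3cret"}
)
print(settings)
```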

What is the best way to unit-test a data pipeline?

Separate pure transform logic (deterministic functions taking a DataFrame, returning a DataFrame) from I/O (read/write). Unit-test the transforms with small fixture DataFrames using pytest. Mock external services with moto (AWS) or responses (HTTP). Use testcontainers or a real Snowflake test account for integration tests. Target about 80% coverage on transform logic and 100% on critical financial/compliance calculations.

When should I use generators instead of lists?

Use generators when processing data larger than memory (streaming from Snowflake, reading large files line by line), when you only need the result once (no re-iteration needed), or when you want lazy evaluation. Use lists when you need len(), indexing, or multiple iterations. A generator holds only its current state, so memory stays O(1) in the size of the data as long as the consumer does not accumulate results, which is critical for pipelines processing millions of rows.
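
The trade-off can be seen directly in a short sketch. Note that sys.getsizeof reports only the size of the container object itself, not the integers it refers to, so the true gap for the list is larger than shown.

```python
import sys

n = 1_000_000
squares_list = [i * i for i in range(n)]  # materializes every value up front
squares_gen = (i * i for i in range(n))   # produces values on demand

list_bytes = sys.getsizeof(squares_list)  # grows with n
gen_bytes = sys.getsizeof(squares_gen)    # small and independent of n

total_from_gen = sum(squares_gen)  # consumes the generator
second_pass = sum(squares_gen)     # already exhausted, so this sums nothing

print(gen_bytes < list_bytes, total_from_gen == sum(squares_list), second_pass)
```

The second pass over the generator is empty, which is the single-use caveat mentioned above; the list can be summed as many times as needed.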

How do I structure a production ETL Python project?

Standard layout: src/pipeline/{extract,transform,load}.py for stages, src/pipeline/config.py for settings (pydantic-settings), src/pipeline/io/ for external connectors, tests/ mirroring src, pyproject.toml pinning deps, Dockerfile for deployment. Keep main.py thin — just wiring. Use dependency injection for connections so tests can inject mocks.
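
The "thin wiring plus dependency injection" idea might be sketched as follows. The function names, the dict-based row type, and the in-memory fakes are all illustrative assumptions rather than a prescribed layout.

```python
from typing import Callable, Iterable

Row = dict  # hypothetical row type for this sketch


def transform(rows: Iterable[Row]) -> list[Row]:
    """Pure transform: drop non-positive amounts, no I/O."""
    return [r for r in rows if r["amount"] > 0]


def run_pipeline(
    extract: Callable[[], Iterable[Row]],
    load: Callable[[list[Row]], None],
) -> int:
    """Thin wiring: the caller injects the extract and load steps."""
    clean = transform(extract())
    load(clean)
    return len(clean)


# In a test, in-memory fakes replace the real Snowflake or S3 connectors.
loaded: list[Row] = []
count = run_pipeline(
    extract=lambda: [
        {"order_id": 1, "amount": 100.0},
        {"order_id": 2, "amount": -5.0},
    ],
    load=loaded.extend,
)
print(count, loaded)
```

Because run_pipeline never imports a connector itself, the same function runs unchanged against production connections and against test doubles.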

Polars vs pandas — which should I pick in 2026?

Polars is faster (Rust + multi-core), uses less memory, and has a cleaner API, but the ecosystem (scikit-learn, plotting libraries, tutorials) is smaller. Pick polars for new ETL work especially over 1GB where speed matters. Stay with pandas when integrating with an existing pandas codebase, using scikit-learn heavily, or onboarding junior engineers — pandas has more Stack Overflow answers.

How do I retry failed API calls gracefully?

Use the tenacity library: decorating a function with @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=60)) retries it on any exception with exponential backoff, capped at 60 seconds between attempts, for up to 5 attempts. Only retry idempotent operations. Add a circuit breaker for chronic failures. Log each retry attempt so you can alert when retries exceed the normal baseline.
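
To show the mechanics that tenacity automates, here is a hand-rolled, standard-library sketch of retry with exponential backoff. It is not tenacity's implementation, and its delay schedule is an assumption chosen for clarity. retry_with_backoff and flaky_fetch are hypothetical names, and the sleep function is injectable so the demo records delays instead of actually waiting.

```python
import functools
import time


def retry_with_backoff(max_attempts=5, base_delay=1.0, max_delay=60.0,
                       exceptions=(Exception,), sleep=time.sleep):
    """Retry the wrapped function, doubling the wait after each failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    delay = min(base_delay * 2 ** (attempt - 1), max_delay)
                    sleep(delay)
        return wrapper
    return decorator


waits = []          # record delays instead of sleeping, so the demo is instant
calls = {"n": 0}


@retry_with_backoff(max_attempts=5, exceptions=(ConnectionError,), sleep=waits.append)
def flaky_fetch():
    """Hypothetical API call that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = flaky_fetch()
print(result, calls["n"], waits)
```

Restricting the exceptions argument to transient errors such as ConnectionError keeps programming errors from being retried silently.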
