Intermediate · Last updated: 2026-04-16 · 6 sections
Essential Python patterns for data engineering: pandas, itertools, generators, connecting to Snowflake, writing production-safe ETL scripts, and testing data pipelines.
| Library | Use Case | Install |
|---|---|---|
| pandas | In-memory tabular transforms under ~5GB | pip install pandas |
| polars | Faster pandas alternative, lazy eval, Arrow-native | pip install polars |
| pyarrow | Columnar memory format, Parquet read/write | pip install pyarrow |
| snowflake-connector-python | Connect to Snowflake, fetch_pandas_all() | pip install snowflake-connector-python[pandas] |
| snowflake-snowpark-python | Push computation to Snowflake warehouse | pip install snowflake-snowpark-python |
| sqlalchemy | Generic DB abstraction (Postgres, MySQL, Snowflake) | pip install sqlalchemy |
| boto3 | AWS S3 / Secrets Manager / Glue | pip install boto3 |
| pydantic | Runtime data validation, schema contracts | pip install pydantic |
| great-expectations | Data quality testing framework | pip install great-expectations |
| pytest | Unit + integration tests for pipelines | pip install pytest |
```python
import os
import snowflake.connector
from contextlib import contextmanager


@contextmanager
def snowflake_conn():
    """Always close the connection, even on error."""
    conn = snowflake.connector.connect(
        account=os.environ['SF_ACCOUNT'],
        user=os.environ['SF_USER'],
        password=os.environ['SF_PASSWORD'],
        warehouse=os.environ['SF_WAREHOUSE'],
        database=os.environ['SF_DATABASE'],
        schema=os.environ['SF_SCHEMA'],
        role=os.environ['SF_ROLE'],
        client_session_keep_alive=False,
    )
    try:
        yield conn
    finally:
        conn.close()


def run_query(sql: str, params: dict | None = None):
    with snowflake_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params or {})
            return cur.fetchall()


# Parameterized query (SQL-injection safe)
rows = run_query(
    "SELECT * FROM orders WHERE customer_id = %(cid)s AND order_date >= %(d)s",
    {"cid": 12345, "d": "2026-01-01"},
)
```

```python
import pandas as pd

# Read Parquet (10-100x faster than CSV)
df = pd.read_parquet('s3://bucket/orders.parquet')

# Specify dtypes to cut memory 50%+
df = pd.read_csv('data.csv', dtype={
    'order_id': 'int32',
    'amount': 'float32',
    'status': 'category',
})

# Vectorize — NEVER iterate row-by-row
# BAD:  for idx, row in df.iterrows(): ...
# GOOD: df['total'] = df['qty'] * df['price']

# Use assign() for readable chained transforms
clean = (
    df
    .query('amount > 0')
    .assign(
        order_month=lambda d: d['order_date'].dt.to_period('M'),
        is_big=lambda d: d['amount'] > 1000,
    )
    .groupby('order_month', as_index=False)
    .agg(revenue=('amount', 'sum'), orders=('order_id', 'count'))
)

# Process giant files in chunks
for chunk in pd.read_csv('huge.csv', chunksize=100_000):
    processed = chunk.pipe(transform).pipe(validate)
    processed.to_parquet(f'out/{chunk.index[0]}.parquet')
```

```python
from itertools import islice


# Stream rows from Snowflake without loading all into memory
def stream_query(sql: str, batch_size: int = 10_000):
    with snowflake_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            while True:
                rows = cur.fetchmany(batch_size)
                if not rows:
                    break
                yield from rows


# Consumer uses constant memory no matter the table size
for row in stream_query("SELECT * FROM huge_table"):
    process(row)


# Batch generator for bulk inserts
def batched(iterable, n):
    it = iter(iterable)
    while (batch := list(islice(it, n))):
        yield batch


# Bulk write 1000 rows at a time
for batch in batched(records, 1000):
    cursor.executemany(INSERT_SQL, batch)
```

```python
# test_transforms.py
import pandas as pd
import pytest

from my_pipeline import clean_orders, is_valid_amount


def test_clean_orders_removes_negative_amounts():
    raw = pd.DataFrame({
        'order_id': [1, 2, 3],
        'amount': [100.0, -5.0, 250.0],
    })
    result = clean_orders(raw)
    assert len(result) == 2
    assert (result['amount'] > 0).all()


def test_clean_orders_preserves_schema():
    raw = pd.DataFrame({'order_id': [1], 'amount': [100.0]})
    result = clean_orders(raw)
    assert set(result.columns) == {'order_id', 'amount', 'order_month'}


@pytest.mark.parametrize("amount,expected", [
    (0.01, True), (999.99, True), (1000.01, False), (-1, False),
])
def test_valid_amount(amount, expected):
    assert is_valid_amount(amount) is expected

# Run: pytest -v --cov=my_pipeline --cov-report=term-missing
```

Use Snowpark when your data lives in Snowflake and transformations are SQL-expressible — compute runs on the Snowflake warehouse, no data egress, scales to billions of rows. Use pandas for local transforms under ~5GB, or when you need libraries like scikit-learn that require Python objects. Rule of thumb: if you find yourself pulling a large table out of Snowflake only to transform it and push it back, switch to Snowpark.
Common fixes: (1) Read Parquet instead of CSV — 10-100x faster. (2) Specify dtypes on read to avoid object columns. (3) Use category dtype for low-cardinality strings. (4) Vectorize operations — iterrows() is 100x slower than column arithmetic. (5) If still slow, switch to polars (lazy evaluation, multi-core). (6) For datasets over ~10GB, push compute to Snowflake via Snowpark.
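Fixes (2) and (3) are easy to measure. A minimal sketch, with illustrative column names and a synthetic frame standing in for what `read_csv` would produce (int64/float64/object by default):

```python
import pandas as pd

# Synthetic frame mimicking read_csv defaults: int64, float64, object strings
df = pd.DataFrame({
    'order_id': range(100_000),
    'amount': [9.99] * 100_000,
    'status': ['shipped', 'pending'] * 50_000,   # low-cardinality string column
})
before = df.memory_usage(deep=True).sum()

# Narrower numerics + category dtype for the string column
slim = df.astype({
    'order_id': 'int32',
    'amount': 'float32',
    'status': 'category',
})
after = slim.memory_usage(deep=True).sum()

print(f"{before:,} bytes -> {after:,} bytes")
```

The object column dominates the before-size; converting it to `category` stores each distinct string once plus small integer codes.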
Use environment variables loaded from a secret manager at runtime: AWS Secrets Manager, Azure Key Vault, or Snowflake key-pair authentication. Never commit credentials to git. In CI/CD use GitHub Actions secrets or Vault. For local dev use a .env file with python-dotenv (gitignored). For Snowflake specifically, prefer key-pair auth over passwords — it is more secure and does not expire.
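Whichever manager injects the variables, fail fast at startup rather than mid-pipeline. A minimal sketch of that runtime side (function and variable names are illustrative, not from any library):

```python
import os

REQUIRED = ['SF_ACCOUNT', 'SF_USER', 'SF_PASSWORD']


def load_credentials(env=os.environ) -> dict:
    """Raise at startup if any secret is missing, naming every absent key."""
    missing = [k for k in REQUIRED if not env.get(k)]
    if missing:
        raise RuntimeError(f"Missing required credentials: {', '.join(missing)}")
    return {k: env[k] for k in REQUIRED}
```

Call it once in `main()` before any connection attempt; passing `env` as a parameter also makes the check unit-testable without touching the real environment.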
Separate pure transform logic (deterministic functions taking a DataFrame, returning a DataFrame) from I/O (read/write). Unit-test the transforms with small fixture DataFrames using pytest. Mock external services with moto (AWS) or responses (HTTP). Use testcontainers or a real Snowflake test account for integration tests. Target about 80% coverage on transform logic and 100% on critical financial/compliance calculations.
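The pure/IO split in miniature (names are illustrative): the transform takes plain data and returns plain data, so its test needs no database, file, or mock.

```python
def drop_negative_amounts(records: list[dict]) -> list[dict]:
    """Pure transform: deterministic, no I/O, trivially unit-testable."""
    return [r for r in records if r['amount'] > 0]


def run_pipeline(read, write) -> None:
    """I/O stays at the edges; read/write are passed in, so tests use in-memory fakes."""
    write(drop_negative_amounts(read()))


# Unit test needs only plain data and two lambdas:
rows = [{'amount': 100.0}, {'amount': -5.0}]
out = []
run_pipeline(read=lambda: rows, write=out.extend)
```

In production `read`/`write` would wrap Snowflake or S3; the transform never knows the difference.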
Use generators when processing data larger than memory (streaming from Snowflake, reading large files line by line), when you only need the result once (no re-iteration needed), or when you want lazy evaluation. Use lists when you need len(), indexing, or multiple iterations. Generators use O(1) memory regardless of data size — critical for pipelines processing millions of rows.
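The O(1) claim is easy to check: a generator object's size stays constant while a list's grows with its data. A quick sketch:

```python
import sys

n = 1_000_000
as_list = [i * 2 for i in range(n)]   # materializes every element up front
as_gen = (i * 2 for i in range(n))    # stores only the iteration state

print(sys.getsizeof(as_list))   # megabytes-scale, grows with n
print(sys.getsizeof(as_gen))    # a few hundred bytes regardless of n

# Generators are single-use: after this sum() the generator is exhausted
total = sum(as_gen)
```

Note `getsizeof` on the list counts only the pointer array, not the elements, yet it is still orders of magnitude larger than the generator.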
Standard layout: src/pipeline/{extract,transform,load}.py for stages, src/pipeline/config.py for settings (pydantic-settings), src/pipeline/io/ for external connectors, tests/ mirroring src, pyproject.toml pinning deps, Dockerfile for deployment. Keep main.py thin — just wiring. Use dependency injection for connections so tests can inject mocks.
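The dependency-injection point can be sketched like this (class and table names are hypothetical): the loader takes a connection *factory*, so production wires in `snowflake_conn` while tests pass an in-memory database.

```python
class OrderLoader:
    def __init__(self, connect):
        self._connect = connect   # any callable returning a DB-API connection

    def count_orders(self) -> int:
        with self._connect() as conn:
            cur = conn.cursor()
            cur.execute("SELECT COUNT(*) FROM orders")
            return cur.fetchone()[0]
```

A test can inject `sqlite3.connect(':memory:')` in place of a Snowflake connection; the loader's logic is exercised without network access or credentials.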
Polars is faster (Rust + multi-core), uses less memory, and has a cleaner API, but the ecosystem (scikit-learn, plotting libraries, tutorials) is smaller. Pick polars for new ETL work especially over 1GB where speed matters. Stay with pandas when integrating with an existing pandas codebase, using scikit-learn heavily, or onboarding junior engineers — pandas has more Stack Overflow answers.
Use the tenacity library: @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=60)) decorator retries on exception with exponential backoff. Only retry idempotent operations. Set a circuit breaker for chronic failures. Log each retry attempt so you can alert when retries exceed normal baseline.
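If you cannot take the dependency, the same retry-with-exponential-backoff shape is a short stdlib sketch (parameters mirror the tenacity decorator above; `sleep` is injectable so tests and demos run instantly):

```python
import functools
import time


def retry(attempts=5, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Retry on any exception, doubling the wait each attempt (capped at max_delay)."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == attempts:
                        raise          # exhausted: re-raise the last error
                    print(f"attempt {attempt} failed ({exc!r}); retrying in {delay:.1f}s")
                    sleep(delay)
                    delay = min(delay * 2, max_delay)
        return wrapper
    return decorator


# Only wrap idempotent operations, e.g. a read that fails twice then succeeds:
@retry(attempts=3, base_delay=0.1, sleep=lambda s: None)   # no-op sleep for the demo
def flaky():
    flaky.calls = getattr(flaky, 'calls', 0) + 1
    if flaky.calls < 3:
        raise ConnectionError("transient")
    return "ok"
```

In real pipelines keep `sleep=time.sleep`, narrow the `except` to transient error types, and route the per-attempt log line through your logger so retry spikes can be alerted on.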