PySpark & Spark SQL Cheat Sheet

Intermediate • Last updated: 2026-04-16 • 6 sections

PySpark DataFrame API, Spark SQL, window functions, partitioning, caching, and performance tuning for production data pipelines.

DataFrame API Essentials

from pyspark.sql import SparkSession, functions as F, Window

spark = (
    SparkSession.builder
    .appName("orders-etl")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Read sources
orders = spark.read.parquet("s3://bucket/orders/")
customers = spark.read.format("delta").load("s3://bucket/customers/")

# Transform
clean = (
    orders
    .filter(F.col("amount") > 0)
    .withColumn("order_month", F.date_trunc("month", F.col("order_date")))
    .withColumn("is_big", F.col("amount") > 1000)
    .join(customers, "customer_id", "left")
    .select("order_id", "customer_id", "amount", "order_month", "country")
)

# Aggregate
monthly = (
    clean.groupBy("order_month", "country")
    .agg(
        F.sum("amount").alias("revenue"),
        F.count("order_id").alias("orders"),
        F.countDistinct("customer_id").alias("unique_customers"),
    )
)

# Write (partitioned)
(monthly.write
    .mode("overwrite")
    .partitionBy("order_month")
    .format("delta")
    .save("s3://bucket/monthly_summary/"))

Window Functions

from pyspark.sql import Window, functions as F

# Running total per customer, ordered by date
w_running = Window.partitionBy("customer_id").orderBy("order_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = orders.withColumn("lifetime_spend", F.sum("amount").over(w_running))

# Rank orders within customer (1 = largest)
w_rank = Window.partitionBy("customer_id").orderBy(F.desc("amount"))
df = df.withColumn("order_rank", F.row_number().over(w_rank))

# 7-day rolling average (requires ordered time window)
w_rolling = Window.partitionBy("customer_id") \
    .orderBy(F.col("order_date").cast("long")) \
    .rangeBetween(-6 * 86400, 0)  # 6 days before to current

df = df.withColumn("avg_7d", F.avg("amount").over(w_rolling))

# Lag / Lead for change detection
w_seq = Window.partitionBy("customer_id").orderBy("order_date")
df = df.withColumn("prev_amount", F.lag("amount").over(w_seq))
df = df.withColumn("amount_change", F.col("amount") - F.col("prev_amount"))

Join Strategies & When Spark Picks Each

Strategy              | When Used                                                            | How to Force
Broadcast Hash Join   | Small side under spark.sql.autoBroadcastJoinThreshold (default 10MB) | F.broadcast(small_df) hint
Sort-Merge Join       | Both sides large, sortable by join key                               | Default for large joins
Shuffle Hash Join     | One side fits in memory per partition                                | Rare; AQE may choose it
Broadcast Nested Loop | Non-equi join with a small side                                      | F.broadcast() plus a non-equi condition
Cartesian Product     | No join condition (dangerous)                                        | crossJoin(); use with extreme care

Spark SQL — Common Patterns

-- Register DataFrame as a temp view
-- orders.createOrReplaceTempView("orders")

-- Rank top-5 customers per country by revenue
SELECT *
FROM (
  SELECT
    country,
    customer_id,
    SUM(amount) AS revenue,
    RANK() OVER (PARTITION BY country ORDER BY SUM(amount) DESC) AS rnk
  FROM orders
  GROUP BY country, customer_id
)
WHERE rnk <= 5;

-- Pivot monthly revenue by country
-- (project only the grouping, pivot, and value columns first,
--  otherwise every remaining column becomes an implicit group key)
SELECT * FROM (
  SELECT order_month, country, amount FROM orders
)
PIVOT (
  SUM(amount) FOR country IN ('US', 'UK', 'DE', 'IN')
);

-- Explode array column into rows
SELECT order_id, item
FROM orders
LATERAL VIEW EXPLODE(items) t AS item;

-- Broadcast a small lookup
SELECT /*+ BROADCAST(c) */ o.*, c.country
FROM orders o JOIN customers c ON o.customer_id = c.customer_id;

Performance Tuning Checklist

  • Enable Adaptive Query Execution (AQE): spark.sql.adaptive.enabled=true — auto-coalesces partitions and handles skew
  • Partition by low-cardinality columns (country, month) — NOT by user_id (too granular, creates millions of files)
  • Use broadcast joins for dimension tables under 100MB: F.broadcast(dim_df)
  • Cache DataFrames reused 3+ times: df.cache() then df.count() to materialize
  • Prefer DataFrame API over RDD API — Catalyst optimizer only works on DataFrames
  • Avoid collect() on big data — brings all rows to driver and OOMs. Use write() instead
  • Set spark.sql.shuffle.partitions based on data size: target ~128MB per partition
  • Handle skew with salting: add random_key, aggregate, then aggregate again
  • Use columnar formats (Parquet, Delta, Iceberg) — predicate pushdown + column pruning
  • Monitor Spark UI Stages tab — look for skewed tasks (one task 10x slower than others)

Common Interview Q&A

Q: What is the difference between narrow and wide transformations?

Narrow transformations (map, filter, union) produce child partitions that depend on exactly one parent partition — no shuffle. Wide transformations (groupBy, join, distinct) require data from multiple partitions — triggers a shuffle which writes intermediate data to disk and reads it over the network. Wide transforms are expensive; minimize them by filtering/projecting before grouping.

Q: How do you handle data skew?

Symptoms: one task runs 10-100x slower than peers in the same stage. Fixes: (1) Enable AQE skew join: spark.sql.adaptive.skewJoin.enabled=true — automatically splits skewed partitions. (2) Salting: add a random prefix to the skewed key, group, then group again without salt. (3) Broadcast the smaller side if possible. (4) Isolate hot keys and process separately. (5) Increase spark.sql.shuffle.partitions so skewed data spreads further.

Q: cache() vs persist() — what is the difference?

cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK). persist() accepts any StorageLevel: MEMORY_ONLY (fastest, risk OOM), MEMORY_AND_DISK (default), DISK_ONLY (slowest but safe), MEMORY_ONLY_SER (serialized, saves memory), OFF_HEAP (avoids GC). Always call an action after cache() (e.g., df.count()) to materialize it. Unpersist when no longer needed to free memory.
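A short sketch of explicit persistence with a non-default storage level (DataFrame and level chosen for illustration):

```python
from pyspark import StorageLevel
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[2]").appName("persist-demo").getOrCreate()
df = spark.range(1000).withColumn("x", F.col("id") * 2)

# DISK_ONLY: safe on memory-tight clusters, slowest to re-read
df.persist(StorageLevel.DISK_ONLY)
df.count()  # action materializes the persisted data

# ... reuse df in several downstream computations ...

df.unpersist()  # free the storage when done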

Q: When should you use PySpark vs pandas vs Snowpark?

PySpark: data over 100GB, distributed compute needed, Delta/Iceberg workloads on Databricks/EMR. Pandas: under 5GB in-memory, rich Python ecosystem, single-node. Snowpark: data already in Snowflake — pushes compute to Snowflake warehouse, no egress, uses Snowflake governance. For Snowflake-centric stacks, Snowpark usually beats PySpark on ops simplicity and cost.

Frequently Asked Questions

What Spark cluster configuration should I start with?

Rule of thumb: executor memory 4-8GB, executor cores 4-5 (more cores per executor leads to longer JVM GC pauses and I/O contention), driver memory 2x the largest broadcast. Set spark.sql.shuffle.partitions to ~2-3x the total cores. Enable AQE so Spark auto-tunes partition counts. Start with 4 executors, scale based on stage duration. For Databricks, use job clusters (terminate after run) not all-purpose clusters.

Why is my Spark job OOMing on the driver?

Common causes: (1) collect() or toPandas() on a large DataFrame brings all rows to driver — replace with write(). (2) Broadcasting a DataFrame too large (>100MB). (3) Accumulators collecting too much data. (4) Large number of files in a partitioned read (driver builds file list in memory). Fix: increase spark.driver.memory, avoid collect(), use checkpoint() to truncate lineage on iterative jobs.

Parquet vs Delta vs Iceberg for Spark — which?

Parquet: open columnar format, read/write only — no ACID, no time travel. Delta Lake: Parquet + transaction log + ACID + schema evolution, best on Databricks. Iceberg: open table format with ACID + time travel + hidden partitioning, broad engine support (Spark, Trino, Snowflake, Flink). Pick Delta if on Databricks, Iceberg if you want multi-engine portability (Snowflake + Spark + Trino reading same data).

How do you pass configs to spark-submit in production?

Three ways: (1) CLI flags: spark-submit --conf spark.executor.memory=8g app.py. (2) spark-defaults.conf file — baseline cluster settings. (3) In code: SparkSession.builder.config(...). Precedence (highest to lowest): code > CLI > config file. For sensitive values use spark.kubernetes.driver.secrets or environment variables, never commit to git.
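A hypothetical CLI invocation combining these pieces; app name, master, and values are placeholders, not recommendations:

```shell
# spark-defaults.conf supplies the baseline; these flags override it,
# and SparkSession.builder.config(...) in app.py would override both
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.executor.memory=8g \
  --conf spark.executor.cores=4 \
  --conf spark.sql.shuffle.partitions=400 \
  app.py
```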

repartition() vs coalesce() — when to use each?

repartition(N) does a full shuffle to produce exactly N partitions — use when increasing partition count or rebalancing skewed data. coalesce(N) merges existing partitions without shuffle — use only when reducing partition count (e.g., before writing out). coalesce is cheaper but can create uneven partitions. For output: coalesce(1) only for tiny result sets; for large output, repartition on the partition column.

How do I debug a slow PySpark job?

Workflow: (1) Open Spark UI — Stages tab, find the slowest stage. (2) Check Summary Metrics — is one task much slower? (skew). (3) Check Input Size / Shuffle Read — is data being re-read? (cache missing). (4) Check SQL tab — look at the physical plan for BroadcastExchange, SortMergeJoin, filters pushed down. (5) Check DAG visualization — excessive stages = unnecessary shuffles. Common wins: add broadcast hint, enable AQE, pre-filter before join.

How are Spark UDFs different from built-in functions?

Built-in functions (F.col, F.sum, F.date_trunc) run in the JVM and are optimized by Catalyst — fastest. Python UDFs serialize each row to Python and back — 10-100x slower due to serialization. Pandas UDFs (vectorized, F.pandas_udf) send Arrow batches to Python — much faster than regular UDFs. Rule: always prefer built-ins. If you must write custom logic, use pandas UDFs over Python UDFs.

Related Cheat Sheets

  • Python for Data Engineers Cheat Sheet
  • Databricks SQL & Workflows Cheat Sheet
  • SQL Window Functions Cheat Sheet
  • Data Engineering Interview Questions & Answers