Intermediate • Last updated: 2026-04-16 • 6 sections
PySpark DataFrame API, Spark SQL, window functions, partitioning, caching, and performance tuning for production data pipelines.
```python
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .appName("orders-etl")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Read sources
orders = spark.read.parquet("s3://bucket/orders/")
customers = spark.read.format("delta").load("s3://bucket/customers/")

# Transform
clean = (
    orders
    .filter(F.col("amount") > 0)
    .withColumn("order_month", F.date_trunc("month", F.col("order_date")))
    .withColumn("is_big", F.col("amount") > 1000)
    .join(customers, "customer_id", "left")
    .select("order_id", "customer_id", "amount", "order_month", "country")
)

# Aggregate
monthly = (
    clean.groupBy("order_month", "country")
    .agg(
        F.sum("amount").alias("revenue"),
        F.count("order_id").alias("orders"),
        F.countDistinct("customer_id").alias("unique_customers"),
    )
)

# Write (partitioned)
(monthly.write
    .mode("overwrite")
    .partitionBy("order_month")
    .format("delta")
    .save("s3://bucket/monthly_summary/"))
```

```python
from pyspark.sql import Window, functions as F

# Running total per customer, ordered by date
w_running = (
    Window.partitionBy("customer_id")
    .orderBy("order_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df = orders.withColumn("lifetime_spend", F.sum("amount").over(w_running))

# Rank orders within customer (1 = largest)
w_rank = Window.partitionBy("customer_id").orderBy(F.desc("amount"))
df = df.withColumn("order_rank", F.row_number().over(w_rank))

# 7-day rolling average (requires ordered time window)
w_rolling = (
    Window.partitionBy("customer_id")
    .orderBy(F.col("order_date").cast("long"))
    .rangeBetween(-6 * 86400, 0)  # 6 days before to current
)
df = df.withColumn("avg_7d", F.avg("amount").over(w_rolling))

# Lag / Lead for change detection
w_seq = Window.partitionBy("customer_id").orderBy("order_date")
df = df.withColumn("prev_amount", F.lag("amount").over(w_seq))
df = df.withColumn("amount_change", F.col("amount") - F.col("prev_amount"))
```

| Strategy | When Used | How to Force |
|---|---|---|
| Broadcast Hash Join | Small side under spark.sql.autoBroadcastJoinThreshold (default 10MB) | F.broadcast(small_df) hint |
| Sort-Merge Join | Both sides large, sortable by join key | Default for large joins |
| Shuffle Hash Join | One side fits in memory per partition | Rare, AQE chooses |
| Broadcast Nested Loop | Non-equi joins with a small side | F.broadcast() + non-equal condition |
| Cartesian Product | No join condition (danger) | crossJoin() — use with extreme care |
```sql
-- Register DataFrame as a temp view first:
--   orders.createOrReplaceTempView("orders")

-- Rank top-5 customers per country by revenue
SELECT *
FROM (
    SELECT
        country,
        customer_id,
        SUM(amount) AS revenue,
        RANK() OVER (PARTITION BY country ORDER BY SUM(amount) DESC) AS rnk
    FROM orders
    GROUP BY country, customer_id
) ranked
WHERE rnk <= 5;

-- Pivot monthly revenue by country
-- (project down to the grouping + pivot columns first, so the implicit
-- GROUP BY covers only order_month)
SELECT *
FROM (SELECT order_month, country, amount FROM orders)
PIVOT (
    SUM(amount) FOR country IN ('US', 'UK', 'DE', 'IN')
);

-- Explode array column into rows
SELECT order_id, item
FROM orders
LATERAL VIEW EXPLODE(items) t AS item;

-- Broadcast a small lookup table
SELECT /*+ BROADCAST(c) */ o.*, c.country
FROM orders o JOIN customers c ON o.customer_id = c.customer_id;
```

Q: What is the difference between narrow and wide transformations?
Narrow transformations (map, filter, union) produce child partitions that each depend on exactly one parent partition, so no shuffle is needed. Wide transformations (groupBy, join, distinct) need data from multiple parent partitions, which triggers a shuffle: intermediate data is written to disk and read back over the network. Wide transforms are expensive; minimize them by filtering and projecting before grouping or joining.
Q: How do you handle data skew?
Symptoms: one task runs 10-100x slower than its peers in the same stage. Fixes: (1) Enable the AQE skew join (spark.sql.adaptive.skewJoin.enabled=true), which automatically splits skewed partitions. (2) Salting: add a random component to the skewed key, aggregate on the salted key, then aggregate again without the salt. (3) Broadcast the smaller side if possible. (4) Isolate hot keys and process them separately. (5) Increase spark.sql.shuffle.partitions so skewed data spreads across more tasks.
Q: cache() vs persist() — what is the difference?
cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK). persist() accepts any StorageLevel: MEMORY_ONLY (fastest, risk OOM), MEMORY_AND_DISK (default), DISK_ONLY (slowest but safe), MEMORY_ONLY_SER (serialized, saves memory), OFF_HEAP (avoids GC). Always call an action after cache() (e.g., df.count()) to materialize it. Unpersist when no longer needed to free memory.
Q: When should you use PySpark vs pandas vs Snowpark?
PySpark: data over 100GB, distributed compute needed, Delta/Iceberg workloads on Databricks/EMR. Pandas: under 5GB in-memory, rich Python ecosystem, single-node. Snowpark: data already in Snowflake — pushes compute to Snowflake warehouse, no egress, uses Snowflake governance. For Snowflake-centric stacks, Snowpark usually beats PySpark on ops simplicity and cost.
Q: How do you size executors and partitions?
Rule of thumb: executor memory 4-8GB; executor cores 4-5 (more cores per executor tend to hurt throughput because of JVM GC pressure); driver memory at least 2x the largest broadcast. Set spark.sql.shuffle.partitions to ~2-3x the total core count. Enable AQE so Spark auto-tunes partition counts at runtime. Start with 4 executors and scale based on stage duration. On Databricks, use job clusters (terminated after the run), not all-purpose clusters.
Q: What causes driver out-of-memory errors?
Common causes: (1) collect() or toPandas() on a large DataFrame, which brings all rows to the driver; replace with write(). (2) Broadcasting a DataFrame that is too large (>100MB). (3) Accumulators collecting too much data. (4) A huge number of files in a partitioned read (the driver builds the file list in memory). Fixes: increase spark.driver.memory, avoid collect(), and use checkpoint() to truncate lineage in iterative jobs.
Q: Parquet vs Delta Lake vs Iceberg?
Parquet: an open columnar file format, read/write only; no ACID, no time travel. Delta Lake: Parquet plus a transaction log, adding ACID, time travel, and schema evolution; best supported on Databricks. Iceberg: an open table format with ACID, time travel, and hidden partitioning, with broad engine support (Spark, Trino, Snowflake, Flink). Pick Delta if you are on Databricks; pick Iceberg for multi-engine portability (Snowflake, Spark, and Trino reading the same data).
Q: How do you configure a Spark application?
Three ways: (1) CLI flags: spark-submit --conf spark.executor.memory=8g app.py. (2) A spark-defaults.conf file for baseline cluster settings. (3) In code: SparkSession.builder.config(...). Precedence (highest to lowest): code > CLI > config file. For sensitive values, use spark.kubernetes.driver.secrets or environment variables; never commit them to git.
Q: repartition() vs coalesce()?
repartition(N) does a full shuffle to produce exactly N evenly sized partitions; use it to increase the partition count or rebalance skewed data. coalesce(N) merges existing partitions without a shuffle; use it only to reduce the partition count (e.g., before writing out). coalesce is cheaper but can create uneven partitions. For output: coalesce(1) only for tiny result sets; for large output, repartition on the partition column.
Q: How do you debug a slow Spark job?
Workflow: (1) Open the Spark UI's Stages tab and find the slowest stage. (2) Check Summary Metrics: is one task much slower than the rest? (skew). (3) Check Input Size / Shuffle Read: is data being re-read? (missing cache). (4) Check the SQL tab: inspect the physical plan for BroadcastExchange, SortMergeJoin, and pushed-down filters. (5) Check the DAG visualization: excessive stages mean unnecessary shuffles. Common wins: add a broadcast hint, enable AQE, pre-filter before joins.
Q: Built-in functions vs Python UDFs vs pandas UDFs?
Built-in functions (F.col, F.sum, F.date_trunc) run in the JVM and are optimized by Catalyst; they are the fastest option. Python UDFs serialize each row to Python and back, making them 10-100x slower. Pandas UDFs (vectorized, via F.pandas_udf) send Arrow batches to Python and are much faster than row-at-a-time UDFs. Rule: always prefer built-ins; if you must write custom logic, use pandas UDFs over plain Python UDFs.