Section 15
Performance & Debugging
Pandas performance tuning is about vectorization and memory layout. Spark performance tuning is about shuffles, partitions, and the Spark UI. The number one skill: reading the DAG.
Explain Plans
```python
# See what Spark will actually do
df.explain()             # physical plan only
df.explain(True)         # parsed → analyzed → optimized → physical
df.explain("formatted")  # human-readable physical plan (Spark 3.0+)
```
Common Performance Killers
| Problem | Symptom | Fix |
|---|---|---|
| Data skew | One task takes 100x longer | Salt keys (see sketch after this table), isolate hot keys, enable AQE skew join |
| Too many small files | Slow reads, task scheduling overhead | coalesce() before write, compact Delta tables |
| Too few partitions | Executors idle, OOM on large partitions | repartition(), increase spark.sql.shuffle.partitions |
| UDF bottleneck | Slow tasks, Python worker overhead | Replace with built-in functions or Pandas UDFs (see sketch after this table) |
| Unnecessary shuffle | Large shuffle read/write in Spark UI | broadcast() small tables, pre-bucket for repeated joins |
| Collecting to driver | Driver OOM | Aggregate first, then collect. Never .toPandas() raw data. |
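A minimal key-salting sketch for the skew row above, assuming a large DataFrame `big` and a much smaller DataFrame `small` that share a skewed join key `k` (all names are illustrative):

```python
from pyspark.sql import functions as F

N = 8  # number of salt buckets — tune to the degree of skew

# Large side: assign each row to a random salt bucket.
big_salted = big.withColumn("salt", (F.rand() * N).cast("int"))

# Small side: replicate every row once per bucket so all salted keys still match.
small_salted = small.withColumn(
    "salt", F.explode(F.array(*[F.lit(i) for i in range(N)]))
)

# Join on the original key plus the salt, then drop the helper column.
result = big_salted.join(small_salted, ["k", "salt"]).drop("salt")
```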
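And for the UDF row: the same transformation written three ways, fastest first. `df` and its numeric column `x` are hypothetical, `log1p` just stands in for per-row math, and the Pandas UDF path needs pyarrow installed:

```python
import math

import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import DoubleType

# 1. Built-in function: stays in the JVM, no Python worker round-trip.
fastest = df.withColumn("y", F.log1p("x"))

# 2. Pandas (vectorized) UDF: rows move through Arrow in batches.
@pandas_udf("double")
def log1p_vec(x: pd.Series) -> pd.Series:
    return np.log1p(x)

better = df.withColumn("y", log1p_vec("x"))

# 3. Row-at-a-time Python UDF: the pattern to avoid when 1 or 2 will do.
log1p_slow = udf(lambda v: math.log1p(v) if v is not None else None, DoubleType())
slowest = df.withColumn("y", log1p_slow("x"))
```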
Key Spark Configurations
```python
# Adaptive Query Execution (Spark 3+) — enables runtime optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Shuffle partitions: default 200, size to your cluster.
# "auto" is Databricks-specific; on open-source Spark set a number and let AQE coalesce.
spark.conf.set("spark.sql.shuffle.partitions", "auto")

# Broadcast join threshold in bytes; tables below it are broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10 MB
```
The Spark UI Is Your Debugger
There is no pdb for distributed data. The Spark UI (port 4040 locally, built into Databricks) shows job stages, task durations, shuffle sizes, and executor metrics. A single slow task in the UI tells you more than an hour of staring at code. Learn to read the DAG visualization and the stage detail view.
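One habit that makes the UI far easier to navigate: label the work you submit, so stages carry a human-readable description. A minimal sketch; the `orders` DataFrame, its columns, and the output path are hypothetical:

```python
from pyspark.sql import functions as F

# Label subsequent jobs so their stages are easy to find in the Spark UI.
spark.sparkContext.setJobDescription("orders: daily totals by region")

daily = orders.groupBy("region").agg(F.sum("amount").alias("total"))  # hypothetical columns
daily.write.mode("overwrite").parquet("/tmp/daily_orders")            # hypothetical path

spark.sparkContext.setJobDescription(None)  # clear the label when done
```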
Section 16
The Full Mapping Table
Reference sheet for translating Pandas operations to PySpark; a worked side-by-side example follows the table.
| Pandas | PySpark | Key Difference |
|---|---|---|
| import pandas as pd | from pyspark.sql import functions as F | F.* is your new toolbox |
| pd.read_csv("f.csv") | spark.read.csv("f.csv", header=True) | Path can be a directory or glob; types are strings unless inferSchema=True |
| df.head(5) | df.show(5) / df.limit(5) | .show() prints, .limit() returns a DataFrame |
| df.shape | (df.count(), len(df.columns)) | .count() triggers full computation |
| df.dtypes | df.dtypes / df.printSchema() | Spark types, not NumPy |
| df.describe() | df.describe() / df.summary() | .summary() adds quartiles |
| df["col"] | F.col("col") | Returns a Column expression, not a Series |
| df[["a", "b"]] | df.select("a", "b") | Lazy selection |
| df[df.x > 5] | df.filter(F.col("x") > 5) | Lazy filter |
| df["new"] = expr | df.withColumn("new", expr) | Returns a new DF (immutable) |
| df.rename(columns={}) | df.withColumnRenamed("old", "new") | One column at a time (or use .toDF()) |
| df.drop(columns=["x"]) | df.drop("x") | Returns a new DF |
| df.sort_values("col") | df.orderBy("col") / df.sort("col") | Triggers a shuffle (global sort) |
| df.groupby("g").sum() | df.groupBy("g").agg(F.sum("col")) | Must name the aggregation explicitly |
| pd.merge(a, b, on="k") | a.join(b, on="k") | Distributed join; watch for skew |
| pd.concat([a, b]) | a.union(b) / a.unionByName(b) | union is positional; unionByName matches by column name |
| df.apply(func) | df.withColumn(..., udf(...)) | UDFs are slow; avoid when possible |
| df.fillna(0) | df.fillna(0) / F.coalesce() | SQL null semantics |
| df.isna() | F.col("x").isNull() | No .isna() method on columns |
| df.drop_duplicates() | df.dropDuplicates() | No keep parameter; use a window instead |
| df.to_csv("out.csv") | df.write.csv("out/") | Writes a directory of part files |
| df.value_counts() | df.groupBy("col").count() | No shorthand |
| df.shift(1) | F.lag("col", 1).over(w) | Requires an explicit window spec |
| len(df) | df.count() | Triggers full computation |
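To close the loop, here is one toy pipeline written in both APIs, exercising several rows of the table (filter, per-group lag, groupBy aggregation). The data and column names are made up:

```python
import pandas as pd
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

raw = pd.DataFrame({"store": ["a", "a", "b"], "day": [1, 2, 1], "sales": [10, 12, 7]})

# Pandas: filter, per-group lag, aggregate — each step runs immediately.
pdf = raw[raw["sales"] > 5].sort_values(["store", "day"]).copy()
pdf["prev_sales"] = pdf.groupby("store")["sales"].shift(1)
totals_pd = pdf.groupby("store", as_index=False)["sales"].sum()

# PySpark: the same steps, expressed lazily.
w = Window.partitionBy("store").orderBy("day")
sdf = (
    spark.createDataFrame(raw)
    .filter(F.col("sales") > 5)
    .withColumn("prev_sales", F.lag("sales", 1).over(w))
)
totals_spark = sdf.groupBy("store").agg(F.sum("sales").alias("total_sales"))
totals_spark.show()  # nothing executes until an action like this
```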