GroupBy & Aggregations
Same concept, different syntax. Pandas aggregations execute immediately in memory. Spark aggregations only add a step to the query plan; when an action finally runs, they trigger a shuffle, typically the most expensive operation in a distributed job.
# Single aggregation
df.groupby("store")["revenue"].sum()

# Multiple aggregations
df.groupby("store").agg(
    total_rev=("revenue", "sum"),
    avg_price=("price", "mean"),
    num_txns=("id", "count")
)
# Single aggregation
df.groupBy("store").agg(F.sum("revenue"))

# Multiple aggregations
df.groupBy("store").agg(
    F.sum("revenue").alias("total_rev"),
    F.avg("price").alias("avg_price"),
    F.count("id").alias("num_txns")
)
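You can confirm the shuffle before running anything: explain() prints the physical plan that the aggregation has built up so far. A minimal sketch, assuming a SparkSession and a df with store and revenue columns:

# Build the aggregation; nothing executes yet
agg = df.groupBy("store").agg(F.sum("revenue").alias("total_rev"))

# The shuffle shows up in the plan as an Exchange hashpartitioning(store, ...) node
agg.explain()

# Only an action such as show() or a write actually runs the shuffle
agg.show()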
Pivot Tables
df.pivot_table(
    values="revenue",
    index="store",
    columns="quarter",
    aggfunc="sum"
)
df.groupBy("store") \
    .pivot("quarter", ["Q1", "Q2", "Q3", "Q4"]) \
    .agg(F.sum("revenue"))
Without explicit values, .pivot() launches an extra Spark job to discover the distinct values of the pivot column, scanning it in full. Always pass the list of pivot values explicitly when you know them.
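If the pivot values aren't known ahead of time, one option is to collect them yourself in a single small job and pass them in, keeping the extra scan explicit. A rough sketch, assuming the quarter column from the example above:

# One deliberate job to find the distinct pivot values
quarters = [row["quarter"] for row in df.select("quarter").distinct().collect()]

# Pass them explicitly so .pivot() skips its own discovery pass
df.groupBy("store").pivot("quarter", quarters).agg(F.sum("revenue"))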
Null Handling
Pandas uses NaN (a float) for missing numerics and None for objects, which causes type coercion and inconsistent behavior. Spark uses SQL null semantics uniformly across all types. Null propagation is well-defined: most expressions that receive a null input evaluate to null, comparisons included.
df["col"].isna()
df["col"].fillna(0)
df.dropna(subset=["col"])
F.col("col").isNull()
F.coalesce(F.col("col"), F.lit(0))
df.dropna(subset=["col"]) # same API here
Spark distinguishes between null (missing) and NaN (IEEE 754 not-a-number). F.isnan() checks for NaN; .isNull() checks for null. They are not interchangeable. In Spark SQL comparisons, NaN equals NaN and sorts greater than any other numeric value, which differs from both IEEE 754 and Pandas (where np.nan != np.nan).
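A quick sketch of the distinction, assuming a SparkSession named spark (df_nn and its values are illustrative):

# float("nan") stays a NaN; None becomes a SQL null
df_nn = spark.createDataFrame([(1.0,), (float("nan"),), (None,)], ["x"])

df_nn.select(
    "x",
    F.isnan("x").alias("is_nan"),          # true only for the NaN row
    F.col("x").isNull().alias("is_null"),  # true only for the None row
    (F.col("x") + 1).alias("x_plus_1")     # null propagates: null + 1 is null
).show()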
# Fill nulls with a different value per column
df.fillna({"price": 0.0, "name": "unknown"})
# Null-safe equality (== returns null when either side is null)
df.filter(F.col("a").eqNullSafe(F.col("b")))
# Coalesce — first non-null value across columns
df.withColumn("val", F.coalesce("primary", "fallback", F.lit(0)))
Joins
Same concept, different execution. Pandas joins happen in memory on one machine. Spark joins are distributed, and the join strategy the planner picks can change performance by orders of magnitude.
pd.merge(left, right,
         on="id",
         how="left")

# Multi-key
pd.merge(left, right,
         on=["id", "date"],
         how="inner")
left.join(right,
          on="id",
          how="left")

# Multi-key
left.join(right,
          on=["id", "date"],
          how="inner")
Join Strategies
| Strategy | When | Mechanism |
|---|---|---|
| Broadcast Hash Join | One side < 10MB (default threshold) | Small table sent to all executors. No shuffle. |
| Sort-Merge Join | Both sides large | Both sides shuffled and sorted by key. Default for large-large joins. |
| Shuffle Hash Join | One side much smaller, but too big to broadcast | Both sides shuffled by key; hash table built on the smaller side. |
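The broadcast cutoff comes from spark.sql.autoBroadcastJoinThreshold (10MB by default; -1 disables automatic broadcasting). A small sketch of inspecting and raising it, assuming a SparkSession named spark:

# Current threshold, in bytes
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise it to 100MB when your dimension tables are a bit larger than the default allows
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)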
# Force broadcast when you know the right side is small
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), on="key")
# Avoid duplicate columns after join with same-name keys
result = left.join(right, left["id"] == right["id"], "inner") \
    .drop(right["id"])
Watch for join-key skew: if one key has 10M rows while every other key has around 100, the partition holding that key takes roughly 100,000x longer than the rest and stalls the whole stage. Mitigations: salting the key (sketched below), isolating the skewed value into its own join, or setting spark.sql.adaptive.skewJoin.enabled=true (AQE handles this automatically in Spark 3+).
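A minimal sketch of manual salting, assuming large_df is the skewed side, dim_df is the smaller side, and they join on key (the names and salt count are illustrative):

NUM_SALTS = 8  # spread each hot key across this many partitions

# Skewed side: append a random salt to the join key
salted_large = large_df \
    .withColumn("salt", (F.rand() * NUM_SALTS).cast("int")) \
    .withColumn("salted_key", F.concat_ws("_", F.col("key"), F.col("salt")))

# Smaller side: replicate each row once per salt value so every match survives
salts = spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
salted_dim = dim_df.crossJoin(salts) \
    .withColumn("salted_key", F.concat_ws("_", F.col("key"), F.col("salt")))

# Join on the salted key, then drop the helper columns
result = salted_large.join(salted_dim.drop("key", "salt"), on="salted_key", how="inner") \
    .drop("salted_key", "salt")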