Window Functions
Window functions exist in Pandas but are less central. In Spark, they replace most of what you'd use .shift(), .rank(), .rolling(), and .cumsum() for — and they work distributed.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define window spec: one window per store, ordered by time
w = Window.partitionBy("store_id").orderBy("timestamp")

df = df.withColumns({
    # Row number within each store (1, 2, 3, ...)
    "rank": F.row_number().over(w),
    # Previous row's value (shift equivalent)
    "prev_revenue": F.lag("revenue", 1).over(w),
    # Running sum from the start of the partition to the current row
    "cumulative_rev": F.sum("revenue").over(w.rowsBetween(Window.unboundedPreceding, 0)),
    # Rolling average over the current row and the 6 preceding rows (last 7 rows)
    "rolling_avg": F.avg("revenue").over(w.rowsBetween(-6, 0)),
})
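For reference, a rough Pandas version of the same four columns (a sketch, assuming a Pandas DataFrame pdf with the same store_id, timestamp, and revenue columns):

# Sort so that per-group positional operations line up with the Spark orderBy
pdf = pdf.sort_values(["store_id", "timestamp"])

pdf["rank"] = pdf.groupby("store_id").cumcount() + 1
pdf["prev_revenue"] = pdf.groupby("store_id")["revenue"].shift(1)
pdf["cumulative_rev"] = pdf.groupby("store_id")["revenue"].cumsum()
# min_periods=1 mirrors Spark's rowsBetween(-6, 0), which averages
# whatever rows are available at the start of each partition
pdf["rolling_avg"] = (
    pdf.groupby("store_id")["revenue"]
    .rolling(7, min_periods=1)
    .mean()
    .reset_index(level=0, drop=True)
)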
Pandas vs Spark Window Comparison
| Operation | Pandas | PySpark |
|---|---|---|
| Rank | df.groupby("g")["v"].rank() | F.rank().over(w) |
| Shift / Lag | df.groupby("g")["v"].shift(1) | F.lag("v", 1).over(w) |
| Cumulative sum | df.groupby("g")["v"].cumsum() | F.sum("v").over(w.rowsBetween(...)) |
| Rolling mean | df.groupby("g")["v"].rolling(7).mean() | F.avg("v").over(w.rowsBetween(-6, 0)) |
| Pct of group | df["v"] / df.groupby("g")["v"].transform("sum") | F.col("v") / F.sum("v").over(Window.partitionBy("g")) |
Without an explicit frame, aggregate window functions default to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW when an orderBy is present. This means F.sum("v").over(w) is a running sum, not a group sum. Use Window.partitionBy("g") without orderBy for full-partition aggregates, or specify rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) explicitly.
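A minimal sketch of the difference, assuming illustrative columns g, ts, and v:

# Ordered window: the default frame makes aggregates cumulative
w_running = Window.partitionBy("g").orderBy("ts")
# Partition-only window: aggregates cover the whole group
w_full = Window.partitionBy("g")

df = df.withColumns({
    "running_sum": F.sum("v").over(w_running),             # cumulative within each group
    "group_sum": F.sum("v").over(w_full),                  # one total, repeated on every row of the group
    "pct_of_group": F.col("v") / F.sum("v").over(w_full),  # share of the group total
})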
UDFs & When to Avoid Them
In Pandas, .apply() is natural — it runs your Python function row by row. In Spark, UDFs exist but are a performance trap. Every UDF call serializes data from the JVM to a Python worker, runs your function, and serializes the result back. This kills Catalyst optimizations and Tungsten code generation.
The Cost Hierarchy
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# BAD — row-level UDF (serializes every row to Python)
@udf(returnType=DoubleType())
def slow_normalize(value, mean, std):
    return float((value - mean) / std)

# GOOD — built-in expression (runs as JVM bytecode)
df = df.withColumn(
    "normalized",
    (F.col("value") - F.col("mean")) / F.col("std"),
)
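To see the penalty in the query plan, a quick check (exact node names vary by Spark version; recent versions typically show a BatchEvalPython node for row UDFs and ArrowEvalPython for Pandas UDFs):

# Using the row UDF forces a round trip through a Python worker
df_slow = df.withColumn("normalized_slow", slow_normalize("value", "mean", "std"))

# The physical plan exposes the Python round trip; the built-in expression
# version stays inside ordinary Project / WholeStageCodegen stages
df_slow.explain()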
Pandas UDFs (Vectorized)
When you genuinely need Python logic, use Pandas UDFs. They operate on batches (Arrow-based columnar transfer) instead of rows. Roughly 10-100x faster than row UDFs.
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def vectorized_zscore(value: pd.Series, mean: pd.Series, std: pd.Series) -> pd.Series:
    return (value - mean) / std

df = df.withColumn(
    "zscore",
    vectorized_zscore(F.col("value"), F.col("mean"), F.col("std")),
)
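If the mean and std columns don't already exist, one way to produce them is with built-in window aggregates before calling the Pandas UDF (a sketch reusing the store_id and value names from earlier; in this toy case the built-ins alone could compute the z-score, which is exactly the point of the cost hierarchy):

# Per-store mean and stddev via built-in window aggregates (no Python round trip)
w_store = Window.partitionBy("store_id")

df = (
    df.withColumn("mean", F.avg("value").over(w_store))
      .withColumn("std", F.stddev("value").over(w_store))
      .withColumn("zscore", vectorized_zscore(F.col("value"), F.col("mean"), F.col("std")))
)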
GroupBy Apply (applyInPandas)
The Spark equivalent of df.groupby().apply(). Each group is converted to a Pandas DataFrame, your function runs, and the result is reassembled into a Spark DataFrame.
def fit_per_store(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is a regular Pandas DataFrame for one group
    # Fit a model, add predictions, whatever you need
    pdf["prediction"] = some_sklearn_model.predict(pdf[["feature1", "feature2"]])
    return pdf

# output_schema must describe the columns of the DataFrame the function returns
result = df.groupBy("store_id").applyInPandas(fit_per_store, schema=output_schema)
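A fuller sketch with hypothetical names filled in (feature columns feature1 and feature2, a label column y, and a DDL-string output schema), fitting one scikit-learn model per store:

from sklearn.linear_model import LinearRegression

# Columns applyInPandas will produce, as a DDL string (types are assumptions)
output_schema = "store_id string, feature1 double, feature2 double, prediction double"

def fit_per_store(pdf: pd.DataFrame) -> pd.DataFrame:
    # One store's rows arrive as an ordinary Pandas DataFrame
    model = LinearRegression().fit(pdf[["feature1", "feature2"]], pdf["y"])
    pdf["prediction"] = model.predict(pdf[["feature1", "feature2"]])
    # Return only the columns declared in output_schema
    return pdf[["store_id", "feature1", "feature2", "prediction"]]

result = df.groupBy("store_id").applyInPandas(fit_per_store, schema=output_schema)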
Can you express it with F.* functions? Do that. Need vectorized math? Pandas UDF. Need full Pandas per group (sklearn, scipy)? applyInPandas. Need row-level Python with side effects? Row UDF — and question why.