Section 10

Window Functions

Window functions exist in Pandas but are less central. In Spark, they replace most of what you'd use .shift(), .rank(), .cumsum(), and .rolling() for — and they work distributed.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define window spec
w = Window.partitionBy("store_id").orderBy("timestamp")

df = df.withColumns({
    # Rank within each store (no ties; use F.rank() if ties should share a rank)
    "rank": F.row_number().over(w),

    # Previous row's value (shift equivalent)
    "prev_revenue": F.lag("revenue", 1).over(w),

    # Running sum
    "cumulative_rev": F.sum("revenue").over(w.rowsBetween(Window.unboundedPreceding, 0)),

    # Rolling average (last 7 rows)
    "rolling_avg": F.avg("revenue").over(w.rowsBetween(-6, 0)),
})
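
One Pandas pattern with no single Spark function is .pct_change(). A minimal sketch of the equivalent, reusing the prev_revenue column from the snippet above (the pct_change column name is just illustrative):

# Percent change vs. the previous row in the same store (.pct_change() equivalent)
df = df.withColumn(
    "pct_change",
    (F.col("revenue") - F.col("prev_revenue")) / F.col("prev_revenue"),
)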

Pandas vs Spark Window Comparison

Operation      | Pandas                                           | PySpark
Rank           | df.groupby("g")["v"].rank()                      | F.rank().over(w)
Shift / Lag    | df.groupby("g")["v"].shift(1)                    | F.lag("v", 1).over(w)
Cumulative sum | df.groupby("g")["v"].cumsum()                    | F.sum("v").over(w.rowsBetween(...))
Rolling mean   | df["v"].rolling(7).mean()                        | F.avg("v").over(w.rowsBetween(-6, 0))
Pct of group   | df["v"] / df.groupby("g")["v"].transform("sum")  | F.col("v") / F.sum("v").over(w_unbound)

Window Frame Matters

Without an explicit frame, aggregate window functions default to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW when an orderBy is present. This means F.sum("v").over(w) is a running sum, not a group sum. Use Window.partitionBy("g") without orderBy for full-partition aggregates, or specify rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) explicitly.
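
To make the frame rules concrete, here is a minimal sketch of the three variants side by side; the w_running, w_full, and w_unbound names are just illustrative, and w_unbound is the window the comparison table above refers to for pct-of-group:

# Running sum: orderBy with no explicit frame defaults to
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
w_running = Window.partitionBy("store_id").orderBy("timestamp")

# Full-partition aggregate, option 1: no orderBy at all
w_full = Window.partitionBy("store_id")

# Full-partition aggregate, option 2: keep orderBy but spell out the frame
w_unbound = (Window.partitionBy("store_id").orderBy("timestamp")
             .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df = df.withColumns({
    "running_rev": F.sum("revenue").over(w_running),   # grows row by row
    "store_total": F.sum("revenue").over(w_full),      # one value per store, repeated on every row
    "pct_of_store": F.col("revenue") / F.sum("revenue").over(w_unbound),
})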

Section 11

UDFs & When to Avoid Them

In Pandas, .apply() is natural — it runs your Python function row by row. In Spark, UDFs exist but are a performance trap. Every UDF call serializes data from JVM to Python, runs your function, and serializes back. This kills Catalyst optimizations and Tungsten code generation.

The Cost Hierarchy

Best: built-in functions
Acceptable: Pandas UDFs
Last resort: row-level UDFs

# BAD — row-level UDF (serializes every row to Python)
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(returnType=DoubleType())
def slow_normalize(value, mean, std):
    return float((value - mean) / std)

# GOOD — built-in expression (runs as JVM bytecode)
df = df.withColumn("normalized",
    (F.col("value") - F.col("mean")) / F.col("std")
)
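
The same principle covers the conditional and string logic that would be an .apply() in Pandas. A short sketch, assuming hypothetical amount and status columns: F.when chains and the built-in string functions keep the whole expression in the JVM.

# Conditional logic without a UDF
df = df.withColumn(
    "tier",
    F.when(F.col("amount") >= 1000, "gold")
     .when(F.col("amount") >= 100, "silver")
     .otherwise("bronze"),
).withColumn(
    "status_clean",
    F.lower(F.trim(F.col("status"))),  # trim whitespace and lowercase, still JVM bytecode
)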

Pandas UDFs (Vectorized)

When you genuinely need Python logic, use Pandas UDFs. They operate on batches (Arrow-based columnar transfer) instead of rows. Roughly 10-100x faster than row UDFs.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def vectorized_zscore(value: pd.Series, mean: pd.Series, std: pd.Series) -> pd.Series:
    return (value - mean) / std

df = df.withColumn("zscore",
    vectorized_zscore(F.col("value"), F.col("mean"), F.col("std"))
)
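
Pandas UDFs also cover grouped aggregations (they require pyarrow to be installed). A minimal sketch, assuming a revenue column; the Series-to-scalar type hints are what tell Spark to treat it as an aggregate:

@pandas_udf("double")
def median_revenue(revenue: pd.Series) -> float:
    # Receives one group's revenue values as an in-memory Pandas Series
    return float(revenue.median())

per_store = df.groupBy("store_id").agg(median_revenue(F.col("revenue")).alias("median_rev"))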

GroupBy Apply (applyInPandas)

The Spark equivalent of df.groupby().apply(). Each group is converted to a Pandas DataFrame, your function runs, and the result is reassembled into a Spark DataFrame.

def fit_per_store(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is a regular Pandas DataFrame for one group
    # Fit a model, add predictions, whatever you need
    pdf["prediction"] = some_sklearn_model.predict(pdf[["feature1", "feature2"]])
    return pdf

# output_schema declares the returned columns up front,
# e.g. the input schema plus a "prediction double" field
result = df.groupby("store_id").applyInPandas(fit_per_store, schema=output_schema)

Decision Rule

Can you express it with F.* functions? Do that. Need vectorized math? Pandas UDF. Need full Pandas per group (sklearn, scipy)? applyInPandas. Need row-level Python with side effects? Row UDF — and question why.