Section 12

I/O & File Formats

Pandas reads one file at a time into one machine's memory. Spark reads directories of files distributed across executors. The file format choice determines whether Spark can use predicate pushdown (skipping data it doesn't need) and column pruning (reading only requested columns).
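
As a quick look at both optimizations against a columnar source (a sketch only; the dataset path and column names below are illustrative, not from this guide, and the exact plan text varies by Spark version):

from pyspark.sql import functions as F

# Hypothetical events dataset stored as Parquet.
events = spark.read.parquet("s3://bucket/events/")

# Column pruning: only the selected columns are read from disk.
# Predicate pushdown: the filter is pushed into the Parquet scan, so row groups
# whose min/max statistics cannot match are skipped.
recent = (
    events
    .select("user_id", "amount", "event_ts")
    .filter(F.col("event_ts") >= "2024-01-01")
)

recent.explain()  # the scan node reports PushedFilters and the pruned ReadSchema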

Format Comparison

Format    Columnar    Predicate Pushdown    Splittable    Use Case
Parquet   Yes         Yes                   Yes           Default for everything
Delta     Yes         Yes                   Yes           ACID, time travel, merge
ORC       Yes         Yes                   Yes           Hive ecosystems
CSV       No          No                    Yes*          Ingestion only, never intermediate
JSON      No          No                    Yes*          Semi-structured ingestion
Avro      No (row)    No                    Yes           Schema evolution, Kafka

* Splittable only when uncompressed or compressed with a splittable codec.

# Reading
df = spark.read.parquet("s3://bucket/data/")           # reads entire directory
df = spark.read.parquet("s3://bucket/data/year=2024/")  # partition pruning

# Writing
df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://bucket/output/")

# Delta (Databricks / open-source delta-spark)
df.write.format("delta").mode("overwrite") \
    .saveAsTable("catalog.schema.table")

Use Parquet or Delta. Always.

Using CSV or JSON as an intermediate format wastes compute and storage and blocks the optimizations above: no predicate pushdown, no column pruning, and the schema has to be inferred or declared on every read. Read CSV once, write Parquet, and never look back. Delta adds ACID transactions, schema enforcement, and time travel on top of Parquet.
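
A minimal sketch of that one-time conversion (the paths and CSV options here are illustrative assumptions, not from the original text):

# Read the raw CSV once...
raw = (
    spark.read
    .option("header", "true")        # first row holds column names
    .option("inferSchema", "true")   # or supply an explicit schema in production
    .csv("s3://bucket/raw/orders/")
)

# ...write Parquet once, and point every downstream job at the Parquet copy.
raw.write.mode("overwrite").parquet("s3://bucket/curated/orders/")
orders = spark.read.parquet("s3://bucket/curated/orders/")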

Section 13

Partitioning & Bucketing

Partitioning is the one Spark concept with no Pandas equivalent. In Pandas, your data is a single contiguous block in one process's memory. In Spark, it is split into partitions — chunks that live on different machines and are processed in parallel. How the data is partitioned determines parallelism, shuffle cost, and output file layout, which is to say most of your job's performance.

Disk Partitioning (Hive-style)

When you write .partitionBy("year", "month"), Spark creates a directory structure: year=2024/month=01/. On read, filters on these columns skip entire directories — no data scanned. This is partition pruning.

# Write with partitioning
df.write.partitionBy("region", "date").parquet("output/")

from pyspark.sql import functions as F

# This read scans only the region=US directories (partition pruning)
spark.read.parquet("output/").filter(F.col("region") == "US")

In-Memory Partitions

# Check current partition count
df.rdd.getNumPartitions()

# Repartition — triggers a full shuffle
df = df.repartition(200)                      # round-robin into 200 even partitions
df = df.repartition(200, "store_id")          # hash by store_id into 200 partitions

# Coalesce — reduces partitions WITHOUT shuffle (merges only)
df = df.coalesce(10)                          # merge down to 10 partitions

Repartition vs Coalesce

repartition(n) shuffles all data across the cluster — expensive, but it produces evenly sized partitions. coalesce(n) only reduces the partition count by merging existing partitions without a shuffle — cheap, but it can produce uneven sizes. Use coalesce before writing to reduce the output file count, as in the sketch below. Use repartition when you need even distribution or to partition by a key.
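
For example, to cap the number of output files instead of writing one file per partition (a sketch; 16 is an arbitrary target, tune it to your data volume):

# A 200-partition DataFrame would otherwise write ~200 small files.
# Coalescing just before the write reduces the file count without a shuffle.
df.coalesce(16).write.mode("overwrite").parquet("s3://bucket/output/")

# If you also need even sizes or a layout keyed by a column, pay for the shuffle
# with repartition(16, "store_id") before the write instead.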

Bucketing

Bucketing pre-hashes rows into a fixed number of buckets by a column when the table is written, optionally sorting within each bucket. Subsequent joins or aggregations on that column can then skip the shuffle entirely, provided both sides are bucketed on the same column with the same bucket count.

# Write bucketed table (persisted in metastore)
df.write.bucketBy(256, "store_id") \
    .sortBy("store_id") \
    .saveAsTable("bucketed_sales")

# Joins on store_id now avoid shuffle
sales = spark.table("bucketed_sales")
stores = spark.table("bucketed_stores")  # also bucketed by store_id, same count
result = sales.join(stores, "store_id")    # no shuffle
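
For the shuffle to actually disappear, the stores table has to be written the same way. A sketch (stores_df is a hypothetical DataFrame; bucketBy only works with saveAsTable, since the bucketing metadata lives in the metastore):

# Same bucketing column and same bucket count as bucketed_sales.
stores_df.write.bucketBy(256, "store_id") \
    .sortBy("store_id") \
    .saveAsTable("bucketed_stores")

# Verify: the join's physical plan should contain no Exchange (shuffle) nodes.
result.explain()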

Section 14

Caching & Persistence

In Pandas, your DataFrame lives in memory until you delete it. In Spark, DataFrames are recomputed from the DAG every time an action is called — unless you explicitly cache them.

# Cache — stores in executor memory, spilling to disk if it doesn't fit
df.cache()         # shorthand for df.persist() with the default storage level

# Persist with specific storage level
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)      # serialized, spills to disk
df.persist(StorageLevel.DISK_ONLY)            # disk only, frees memory

# Trigger materialization and cache
df.cache()
df.count()  # action forces the cache to fill

# Release
df.unpersist()

When to Cache

Situation                                     Cache?   Why
DataFrame used in multiple actions            Yes      Avoids recomputing the DAG
After expensive join or aggregation           Yes      Prevents re-shuffling
Inside iterative algorithms (ML)              Yes      Each iteration would re-scan source
Used once then discarded                      No       Cache overhead exceeds recompute cost
Dataset is very large, cluster memory tight   Maybe    Use DISK_ONLY or skip — eviction thrashing is worse

Cache is Lazy Too

df.cache() doesn't compute anything; it only marks the DataFrame for caching, and the cache fills on the next action. If you cache and then immediately write, that first write both computes the result and populates the cache; every later action reads from the cache. A common pattern is df.cache().count() to force materialization up front.
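
Putting the pattern together (a sketch; orders and stores are illustrative DataFrames, not defined in this guide):

from pyspark.sql import functions as F

# Expensive lineage (join + aggregation) reused by several actions below.
daily = (
    orders.join(stores, "store_id")
          .groupBy("store_id", "order_date")
          .agg(F.sum("amount").alias("revenue"))
)

daily.cache()
daily.count()                                  # action: fills the cache

daily.filter(F.col("revenue") > 10000).show()  # served from the cache
daily.write.mode("overwrite").parquet("s3://bucket/daily/")   # also served from the cache

daily.unpersist()                              # release executor memory when done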