PySpark Data Manipulation Cheatsheet

The essentials for getting things done fast in PySpark: quick diagnostics, robust cleaning, and practical transformations. Copy-paste ready, big data friendly.

Bookmark this for future reference

If you want, leave your email so I can notify you about new learning materials for Databricks (Playbooks, Cheatsheets, Hands-on Labs and more!)

1. Diagnostics (Explore & sanity-check at scale)

Structure, size, and a tiny unbiased peek

from pyspark.sql import functions as F

# Reveal column names and dtypes
df.printSchema()

# Total row count
df.selectExpr("count(*) AS rows").show()

# Take a tiny unbiased peek (won't overload driver)
df.sample(False, 0.001, 42).limit(20).show()

Quick data health

from pyspark.sql import functions as F

# Count nulls per column
df.select([
    F.count(F.when(F.col(c).isNull(), 1)).alias(f"{c}_nulls")
    for c in df.columns
]).show()

# Approximate cardinality (NDV) per column
df.select([
    F.approx_count_distinct(c).alias(f"{c}_ndv")
    for c in df.columns
]).show()

Numeric distributions (approximate quantiles)

# Compute approximate min/median/max at scale
numeric_cols = [c for c, t in df.dtypes if t in ("int", "bigint", "double", "float", "decimal")]
qs = df.approxQuantile(numeric_cols, [0.0, 0.5, 1.0], 0.01)
print(dict(zip(numeric_cols, qs)))

Top frequent categories (detect skew and surprises)

from pyspark.sql import functions as F

categorical_cols = [c for c, t in df.dtypes if t in ("string", "boolean")]
for c in categorical_cols[:5]:
    df.groupBy(c).count().orderBy(F.desc("count")).limit(20).show()

2. Cleaning (Make data usable and consistent)

Categorical sanity checks and null statistics

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Frequency table - useful for detecting data skew
df.groupBy("country").count().orderBy(F.desc("count")).show(20, False)

# Nulls per column
df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

Handle missing values

# Fill selected columns
df = df.na.fill({"country": "Unknown", "age": 0})

# Fill all string columns
df = df.fillna("N/A")

# Drop rows with null id
df = df.dropna(how="any", subset=["id"])

Duplicates and de-duplication

# Remove full-row duplicates
df = df.dropDuplicates()

# Keep first row per id
df = df.dropDuplicates(["id"])

Type casting & safe conversions

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Cast type
df = df.withColumn("age", F.col("age").cast(T.IntegerType()))

# Parse date
df = df.withColumn("dt", F.to_date("date_str", "yyyy-MM-dd"))

Value standardization & replacement

# Normalize values
df = df.replace({"US": "USA", "U.S.": "USA"}, subset=["country"])

# Lowercase strings
df = df.withColumn("gender", F.lower(F.col("gender")))

String cleanup

# Trim spaces
df = df.withColumn("email", F.trim(F.col("email")))

# Keep only digits
df = df.withColumn("phone", F.regexp_replace("phone", r"\D", ""))

# Collapse many spaces to one
df = df.withColumn("name", F.regexp_replace("name", r"\s+", " "))

Outlier and invalid filtering

# Drop outliers
df = df.filter((F.col("age") >= 0) & (F.col("age") <= 120))

# Ensure non-null price
df = df.filter(~F.col("price").isNull())

Column ops: drop, rename

# Drop column
df = df.drop("temp_flag")

# Rename column
df = df.withColumnRenamed("userId", "user_id")

3. Transformation (Shape and enrich your data)

Column creation & renaming

from pyspark.sql import functions as F

# Literal value
df = df.withColumn("source_system", F.lit("SystemA"))

# Derive from existing columns
df = df.withColumn("revenue", F.col("price") * F.col("quantity"))

# Rename a column
df = df.withColumnRenamed("cust_id", "customer_id")

# SQL expression
df = df.withColumn("is_discounted", F.expr("price < original_price"))

Conditional logic

df = df.withColumn(
    "customer_segment",
    F.when(F.col("total_spent") > 5000, "Premium")
     .when(F.col("total_spent") > 1000, "Standard")
     .otherwise("Basic")
)

String transformations

# Concatenate
df = df.withColumn("full_address", F.concat_ws(", ", "street", "city", "zip_code"))

# Regex extract
df = df.withColumn("product_code", F.regexp_extract("product_id", r"([A-Z]+)", 1))

# Split into array
df = df.withColumn("tags_array", F.split(F.col("tags_string"), ","))

Date & time transformations

# Extract year
df = df.withColumn("order_year", F.year("order_date"))

# Days between dates
df = df.withColumn("days_to_ship", F.datediff(F.col("ship_date"), F.col("order_date")))

# Format timestamp for reporting
df = df.withColumn("order_hour", F.date_format("order_timestamp", "HH"))

Structuring & flattening

# Create nested struct
df = df.withColumn("customer_details", F.struct("customer_id", "email"))

# Flatten a struct
df = df.select("order_id", "customer_details.*")

# Explode array into rows
df = df.withColumn("product_item", F.explode("products_array"))

Aggregations & rollups

# Daily summary
daily_sales = df.groupBy("order_date").agg(F.sum("revenue").alias("total_revenue"))

# Pivot rows into columns
quarterly_sales = df.groupBy("product_category").pivot("order_quarter").sum("revenue")

Window (analytical) functions

from pyspark.sql.window import Window

# Rank within a group
window_spec = Window.partitionBy("category").orderBy(F.desc("sales"))
df = df.withColumn("sales_rank", F.rank().over(window_spec))

# Running total
df = df.withColumn(
    "running_total",
    F.sum("sales").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
)

Joining & merging

# Enrich with a lookup table
df_enriched = df.join(df_customers, "customer_id", "left")

# Union by name (aligned columns)
df_all_years = df_2023.unionByName(df_2024)
↑ Top