PySpark Data Manipulation Cheatsheet
The essentials for getting things done fast in PySpark: quick diagnostics, robust cleaning, and practical transformations. Copy-paste ready and big-data friendly.
1. Diagnostics (Explore & sanity-check at scale)
Structure, size, and a tiny unbiased peek
from pyspark.sql import functions as F
# Reveal column names and dtypes
df.printSchema()
# Total row count
df.selectExpr("count(*) AS rows").show()
# Take a tiny random sample (won't overload the driver)
df.sample(False, 0.001, 42).limit(20).show()
Quick data health checks
from pyspark.sql import functions as F
# Count nulls per column
df.select([
    F.count(F.when(F.col(c).isNull(), 1)).alias(f"{c}_nulls")
    for c in df.columns
]).show()
# Approximate cardinality (NDV) per column
df.select([
    F.approx_count_distinct(c).alias(f"{c}_ndv")
    for c in df.columns
]).show()
Numeric distributions (approximate quantiles)
# Compute approximate min/median/max at scale
numeric_cols = [c for c, t in df.dtypes
                if t in ("int", "bigint", "float", "double") or t.startswith("decimal")]
qs = df.approxQuantile(numeric_cols, [0.0, 0.5, 1.0], 0.01)
print(dict(zip(numeric_cols, qs)))
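If a quick tabular overview is enough, DataFrame.summary() reports min, approximate percentiles, and max in one pass (the column selection below just reuses the numeric list from above):
# Approximate percentile summary for the numeric columns
df.select(numeric_cols).summary("min", "25%", "50%", "75%", "max").show()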
Top frequent categories (detect skew and surprises)
from pyspark.sql import functions as F
categorical_cols = [c for c, t in df.dtypes if t in ("string", "boolean")]
for c in categorical_cols[:5]:
    df.groupBy(c).count().orderBy(F.desc("count")).limit(20).show()
2. Cleaning (Make data usable and consistent)
Categorical sanity checks and null statistics
from pyspark.sql import functions as F
from pyspark.sql import types as T
# Frequency table - useful for detecting data skew
df.groupBy("country").count().orderBy(F.desc("count")).show(20, False)
# Nulls per column
df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()
Handle missing values
# Fill selected columns
df = df.na.fill({"country": "Unknown", "age": 0})
# Fill all string columns
df = df.fillna("N/A")
# Drop rows with null id
df = df.dropna(how="any", subset=["id"])
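For numeric columns, a computed statistic is often better than a constant. A minimal sketch that fills nulls with the column mean, reusing the "age" column from above:
# Sketch: fill numeric nulls with the column mean ("age" reused from above)
mean_age = df.select(F.avg("age")).first()[0]
df = df.na.fill({"age": mean_age})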
Duplicates and de-duplication
# Remove full-row duplicates
df = df.dropDuplicates()
# Keep one (arbitrary) row per id - dropDuplicates gives no ordering guarantee
df = df.dropDuplicates(["id"])
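If you need a specific row per key, say the most recent, the usual pattern is a window with row_number; the "updated_at" timestamp column below is an assumption:
from pyspark.sql.window import Window
# Keep the latest row per id ("updated_at" is an assumed timestamp column)
w = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
df = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")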
Type casting & safe conversions
from pyspark.sql import functions as F
from pyspark.sql import types as T
# Cast type
df = df.withColumn("age", F.col("age").cast(T.IntegerType()))
# Parse date
df = df.withColumn("dt", F.to_date("date_str", "yyyy-MM-dd"))
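to_date returns null when the string does not match the pattern, so it pays to count the rows that silently failed to parse:
# Rows that had a value but failed to parse end up null
df.filter(F.col("dt").isNull() & F.col("date_str").isNotNull()).count()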
Value standardization & replacement
# Normalize values
df = df.replace({"US": "USA", "U.S.": "USA"}, subset=["country"])
# Lowercase strings
df = df.withColumn("gender", F.lower(F.col("gender")))
String cleanup
# Trim spaces
df = df.withColumn("email", F.trim(F.col("email")))
# Keep only digits
df = df.withColumn("phone", F.regexp_replace("phone", r"\D", ""))
# Collapse many spaces to one
df = df.withColumn("name", F.regexp_replace("name", r"\s+", " "))
Outlier and invalid-value filtering
# Drop outliers
df = df.filter((F.col("age") >= 0) & (F.col("age") <= 120))
# Keep only rows with a non-null price
df = df.filter(F.col("price").isNotNull())
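When there is no obvious hard limit, an approximate IQR rule is a common alternative; a sketch using the "price" column from above:
# Sketch: approximate IQR-based outlier filter on price
q1, q3 = df.approxQuantile("price", [0.25, 0.75], 0.01)
iqr = q3 - q1
df = df.filter(F.col("price").between(q1 - 1.5 * iqr, q3 + 1.5 * iqr))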
Column ops: drop, rename
# Drop column
df = df.drop("temp_flag")
# Rename column
df = df.withColumnRenamed("userId", "user_id")
3. Transformation (Shape and enrich your data)
Column creation & renaming
from pyspark.sql import functions as F
# Literal value
df = df.withColumn("source_system", F.lit("SystemA"))
# Derive from existing columns
df = df.withColumn("revenue", F.col("price") * F.col("quantity"))
# Rename a column
df = df.withColumnRenamed("cust_id", "customer_id")
# SQL expression
df = df.withColumn("is_discounted", F.expr("price < original_price"))
Conditional logic
df = df.withColumn(
    "customer_segment",
    F.when(F.col("total_spent") > 5000, "Premium")
     .when(F.col("total_spent") > 1000, "Standard")
     .otherwise("Basic")
)
String transformations
# Concatenate
df = df.withColumn("full_address", F.concat_ws(", ", "street", "city", "zip_code"))
# Regex extract
df = df.withColumn("product_code", F.regexp_extract("product_id", r"([A-Z]+)", 1))
# Split into array
df = df.withColumn("tags_array", F.split(F.col("tags_string"), ","))
Date & time transformations
# Extract year
df = df.withColumn("order_year", F.year("order_date"))
# Days between dates
df = df.withColumn("days_to_ship", F.datediff(F.col("ship_date"), F.col("order_date")))
# Format timestamp for reporting
df = df.withColumn("order_hour", F.date_format("order_timestamp", "HH"))
Structuring & flattening
# Create nested struct
df = df.withColumn("customer_details", F.struct("customer_id", "email"))
# Flatten a struct
df = df.select("order_id", "customer_details.*")
# Explode array into rows
df = df.withColumn("product_item", F.explode("products_array"))
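Note that explode drops rows whose array is null or empty; explode_outer keeps them as a single row with a null item:
# Alternative: keep rows with null/empty arrays
df = df.withColumn("product_item", F.explode_outer("products_array"))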
Aggregations & rollups
# Daily summary
daily_sales = df.groupBy("order_date").agg(F.sum("revenue").alias("total_revenue"))
# Pivot rows into columns
quarterly_sales = df.groupBy("product_category").pivot("order_quarter").sum("revenue")
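Listing the pivot values explicitly spares Spark an extra pass to discover them; the quarter labels below are assumptions:
# Explicit pivot values avoid a discovery scan (quarter labels assumed)
quarterly_sales = df.groupBy("product_category").pivot("order_quarter", ["Q1", "Q2", "Q3", "Q4"]).sum("revenue")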
Window (analytical) functions
from pyspark.sql.window import Window
# Rank within a group
window_spec = Window.partitionBy("category").orderBy(F.desc("sales"))
df = df.withColumn("sales_rank", F.rank().over(window_spec))
# Running total (ordering comes from window_spec, here descending sales)
df = df.withColumn(
    "running_total",
    F.sum("sales").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
)
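The same window spec also works for row-to-row comparisons with lag/lead:
# Value from the previous row within the same window
df = df.withColumn("prev_sales", F.lag("sales", 1).over(window_spec))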
Joining & merging
# Enrich with a lookup table
df_enriched = df.join(df_customers, "customer_id", "left")
# Union by name (aligned columns)
df_all_years = df_2023.unionByName(df_2024)
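If the yearly schemas drifted apart, unionByName can fill the missing columns with nulls (Spark 3.1+):
# Tolerate schema drift between the two years (Spark 3.1+)
df_all_years = df_2023.unionByName(df_2024, allowMissingColumns=True)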