Code Cheatsheet

Copy-paste patterns for the most common data engineering tasks — SQL, PySpark, dbt, Airflow, and more.

dbt

Incremental model (SQL)
{{ config(
  materialized='incremental',
  unique_key='order_id'
) }}

SELECT order_id, customer_id, total_amount
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
  WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Schema test in schema.yml (YAML)
models:
  - name: fact_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: status
        tests:
          - accepted_values:
              values: ['pending','shipped','delivered']

PySpark

Read JSON → clean → write Parquet (Python)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("etl").getOrCreate()

df = spark.read.json("s3://raw/orders/")

df_clean = (df
  .filter(col("status") != "cancelled")
  .withColumn("order_date", to_date(col("ts")))
  .dropDuplicates(["order_id"]))

df_clean.write \
  .mode("overwrite") \
  .partitionBy("order_date") \
  .parquet("s3://silver/orders/")
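
Schema inference on JSON costs Spark an extra pass over the files and silently accepts drift. A minimal sketch of pinning the schema instead (field names assumed from the snippet above; spark is the session created there):
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Explicit schema: skips the inference scan and fails fast on unexpected types
orders_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("ts", StringType(), True),
    StructField("total_amount", DoubleType(), True),
])

df = spark.read.schema(orders_schema).json("s3://raw/orders/")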
Fix data skew with a broadcast join (Python)
from pyspark.sql.functions import broadcast

# Small dimension table (it must fit in memory on every executor,
# typically tens to a few hundred MB) → broadcast to all nodes, avoiding a shuffle
result = large_df.join(
  broadcast(small_dim_df), "customer_id"
)
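
Spark 3's adaptive query execution can also split skewed partitions on its own (spark.sql.adaptive.skewJoin.enabled). When the dimension side is too large to broadcast, salting the join key is the manual fallback; a sketch reusing the large_df/small_dim_df names above, with an assumed bucket count to tune:
from pyspark.sql.functions import array, col, concat_ws, explode, floor, lit, rand

N_BUCKETS = 8  # assumed salt count; raise it for heavier skew

# Large side: scatter each hot key across N_BUCKETS salted keys
large_salted = large_df.withColumn(
    "salted_key",
    concat_ws("_", col("customer_id").cast("string"),
              floor(rand() * N_BUCKETS).cast("string")))

# Small side: replicate every row once per bucket so all salted keys match
small_salted = (small_dim_df
    .withColumn("salt", explode(array(*[lit(i) for i in range(N_BUCKETS)])))
    .withColumn("salted_key",
                concat_ws("_", col("customer_id").cast("string"),
                          col("salt").cast("string")))
    .drop("customer_id", "salt"))

result = large_salted.join(small_salted, "salted_key").drop("salted_key")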

SQL Patterns

Deduplicate — keep the latest row per key (SQL)
WITH ranked AS (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY customer_id
      ORDER BY updated_at DESC
    ) AS rn
  FROM raw_customers
)
SELECT * FROM ranked WHERE rn = 1;
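
Note that dropDuplicates in the PySpark snippet above keeps an arbitrary row per key; the window version of this same pattern keeps the newest. A sketch assuming a DataFrame df with customer_id and updated_at columns:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Same ROW_NUMBER trick: rank rows per key, newest first, keep rank 1
w = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())

latest = (df
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") == 1)
  .drop("rn"))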
SCD Type 2 MERGE (SQL)
MERGE INTO dim_customer AS tgt
USING stg_customer AS src
  ON tgt.customer_id = src.customer_id
  AND tgt.is_current = TRUE
WHEN MATCHED AND tgt.city != src.city THEN
  UPDATE SET is_current = FALSE, valid_to = CURRENT_DATE
WHEN NOT MATCHED THEN
  INSERT (customer_id, city, valid_from, valid_to, is_current)
  VALUES (src.customer_id, src.city, CURRENT_DATE, '9999-12-31', TRUE);

-- The MERGE only expires changed rows and inserts brand-new keys;
-- a second pass inserts the fresh version for the rows just expired
INSERT INTO dim_customer (customer_id, city, valid_from, valid_to, is_current)
SELECT src.customer_id, src.city, CURRENT_DATE, '9999-12-31', TRUE
FROM stg_customer AS src
WHERE NOT EXISTS (
  SELECT 1 FROM dim_customer AS tgt
  WHERE tgt.customer_id = src.customer_id AND tgt.is_current = TRUE
);
Running total & lag (SQL)
SELECT
  order_date,
  revenue,
  SUM(revenue) OVER (ORDER BY order_date) AS cumulative_revenue,
  LAG(revenue) OVER (ORDER BY order_date) AS prev_day_revenue,
  revenue - LAG(revenue) OVER (ORDER BY order_date) AS day_over_day
FROM daily_revenue;
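
The same windows translate almost one-to-one to PySpark; a sketch assuming a daily_revenue DataFrame with order_date and revenue columns (an un-partitioned window pulls every row onto one executor, so reserve this for small aggregate tables):
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, sum as sum_

w = Window.orderBy("order_date")

out = (daily_revenue
  .withColumn("cumulative_revenue",
              sum_("revenue").over(w.rowsBetween(Window.unboundedPreceding, 0)))
  .withColumn("prev_day_revenue", lag("revenue").over(w))
  .withColumn("day_over_day", col("revenue") - lag("revenue").over(w)))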

Apache Airflow

Basic DAG skeleton (Python)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Stub callables so the file parses; swap in real ETL logic
def extract_fn(): ...
def transform_fn(): ...
def load_fn(): ...

with DAG(
  dag_id="orders_etl",
  schedule_interval="@daily",
  start_date=datetime(2024, 1, 1),
  catchup=False,
  default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
  extract = PythonOperator(
    task_id="extract", python_callable=extract_fn)
  transform = PythonOperator(
    task_id="transform", python_callable=transform_fn)
  load = PythonOperator(
    task_id="load", python_callable=load_fn)
  extract >> transform >> load
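
The same pipeline in the TaskFlow API, which Airflow 2 also supports (the schedule argument shown needs Airflow 2.4+); return values hop between tasks via XCom, and the payloads here are toys:
from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id="orders_etl_taskflow", schedule="@daily",
     start_date=datetime(2024, 1, 1), catchup=False)
def orders_etl_taskflow():
  @task
  def extract():
    return [{"order_id": 1}]  # toy payload, shipped via XCom

  @task
  def transform(rows):
    return rows

  @task
  def load(rows):
    print(f"loaded {len(rows)} rows")

  load(transform(extract()))

orders_etl_taskflow()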

Debezium CDC

Change event structure (JSON)
{
  "op": "u",
  "before": { "id": 1, "status": "pending" },
  "after":  { "id": 1, "status": "shipped" },
  "source": {
    "table": "orders",
    "ts_ms": 1704067200000,
    "db": "myapp"
  }
}
op codes: c = create, u = update, d = delete, r = read (initial snapshot)
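
A minimal sketch of applying those op codes to an in-memory sink (no Kafka client; apply_change is a hypothetical helper) shows why consumers need only "after" for upserts and "before" for deletes:
import json

orders = {}  # toy sink, keyed by primary key

def apply_change(event: dict) -> None:
    op = event["op"]
    if op in ("c", "r", "u"):        # create, snapshot read, update → upsert
        row = event["after"]
        orders[row["id"]] = row
    elif op == "d":                  # delete: "after" is null, key is in "before"
        orders.pop(event["before"]["id"], None)

event = json.loads('{"op": "u", "before": {"id": 1, "status": "pending"}, '
                   '"after": {"id": 1, "status": "shipped"}, '
                   '"source": {"table": "orders", "ts_ms": 1704067200000, "db": "myapp"}}')
apply_change(event)
print(orders)  # {1: {'id': 1, 'status': 'shipped'}}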

Idempotent Load Pattern

Safe, re-runnable daily load (SQL)
-- Delete the partition being processed first
DELETE FROM fact_orders
WHERE order_date = '{{ ds }}';

-- Then re-insert cleanly
INSERT INTO fact_orders
SELECT
  order_id,
  customer_id,
  order_date,
  total_amount
FROM stg_orders
WHERE order_date = '{{ ds }}';
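
The DELETE and INSERT must commit together, otherwise a failure between them leaves the partition empty. A sketch of the same pattern through Python's DB-API, with sqlite3 standing in for a warehouse connection and reload_partition as a hypothetical wrapper (table names from the example):
import sqlite3

def reload_partition(conn: sqlite3.Connection, ds: str) -> None:
    """Rebuild one day's partition atomically; safe to re-run any number of times."""
    with conn:  # single transaction: commit on success, rollback on any exception
        conn.execute("DELETE FROM fact_orders WHERE order_date = ?", (ds,))
        conn.execute(
            "INSERT INTO fact_orders "
            "SELECT order_id, customer_id, order_date, total_amount "
            "FROM stg_orders WHERE order_date = ?",
            (ds,))
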
Looking for more? Browse tool-specific guides or interview questions.