// quick reference
Code Cheatsheet
Copy-paste patterns for the most common data engineering tasks — SQL, PySpark, dbt, Airflow, and more.
dbt
Incremental model (SQL)
{{ config(
    materialized='incremental',
    unique_key='order_id'
) }}
SELECT order_id, customer_id, total_amount
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
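If late-arriving updates are a concern, a common variant is to re-scan a short lookback window rather than only strictly newer rows. A sketch of that variant (the 3-day lookback is an arbitrary assumption, and interval syntax varies by warehouse):

{% if is_incremental() %}
WHERE updated_at > (
    SELECT MAX(updated_at) - INTERVAL '3 days' FROM {{ this }}
)
{% endif %}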
Schema test (schema.yml)
models:
  - name: fact_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered']
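Generic tests cover the common cases; for one-off business rules, dbt also supports singular tests: plain SQL files in the tests/ directory that fail if they return any rows. A minimal sketch (the file name and the non-negativity rule are illustrative, not from the original):

-- tests/assert_no_negative_totals.sql
SELECT order_id, total_amount
FROM {{ ref('fact_orders') }}
WHERE total_amount < 0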
PySpark
Read JSON → clean → write Parquet (Python)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
spark = SparkSession.builder.appName("etl").getOrCreate()
df = spark.read.json("s3://raw/orders/")
df_clean = (df
    .filter(col("status") != "cancelled")
    .withColumn("order_date", to_date(col("ts")))
    .dropDuplicates(["order_id"]))
df_clean.write \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .parquet("s3://silver/orders/")
Fix data skew with broadcast join (Python)
from pyspark.sql.functions import broadcast

# Small table < 200MB → broadcast to all nodes
result = large_df.join(broadcast(small_dim_df), "customer_id")
SQL Patterns
Deduplicate — keep latest row per key (SQL)
WITH ranked AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY updated_at DESC
        ) AS rn
    FROM raw_customers
)
SELECT * FROM ranked WHERE rn = 1;
SCD Type 2 MERGE (SQL)
MERGE INTO dim_customer AS tgt
USING stg_customer AS src
    ON tgt.customer_id = src.customer_id
   AND tgt.is_current = TRUE
WHEN MATCHED AND tgt.city != src.city THEN
    UPDATE SET is_current = FALSE, valid_to = CURRENT_DATE
WHEN NOT MATCHED THEN
    INSERT (customer_id, city, valid_from, valid_to, is_current)
    VALUES (src.customer_id, src.city, CURRENT_DATE, '9999-12-31', TRUE);
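A single MERGE like this can expire the old row and insert brand-new customers, but it cannot also insert the new version of a changed customer in the same pass, so a follow-up insert is usually run for keys that no longer have a current row. A sketch against the same tables:

INSERT INTO dim_customer (customer_id, city, valid_from, valid_to, is_current)
SELECT src.customer_id, src.city, CURRENT_DATE, '9999-12-31', TRUE
FROM stg_customer AS src
WHERE NOT EXISTS (
    SELECT 1
    FROM dim_customer AS tgt
    WHERE tgt.customer_id = src.customer_id
      AND tgt.is_current = TRUE
);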
Running total & lag (SQL)
SELECT
    order_date,
    revenue,
    SUM(revenue) OVER (ORDER BY order_date) AS cumulative_revenue,
    LAG(revenue) OVER (ORDER BY order_date) AS prev_day_revenue,
    revenue - LAG(revenue) OVER (ORDER BY order_date) AS day_over_day
FROM daily_revenue;
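The running total above relies on the default window frame; spelling the frame out is how you get a moving average instead. A sketch (7 trailing rows, which is a 7-day window only if daily_revenue has one row per day):

SELECT
    order_date,
    AVG(revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS revenue_7d_avg
FROM daily_revenue;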
Apache Airflow
Basic DAG skeleton (Python)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# extract_fn / transform_fn / load_fn are placeholders; replace with real callables
def extract_fn(): ...
def transform_fn(): ...
def load_fn(): ...

with DAG(
    dag_id="orders_etl",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
) as dag:
    extract = PythonOperator(
        task_id="extract", python_callable=extract_fn)
    transform = PythonOperator(
        task_id="transform", python_callable=transform_fn)
    load = PythonOperator(
        task_id="load", python_callable=load_fn)

    extract >> transform >> load
Debezium CDC
Change event structure (JSON)
{
"op": "u",
"before": { "id": 1, "status": "pending" },
"after": { "id": 1, "status": "shipped" },
"source": {
"table": "orders",
"ts_ms": 1704067200000,
"db": "myapp"
}
}
// op: c=create, u=update, d=delete, r=read (snapshot)
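Once these events are landed in a staging table, one way to apply them to the warehouse copy is a MERGE that deletes on d and upserts everything else. A sketch assuming a hypothetical cdc_orders staging table with op, id, and status columns already extracted from the payload and one latest event per id; exact MERGE syntax varies by warehouse:

MERGE INTO orders AS tgt
USING cdc_orders AS src
    ON tgt.id = src.id
WHEN MATCHED AND src.op = 'd' THEN
    DELETE
WHEN MATCHED THEN
    UPDATE SET status = src.status
WHEN NOT MATCHED AND src.op != 'd' THEN
    INSERT (id, status) VALUES (src.id, src.status);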
Idempotent Load Pattern
Safe re-runnable daily load (SQL)
-- Delete the partition being processed first
DELETE FROM fact_orders
WHERE order_date = '{{ ds }}';
-- Then re-insert cleanly
INSERT INTO fact_orders
SELECT
    order_id,
    customer_id,
    order_date,
    total_amount
FROM stg_orders
WHERE order_date = '{{ ds }}';
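If the warehouse supports multi-statement transactions (an assumption; Snowflake and Postgres do, many lake engines do not), wrapping both statements keeps readers from seeing the partition empty between the DELETE and the INSERT:

BEGIN;
DELETE FROM fact_orders WHERE order_date = '{{ ds }}';
INSERT INTO fact_orders
SELECT order_id, customer_id, order_date, total_amount
FROM stg_orders
WHERE order_date = '{{ ds }}';
COMMIT;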