A typical postmodern painting, AI generated
The post-modern data stack

In this post, we walk through building a basic ETL (Extract-Transform-Load) pipeline. It is a toy example, intentionally simplified, but it helped us explore how three modern Python tools work together.

The stack we used: dlt (data load tool), SQLMesh, and DuckDB.

A note before we begin: I’m familiar with DuckDB and SQLGlot, the SQL parser that SQLMesh uses under the hood, but I’m a newcomer to both dlt and SQLMesh, so take this as a learning notebook.

Outline

Here is a basic diagram showing the pipeline architecture:

┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│    EXTRACT      │      │    TRANSFORM     │      │      LOAD       │
│                 │      │                  │      │                 │
│  dlt pipeline   │──▶───│  SQLMesh Models  │──▶───│  dlt pipeline   │
│  (yfinance      │      │  (DuckDB Engine) │      │  (DuckDB →      │
│   → DuckDB)     │      │                  │      │   SQL Server)   │
│                 │      │  raw → marts     │      │                 │
└─────────────────┘      └──────────────────┘      └─────────────────┘

About the Tools

Here’s a brief overview of each tool.

dlt (data load tool)

dlt is an open-source Python library designed to simplify data loading. What we found particularly useful is how it automates schema inference, data normalization, and incremental loading. Some key features:

  • Declarative pipelines: You define sources as Python generators using the @dlt.resource decorator
  • Schema evolution: It automatically handles schema changes as your data evolves
  • Multiple connectors: Works with DuckDB, SQL Server, BigQuery, Snowflake, and many others
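
Incremental loading deserves a quick sketch, since the example later in this post simply replaces its table on each run. A minimal, hypothetical resource (the table name and the fetch_rows_since helper are placeholders):

import dlt

# A sketch of dlt's incremental loading: dlt.sources.incremental tracks the highest
# "date" value seen so far and hands it back on the next run, so only newer rows
# are fetched. State is persisted by the pipeline between runs.
@dlt.resource(table_name="prices_incremental", write_disposition="append")
def daily_prices(cursor=dlt.sources.incremental("date", initial_value="2020-01-01")):
    # fetch_rows_since is a hypothetical stand-in for whatever API call returns new rows
    def fetch_rows_since(since: str):
        return [{"ticker": "AAPL", "date": "2024-01-02", "close_price": 123.45}]

    for row in fetch_rows_since(cursor.last_value):
        yield row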

SQLMesh

SQLMesh is a data transformation framework that brings software engineering practices to SQL development. We appreciated how it helped us think about transformations more systematically:

  • Version control: Track changes to your SQL models over time
  • Column-level lineage: See exactly which source columns affect which outputs, helpful for debugging
  • Virtual Data Environments: Test changes without duplicating data

DuckDB

DuckDB is an embedded analytical database optimized for OLAP (Online Analytical Processing) workloads, i.e. analytical queries over large datasets rather than transactional operations:

  • In-process: No separate server to install or manage
  • Fast: Uses columnar storage with vectorized execution
  • Handles relatively large data: Out-of-core processing lets it work with datasets larger than available RAM
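
A minimal illustration of the in-process model, unrelated to the pipeline itself:

import duckdb

# In-process: connect() gives an in-memory database, no server to install or start.
con = duckdb.connect()
con.execute("CREATE TABLE t AS SELECT range AS i FROM range(1000000)")
print(con.execute("SELECT count(*) AS n, avg(i) AS mean_i FROM t").fetchall())
# Resource limits are plain settings; with a file-backed database DuckDB can also
# spill intermediate results to disk, which is what enables larger-than-RAM workloads.
con.execute("SET memory_limit = '1GB'")
con.close()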

Python Requirements

duckdb
sqlmesh
polars
pandas  # for the Yahoo Finance data resource
yfinance
pyodbc
sqlalchemy
dlt[duckdb,mssql]
duckdb-engine

Setup & Configuration

Let’s start by importing the necessary libraries and defining the configuration. We are using stock market data as the example dataset; it’s freely available via Yahoo Finance and has enough complexity to demonstrate the transformation capabilities.

import json
import os
import subprocess
import warnings
from datetime import date

import dlt
import duckdb
import pandas as pd
import polars as pl
import yfinance as yf
from sqlglot import lineage
from sqlmesh import Context

warnings.filterwarnings("ignore", category=DeprecationWarning)

# Configuration
DUCKDB_FILE = "./financial_etl_dlt.duckdb"
SQLMESH_PROJECT_DIR = "./dlt_sqlmesh_project"
CREDENTIALS_FILE = "./credentials.json"

# Stock configuration
TICKERS = ["AAPL", "MSFT", "GOOGL", "AMZN", "META", "NVDA", "TSLA"]
START_DATE = "2020-01-01"
END_DATE = date.today().isoformat()

# SQL Server target configuration
TARGET_CONN_KEY = "ms_target_01"  # key in credentials JSON file
TARGET_SCHEMA = "dbo"
TARGET_TABLE = "Dim_Stock_Metrics"

print("Configuration loaded successfully!")
print(f"  - DuckDB file: {DUCKDB_FILE}")
print(f"  - SQLMesh project: {SQLMESH_PROJECT_DIR}")
print(f"  - Tickers: {TICKERS}")
print(f"  - Date range: {START_DATE} to {END_DATE}")
Configuration loaded successfully!
  - DuckDB file: ./financial_etl_dlt.duckdb
  - SQLMesh project: ./dlt_sqlmesh_project
  - Tickers: ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META', 'NVDA', 'TSLA']
  - Date range: 2020-01-01 to 2025-11-27

Extract: dlt + yfinance

With the configuration in place, we can move to the Extract phase. Here, we use dlt to pull stock data from Yahoo Finance and load it directly into DuckDB. The data includes OHLCV values: Open, High, Low, Close (prices), and Volume.

Creating a Custom dlt Resource

dlt uses the @dlt.resource decorator to define data sources. Each resource is a Python generator that yields data rows:

@dlt.resource(table_name="my_table")
def my_source():
    yield {"col1": "value1", "col2": 123}

Key parameters:

  • table_name: Name of the destination table
  • write_disposition: How to handle existing data (replace, append, merge)
  • columns: Optional schema hints for data types
# Define a custom dlt resource for Yahoo Finance data
@dlt.resource(
    table_name="eod_prices_raw",
    write_disposition="replace",
    columns={
        "ticker": {"data_type": "text"},
        "date": {"data_type": "date"},
        "open_price": {"data_type": "double"},
        "high_price": {"data_type": "double"},
        "low_price": {"data_type": "double"},
        "close_price": {"data_type": "double"},
        "volume": {"data_type": "bigint"},
    },
)
def yfinance_eod_prices(tickers: list[str], start_date: str, end_date: str):
    """
    Extract End-of-Day stock prices from Yahoo Finance.

    This is a custom dlt resource that yields OHLCV data for multiple tickers.
    dlt will automatically:
    - Infer the schema from the yielded dictionaries
    - Handle batching and loading to the destination
    - Track load metadata (_dlt_load_id, _dlt_id)
    """
    print(f"Downloading OHLCV data for {len(tickers)} tickers...")

    # Download data for all tickers at once
    data = yf.download(
        tickers, start=start_date, end=end_date, progress=False, auto_adjust=True
    )

    if data.empty:
        raise ValueError("No data returned from Yahoo Finance")

    # Process each ticker
    rows_yielded = 0
    for ticker in tickers:
        # Handle multi-ticker vs single-ticker response format
        if len(tickers) > 1:
            ticker_data = data.xs(ticker, level=1, axis=1)
        else:
            ticker_data = data

        # Yield each row with OHLCV data
        for date_idx, row in ticker_data.iterrows():
            if pd.notna(row["Close"]) and pd.notna(row["Volume"]):
                yield {
                    "ticker": ticker,
                    "date": date_idx.date(),
                    "open_price": float(row["Open"]),
                    "high_price": float(row["High"]),
                    "low_price": float(row["Low"]),
                    "close_price": float(row["Close"]),
                    "volume": int(row["Volume"]),
                }
                rows_yielded += 1

    print(f"Yielded {rows_yielded} rows with OHLCV data")
# Create and run the dlt extract pipeline
print("Running dlt extract pipeline...")
print("=" * 50)

# Create pipeline
extract_pipeline = dlt.pipeline(
    pipeline_name="financial_extract",
    destination=dlt.destinations.duckdb(DUCKDB_FILE),
    dataset_name="raw",
)

# Run the pipeline
load_info = extract_pipeline.run(
    yfinance_eod_prices(tickers=TICKERS, start_date=START_DATE, end_date=END_DATE)
)

print(f"\nLoad completed!")
print(f"  - Pipeline: {extract_pipeline.pipeline_name}")
print(f"  - Destination: {DUCKDB_FILE}")
print(f"  - Dataset/Schema: raw")
print(f"\nLoad info:")
print(load_info)
Running dlt extract pipeline...
==================================================
Downloading OHLCV data for 7 tickers...
Yielded 10395 rows with OHLCV data

Load completed!
  - Pipeline: financial_extract
  - Destination: ./financial_etl_dlt.duckdb
  - Dataset/Schema: raw

Load info:
Pipeline financial_extract load step completed in 0.53 seconds
1 load package(s) were loaded to destination duckdb and into dataset raw
The duckdb destination used duckdb:////home/francois/Workspace/posts/pipeline_duckdb/./financial_etl_dlt.duckdb location to store data
Load package 1764251419.3941712 is LOADED and contains no failed jobs
# Verify the extracted data in DuckDB
print("Verifying extracted OHLCV data in DuckDB...")
print("=" * 50)

with duckdb.connect(DUCKDB_FILE) as con:
    # Check what tables exist
    tables = con.execute(
        """
        SELECT table_schema, table_name 
        FROM information_schema.tables 
        WHERE table_schema = 'raw'
    """
    ).fetchall()
    print(f"Tables in 'raw' schema: {tables}")

    # Query the data with all OHLCV columns
    df_raw = con.execute(
        """
        SELECT ticker, date, open_price, high_price, low_price, close_price, volume
        FROM raw.eod_prices_raw
        ORDER BY ticker, date
        LIMIT 10
    """
    ).pl()

print("\nFirst 10 rows of extracted OHLCV data:")
df_raw
Verifying extracted OHLCV data in DuckDB...
==================================================
Tables in 'raw' schema: [('raw', 'eod_prices_raw'), ('raw', '_dlt_loads'), ('raw', '_dlt_pipeline_state'), ('raw', '_dlt_version')]

First 10 rows of extracted OHLCV data:
ticker   date        open_price  high_price  low_price   close_price  volume
str      date        f64         f64         f64         f64          i64
"AAPL"   2020-01-02  71.476615   72.528597   71.223274   72.468277    135480400
"AAPL"   2020-01-03  71.69616    72.523746   71.53933    71.763718    146322800
"AAPL"   2020-01-06  70.885487   72.374177   70.634554   72.335571    118387200
"AAPL"   2020-01-07  72.34522    72.600975   71.775804   71.995369    108872000
"AAPL"   2020-01-08  71.698574   73.455087   71.698574   73.153488    132079200
"AAPL"   2020-01-09  74.13066    74.900342   73.879735   74.707321    170108400
"AAPL"   2020-01-10  74.941371   75.440821   74.374363   74.876221    140644800
"AAPL"   2020-01-13  75.192313   76.502459   75.074081   76.475914    121532000
"AAPL"   2020-01-14  76.41317    76.623082   75.320175   75.443222    161954400
"AAPL"   2020-01-15  75.242981   76.12365    74.688034   75.119926    121923600

dlt Metadata Tables

One nice aspect of dlt is that it automatically creates metadata tables to track loads:

Table          Purpose
_dlt_loads     Tracks each pipeline run with timestamps and status
_dlt_version   Schema version information

Each data row also gets:

  • _dlt_load_id: Unique identifier for the load batch
  • _dlt_id: Unique identifier for each row
# Explore dlt metadata
print("dlt Load Metadata:")
print("=" * 50)

with duckdb.connect(DUCKDB_FILE) as con:
    # Show load history
    loads = con.execute(
        """
        SELECT load_id, schema_name, status, inserted_at
        FROM raw._dlt_loads
        ORDER BY inserted_at DESC
        LIMIT 5
    """
    ).pl()

print("Recent loads:")
loads
dlt Load Metadata:
==================================================
Recent loads:
load_id                schema_name          status  inserted_at
str                    str                  i64     datetime[μs, Europe/Paris]
"1764251419.3941712"   "financial_extract"  0       2025-11-27 14:50:21.208674 CET
"1764250753.3403895"   "financial_extract"  0       2025-11-27 14:39:14.771508 CET
"1764249716.0756714"   "financial_extract"  0       2025-11-27 14:21:57.290876 CET
"1764249510.5065134"   "financial_extract"  0       2025-11-27 14:18:31.900797 CET
"1764248385.3157668"   "financial_extract"  0       2025-11-27 13:59:46.452635 CET

SQLMesh Project Setup

Now that we have raw data in DuckDB, we need to set up SQLMesh for the transformation step. Before running any transformations, you need to initialize a SQLMesh project. This is typically a one-time setup using the sqlmesh init command:

sqlmesh init duckdb

This creates a project directory with the following structure:

dlt_sqlmesh_project/
├── config.py              # Database connection & project settings
├── external_models.yaml   # Schema definitions for external tables (dlt tables)
├── models/                # SQL transformation models
│   └── marts/
│       └── stock_metrics.sql
├── seeds/                 # Static CSV data (optional)
├── audits/                # Custom audit definitions (optional)
├── macros/                # Reusable SQL macros (optional)
├── tests/                 # Unit tests (optional)
├── logs/                  # Execution logs
└── .cache/                # SQLMesh internal cache

Key Configuration Files

Let’s examine the three essential files that define our SQLMesh project.

1. config.py - Database connection configuration

FILE: dlt_sqlmesh_project/config.py

from sqlmesh.core.config import Config, DuckDBConnectionConfig, ModelDefaultsConfig

config = Config(
    gateways={
        "duckdb_local": {
            "connection": DuckDBConnectionConfig(database="../financial_etl_dlt.duckdb"),
            "state_connection": DuckDBConnectionConfig(database="../financial_etl_dlt.duckdb"),
        }
    },
    default_gateway="duckdb_local",
    model_defaults=ModelDefaultsConfig(dialect="duckdb"),
)

Key settings:

  • connection: Points to the DuckDB file created by dlt
  • state_connection: Where SQLMesh stores its metadata (same file)
  • dialect: SQL dialect for parsing/generating queries

2. external_models.yaml - Schema for tables NOT managed by SQLMesh (i.e., dlt tables)

FILE: dlt_sqlmesh_project/external_models.yaml

- name: raw.eod_prices_raw
  description: Raw OHLCV stock prices loaded by dlt from Yahoo Finance
  columns:
    ticker: text
    date: date
    open_price: double
    high_price: double
    low_price: double
    close_price: double
    volume: bigint
    _dlt_load_id: text
    _dlt_id: text

Purpose:

  • Tells SQLMesh about tables it doesn’t manage (external sources)
  • dlt creates raw.eod_prices_raw; SQLMesh needs to know its schema
  • OHLCV = Open, High, Low, Close, Volume (standard financial data)
  • Includes dlt metadata columns (_dlt_load_id, _dlt_id)
  • Equivalent to sources in dbt

3. models/marts/stock_metrics.sql - The transformation model with technical indicators

FILE: dlt_sqlmesh_project/models/marts/stock_metrics.sql

MODEL (
  name marts.stock_metrics,
  kind INCREMENTAL_BY_TIME_RANGE (time_column trade_date, batch_size 30),
  start '2020-01-01',
  cron '@daily',
  grain [ticker, trade_date],
  audits (...)
);

-- Technical indicators calculated using SQL window functions:

-- 1. SIMPLE MOVING AVERAGES (single column: close_price)
sma_20_day = AVG(close_price) OVER (... ROWS BETWEEN 19 PRECEDING AND CURRENT ROW)
sma_50_day = AVG(close_price) OVER (... ROWS BETWEEN 49 PRECEDING AND CURRENT ROW)

-- 2. BOLLINGER BANDS (single column: close_price)
bollinger_upper = SMA20 + 2 * STDDEV(close_price)
bollinger_lower = SMA20 - 2 * STDDEV(close_price)

-- 3. RSI - Relative Strength Index (single column: close_price)
rsi_14_day = 100 - (100 / (1 + avg_gain / avg_loss))

-- 4. MACD - Moving Average Convergence Divergence (single column: close_price)
macd_line = SMA_12_day - SMA_26_day

-- 5. ATR - Average True Range [MULTI-COLUMN: high, low, close]
true_range = GREATEST(
    high - low,                    -- Intraday range
    ABS(high - prev_close),        -- Gap up
    ABS(low - prev_close)          -- Gap down
)
atr_14_day = AVG(true_range) OVER 14 days

-- 6. DAILY METRICS [MULTI-COLUMN]
daily_return_pct  = (close - prev_close) / prev_close * 100
price_range_pct   = (high - low) / close * 100           -- Uses high, low, close
volume_ratio      = volume / AVG(volume) OVER 20 days

Key features:

  • ATR uses 3 columns (high_price, low_price, close_price)
  • price_range_pct also uses multiple columns
  • WINDOW clauses for efficient computation
  • CTEs for intermediate calculations (gains, losses, true_range)
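
To make the window-function pattern concrete, here is a quick sanity check run directly against the raw dlt table (outside SQLMesh), using the same AVG ... OVER clause as the model and the DUCKDB_FILE defined earlier:

import duckdb

# Compute the 20-day SMA straight from the raw table loaded by dlt;
# the SQLMesh model applies the same pattern per indicator.
with duckdb.connect(DUCKDB_FILE) as con:
    check = con.execute(
        """
        SELECT
            ticker,
            date,
            close_price,
            AVG(close_price) OVER (
                PARTITION BY ticker
                ORDER BY date
                ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
            ) AS sma_20_day
        FROM raw.eod_prices_raw
        WHERE ticker = 'AAPL'
        ORDER BY date DESC
        LIMIT 5
        """
    ).pl()
print(check)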

Transform: Run SQLMesh Pipeline

With the project configured, we can now run SQLMesh to transform the raw data. This is where we found SQLMesh particularly helpful: it handles the complexity of incremental processing and keeps track of what has already been computed.

The sqlmesh plan --auto-apply command will:

  1. Detect the external table (raw.eod_prices_raw) created by dlt
  2. Execute our transformation model (marts.stock_metrics)
  3. Create the output table with technical indicators like SMA (Simple Moving Average, an average of prices over a rolling window, commonly used in financial analysis)
  4. Run data quality audits automatically
# Run SQLMesh to apply transformations
print("Running SQLMesh transformation plan...")
print("=" * 50)

try:
    # Run sqlmesh plan with auto-apply
    result = subprocess.run(
        ["sqlmesh", "plan", "--auto-apply"],
        cwd=SQLMESH_PROJECT_DIR,
        capture_output=True,
        text=True,
        check=True,
    )
    print(result.stdout)
    if result.stderr:
        # Filter out deprecation warnings
        stderr_lines = [
            l
            for l in result.stderr.split("\n")
            if "DeprecationWarning" not in l and l.strip()
        ]
        if stderr_lines:
            print("Info:")
            print("\n".join(stderr_lines))
    print("\nSQLMesh transformation completed successfully!")
except subprocess.CalledProcessError as e:
    print("SQLMesh execution failed!")
    print(f"stdout: {e.stdout}")
    print(f"stderr: {e.stderr}")
    raise
Running SQLMesh transformation plan...
==================================================

Summary of differences from `prod`:

Metadata Updated:
- marts.stock_metrics

  audits (
    NOT_NULL('columns' = (ticker, trade_date, close_price, volume)),
-   UNIQUE_COMBINATION_OF_COLUMNS('columns' = (ticker, trade_date))
+   UNIQUE_COMBINATION_OF_COLUMNS('columns' = (ticker, trade_date)),
+   VALID_RSI_RANGE(),
+   VALID_OHLC_PRICES(),
+   POSITIVE_VOLUME(),
+   VALID_ATR()
  ),

Metadata Updated: marts.stock_metrics

SKIP: No physical layer updates to perform

[ 1/72] marts.stock_metrics   [insert 2020-01-01 - 2020-01-30, audits passed 6] 0.04s
...
[72/72] marts.stock_metrics   [insert 2025-10-31 - 2025-11-26, audits passed 6] 0.03s

Auditing models ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 72/72 • 0:00:02

Model batches executed

SKIP: No model batches to execute

Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00

Virtual layer updated

SQLMesh transformation completed successfully!

A note on the output: The prod environment represents the current “deployed” state. When we run the plan, it compares our model code against prod and shows a diff of what changed, similar to how Git works. In this case, we added new audits, so SQLMesh detected a metadata change but recognized that the underlying transformation logic was unchanged. That’s why it skipped reprocessing the data (No physical layer updates) and only ran the new audits against the existing data.

One concept that helped us understand SQLMesh better is its two-layer architecture:

Physical Layer (Versioned Tables)          Virtual Layer (Views)
┌────────────────────────────────────┐    ┌─────────────────────────┐
│ sqlmesh__marts.marts__stock_metrics│    │ marts.stock_metrics     │
│        __[fingerprint]             │◄───│ (VIEW pointing to       │
└────────────────────────────────────┘    │  physical table)        │
                                          └─────────────────────────┘

Benefits:

  • Instant environment creation: Just create views, no data copy
  • Instant promotion: Swap view pointers, no recomputation
  • Easy rollbacks: Point to previous version
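
To see the two layers in the catalog, a small query against DuckDB’s information_schema works. This is a sketch; the exact physical schema and table names depend on the SQLMesh version and model fingerprints:

import duckdb

# List the versioned physical objects (snapshot schema, named sqlmesh__marts here)
# alongside the virtual layer view marts.stock_metrics.
with duckdb.connect(DUCKDB_FILE) as con:
    objects = con.execute(
        """
        SELECT table_schema, table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = 'marts' OR table_schema LIKE 'sqlmesh__%'
        ORDER BY table_schema, table_name
        """
    ).fetchall()
for schema_name, name, kind in objects:
    print(f"{schema_name}.{name} ({kind})")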

Verify: Query Transformed Data

Before loading the data to its final destination, it’s worth verifying that the transformation worked as expected. Let’s query the transformed data from DuckDB and look at some of the technical indicators.

# Connect to DuckDB and query the transformed data with technical indicators
print("Querying transformed data with technical indicators...")
print("=" * 50)

with duckdb.connect(DUCKDB_FILE) as con:
    # Query the marts table - show key technical indicators
    query = """
    SELECT 
        ticker,
        trade_date,
        ROUND(close_price, 2) AS close,
        ROUND(sma_20_day, 2) AS sma20,
        ROUND(sma_50_day, 2) AS sma50,
        ROUND(rsi_14_day, 1) AS rsi,
        ROUND(macd_line, 2) AS macd,
        ROUND(atr_14_day, 2) AS atr,
        ROUND(bollinger_upper, 2) AS bb_upper,
        ROUND(bollinger_lower, 2) AS bb_lower
    FROM marts.stock_metrics
    WHERE ticker = 'AAPL'
    ORDER BY trade_date DESC
    LIMIT 15
    """
    df_transformed = con.execute(query).pl()

print("Sample AAPL data with technical indicators (most recent 15 rows):")
df_transformed
Querying transformed data with technical indicators...
==================================================
Sample AAPL data with technical indicators (most recent 15 rows):
ticker   trade_date  close   sma20   sma50   rsi   macd  atr   bb_upper  bb_lower
str      date        f64     f64     f64     f64   f64   f64   f64       f64
"AAPL"   2025-11-26  277.55  271.13  271.13  63.0  1.02  5.95  277.91    264.34
"AAPL"   2025-11-25  276.97  270.77  270.77  61.6  0.7   6.14  276.98    264.56
"AAPL"   2025-11-24  275.92  270.41  270.41  60.3  0.33  6.11  275.95    264.86
"AAPL"   2025-11-21  271.49  270.06  270.06  55.1  0.14  5.95  274.98    265.14
"AAPL"   2025-11-20  266.25  269.97  269.97  41.4  0.1   5.73  275.0     264.94
…
"AAPL"   2025-11-12  273.47  270.49  270.49  63.0  0.0   5.46  275.11    265.87
"AAPL"   2025-11-11  275.25  270.12  270.12  73.1  0.0   5.64  274.45    265.79
"AAPL"   2025-11-10  269.43  269.39  269.39  43.6  0.0   5.52  270.72    268.05
"AAPL"   2025-11-07  268.21  269.38  269.38  26.7  0.0   5.4   270.84    267.92
"AAPL"   2025-11-06  269.51  269.61  269.61  39.2  0.0   5.38  270.63    268.6

Lineage

We can also explore column lineage using the SQLMesh and SQLGlot Python APIs:

def get_source_columns(node):
    """Recursively extract source column names from lineage node."""
    sources = []
    for downstream in node.downstream:
        if not downstream.downstream:
            # Leaf node - actual source column
            sources.append(downstream.name)
        else:
            sources.extend(get_source_columns(downstream))
    return sources


# Initialize SQLMesh context
ctx = Context(paths=[SQLMESH_PROJECT_DIR])

# Get the stock_metrics model
model = ctx.get_model("marts.stock_metrics")

print("Column Lineage for marts.stock_metrics")
print("=" * 60)

# Compute actual lineage using SQLGlot
print(f"\n{'Output Column':20} {'Type':10} {'Source Columns'}")
print("-" * 60)

for col_name, col_type in model.columns_to_types.items():
    result = lineage.lineage(col_name, model.query)
    sources = list(dict.fromkeys(get_source_columns(result)))  # dedupe, preserve order
    source_str = ", ".join(sources) if sources else "N/A"
    print(f"{col_name:20} {str(col_type):10} {source_str}")
Column Lineage for marts.stock_metrics
============================================================

Output Column        Type       Source Columns
------------------------------------------------------------
ticker               TEXT       eod_prices_raw.ticker
trade_date           DATE       eod_prices_raw.date
open_price           DOUBLE     eod_prices_raw.open_price
high_price           DOUBLE     eod_prices_raw.high_price
low_price            DOUBLE     eod_prices_raw.low_price
close_price          DOUBLE     eod_prices_raw.close_price
volume               BIGINT     eod_prices_raw.volume
sma_20_day           DOUBLE     eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
sma_50_day           DOUBLE     eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
bollinger_upper      DOUBLE     eod_prices_raw.close_price
bollinger_lower      DOUBLE     eod_prices_raw.close_price
rsi_14_day           DOUBLE     eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
macd_line            DOUBLE     eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
sma_9_day            DOUBLE     eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
atr_14_day           DOUBLE     eod_prices_raw.low_price, eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker, eod_prices_raw.high_price
daily_return_pct     DOUBLE     eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
price_range_pct      DOUBLE     eod_prices_raw.high_price, eod_prices_raw.close_price, eod_prices_raw.low_price
volume_ratio         DOUBLE     eod_prices_raw.volume
volume_sma_20        BIGINT     eod_prices_raw.volume

SQLMesh Audits

One feature we found particularly useful is SQLMesh’s audits: data quality checks that run automatically after model execution. SQLMesh supports two types:

1. Built-in Audits (defined inline in the MODEL):

audits (
  not_null(columns := (ticker, trade_date)),
  unique_combination_of_columns(columns := (ticker, trade_date))
)

2. Custom Audits (defined in audits/ directory):

dlt_sqlmesh_project/
└── audits/
    ├── valid_rsi_range.sql      # RSI must be 0-100
    ├── valid_ohlc_prices.sql    # High >= Low, etc.
    ├── positive_volume.sql      # Volume > 0
    └── valid_atr.sql            # ATR >= 0

Custom audits return rows that violate the condition. If any rows are returned, the audit fails.

Example Custom Audit: valid_ohlc_prices.sql

AUDIT (
  name valid_ohlc_prices,
  dialect duckdb
);

/*
 * OHLC Price Sanity Check (Multi-Column Audit)
 * Validates relationships between Open, High, Low, Close prices:
 *   - High must be >= Low
 *   - High must be >= Open and Close
 *   - Low must be <= Open and Close
 * Returns rows that violate these constraints.
 */

SELECT
  ticker,
  trade_date,
  open_price,
  high_price,
  low_price,
  close_price,
  CASE
    WHEN high_price < low_price THEN 'high < low'
    WHEN high_price < open_price THEN 'high < open'
    WHEN high_price < close_price THEN 'high < close'
    WHEN low_price > open_price THEN 'low > open'
    WHEN low_price > close_price THEN 'low > close'
  END AS violation_reason
FROM @this_model
WHERE high_price < low_price
   OR high_price < open_price
   OR high_price < close_price
   OR low_price > open_price
   OR low_price > close_price

Note: @this_model is a macro that refers to the model being audited.

Let’s run all the audits:

# Run SQLMesh audits to validate data quality
print("Running data quality audits...")
print("=" * 50)

result = subprocess.run(
    ["sqlmesh", "audit"],
    cwd=SQLMESH_PROJECT_DIR,
    capture_output=True,
    text=True,
)

print(result.stdout)
if result.returncode == 0:
    print("\nAll audits passed!")
else:
    print("\nAudit failures detected:")
    print(result.stderr)
Running data quality audits...
==================================================
Found 6 audits.
not_null on model marts.stock_metrics ✅ PASS.
unique_combination_of_columns on model marts.stock_metrics ✅ PASS.
valid_rsi_range on model marts.stock_metrics ✅ PASS.
valid_ohlc_prices on model marts.stock_metrics ✅ PASS.
positive_volume on model marts.stock_metrics ✅ PASS.
valid_atr on model marts.stock_metrics ✅ PASS.

Finished with 0 audit errors and 0 audits skipped.
Done.


All audits passed!

Load: dlt to SQL Server

With our data transformed and validated, we can move to the final Load phase. Here, we use dlt again, this time to export the transformed data from DuckDB to SQL Server. This demonstrates how dlt can work bidirectionally, not just for ingestion but also for exporting data to downstream systems.

dlt sql_database Source

dlt provides a sql_database source that can read from any SQLAlchemy-compatible database, including DuckDB:

from dlt.sources.sql_database import sql_database

source = sql_database(
    credentials="duckdb:///path/to/db.duckdb",
    schema="marts",
    table_names=["stock_metrics"]
)

dlt mssql Destination

dlt supports SQL Server as a destination:

pipeline = dlt.pipeline(
    destination=dlt.destinations.mssql(credentials={...})
)

Prerequisites:

  • Microsoft ODBC Driver 17 or 18 for SQL Server
  • Valid SQL Server credentials
# Load credentials
def load_credentials(creds_file: str, conn_key: str) -> dict:
    """Load SQL Server credentials from JSON file."""
    with open(creds_file, "r") as f:
        creds = json.load(f)
    return creds[conn_key]["info"]


print("Loading SQL Server credentials...")
try:
    creds = load_credentials(CREDENTIALS_FILE, TARGET_CONN_KEY)
    print(f"  Server: {creds['server']}:{creds['port']}")
    print(f"  Database: {creds['database']}")
    print(f"  Target: {TARGET_SCHEMA}.{TARGET_TABLE}")
    creds_loaded = True
except Exception as e:
    print(f"Warning: Could not load credentials - {e}")
    print("\nSQL Server load will be skipped.")
    creds_loaded = False
Loading SQL Server credentials...
  Server: localhost:1433
  Database: FINANCIAL_MART
  Target: dbo.Dim_Stock_Metrics
# Create and run the dlt load pipeline (DuckDB -> SQL Server)
if creds_loaded:
    print("Running dlt load pipeline (DuckDB -> SQL Server)...")
    print("=" * 50)

    try:
        # Import the sql_database source
        from dlt.sources.sql_database import sql_database

        # Create source from DuckDB
        # Read from the marts.stock_metrics table
        duckdb_source = sql_database(
            credentials=f"duckdb:///{DUCKDB_FILE}",
            schema="marts",
            table_names=["stock_metrics"],
        )

        # Create pipeline to SQL Server
        # Note: We need to pass driver options for self-signed certificates
        load_pipeline = dlt.pipeline(
            pipeline_name="financial_load",
            destination=dlt.destinations.mssql(
                credentials={
                    "database": creds["database"],
                    "username": creds["username"],
                    "password": creds["password"],
                    "host": creds["server"],
                    "port": creds["port"],
                    "driver": "ODBC Driver 18 for SQL Server",
                    "query": {"TrustServerCertificate": "yes"},
                }
            ),
            dataset_name=TARGET_SCHEMA,
        )

        # Run the pipeline
        load_info = load_pipeline.run(duckdb_source, write_disposition="replace")

        print(f"\nLoad completed!")
        print(f"  - Source: {DUCKDB_FILE} (marts.stock_metrics)")
        print(f"  - Destination: SQL Server ({creds['database']})")
        print(f"\nLoad info:")
        print(load_info)

    except Exception as e:
        print(f"\nLoad failed: {e}")
        print("\nTroubleshooting tips:")
        print("  1. Verify SQL Server is running and accessible")
        print("  2. Check ODBC Driver 17/18 is installed")
        print("  3. Verify the target database exists")
else:
    print("\nSkipping SQL Server load (no valid credentials).")
    print("The transformed data is available in DuckDB at:")
    print(f"  {DUCKDB_FILE} -> marts.stock_metrics")
Running dlt load pipeline (DuckDB -> SQL Server)...
==================================================


/home/francois/miniconda3/envs/pipeline313/lib/python3.13/site-packages/duckdb_engine/__init__.py:184: DuckDBEngineWarning: duckdb-engine doesn't yet support reflection on indices
  warnings.warn(



Load completed!
  - Source: ./financial_etl_dlt.duckdb (marts.stock_metrics)
  - Destination: SQL Server (FINANCIAL_MART)

Load info:
Pipeline financial_load load step completed in 6.88 seconds
1 load package(s) were loaded to destination mssql and into dataset dbo
The mssql destination used mssql://migadmin:***@localhost:1433/financial_mart location to store data
Load package 1764251428.6438282 is LOADED and contains no failed jobs

We can check that the data is in SQL Server:

SELECT TOP 10 * FROM dbo.stock_metrics ORDER BY trade_date DESC
ticker trade_date close_price volume sma_50_day _dlt_load_id _dlt_id open_price high_price low_price ... bollinger_upper bollinger_lower rsi_14_day macd_line sma_9_day atr_14_day daily_return_pct price_range_pct volume_ratio volume_sma_20
0 TSLA 2025-11-26 426.579987 63299400 425.9326 1764251428.6438282 dp7lDd+JOoBe9g 423.950012 426.940002 416.890015 ... 473.2135 378.6517 43.15 -14.1993 407.6211 21.3321 1.7120 2.3559 0.72 88165984
1 GOOGL 2025-11-26 319.950012 51290800 290.8753 1764251428.6438282 DfvXqKHwDRHjuQ 320.679993 324.500000 316.790009 ... 319.7867 261.9638 69.13 4.6406 298.8444 12.1707 -1.0790 2.4097 1.12 45707632
2 META 2025-11-26 633.609985 15186100 617.5516 1764251428.6438282 j/1Q7cXe0XMQ9Q 637.690002 638.359985 631.630005 ... 652.9907 582.1125 56.86 -8.2399 607.3067 17.8543 -0.4102 1.0622 0.62 24326500
3 MSFT 2025-11-26 485.500000 25697600 497.4609 1764251428.6438282 w+FErTh2jGGz1g 486.309998 488.309998 481.200012 ... 526.8620 468.0599 43.31 -5.6150 486.8763 11.8285 1.7841 1.4645 0.98 26267837
4 AMZN 2025-11-26 229.160004 38435400 236.8532 1764251428.6438282 idQBkg7Rc/3mEw 230.740005 231.750000 228.770004 ... 260.0106 213.6957 36.48 -6.3015 226.1933 6.5457 -0.2221 1.3004 0.70 55171516
5 NVDA 2025-11-26 180.259995 183181100 189.3247 1764251428.6438282 hNEoZkDqEOIfQA 181.630005 182.910004 178.240005 ... 206.1015 172.5480 43.50 -4.4397 182.7556 9.0200 1.3722 2.5907 0.83 221997200
6 AAPL 2025-11-26 277.549988 33413600 271.1275 1764251428.6438282 KXA3vllpset5WQ 276.959991 279.529999 276.630005 ... 277.9141 264.3409 63.02 1.0158 271.5611 5.9532 0.2094 1.0449 0.67 49662732
7 TSLA 2025-11-25 419.399994 71915600 425.8967 1764251428.6438282 h4GeCQBQU+CrHg 414.420013 420.480011 405.950012 ... 474.5472 377.2461 35.78 -12.6092 404.8889 22.9257 0.3878 3.4645 0.80 89547461
8 GOOGL 2025-11-25 323.440002 88632100 289.2600 1764251428.6438282 frSIcFsmz+UMgw 326.209991 328.829987 317.649994 ... 315.2424 263.2776 71.99 3.7683 294.2467 12.1350 1.5255 3.4566 1.95 45397456
9 META 2025-11-25 636.219971 25213000 616.6595 1764251428.6438282 0bqKg0SIKNKX/g 624.000000 637.049988 618.299988 ... 652.2370 581.0819 50.11 -7.5019 604.6711 18.6593 3.7795 2.9471 1.02 24834300

10 rows × 21 columns

Production Deployment: Logging & Return Codes

The pipeline above works well for development, but deploying to production with a scheduler (cron, Airflow, Prefect, etc.) requires a few additional considerations. We found that proper logging and return codes are essential for monitoring and debugging issues.

Why This Matters

Aspect         Development                   Production
Output         print() statements            Structured log files
Exit behavior  Exceptions crash the script   Return codes signal status
Monitoring     Manual inspection             Automated alerts on failures

Logging Setup

In production, replace print() with Python’s logging module (or a third-party library such as loguru):

import logging
import sys

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# Usage
logger.info("Starting ETL pipeline...")
logger.warning("Missing data for ticker: XYZ")
logger.error("Failed to connect to SQL Server")

Log Levels:

  • DEBUG: Detailed diagnostic info (row counts, query times)
  • INFO: General progress updates (“Loaded 2012 rows”)
  • WARNING: Non-critical issues (“Retrying connection…”)
  • ERROR: Failures that stop the pipeline
  • CRITICAL: System-level failures

Return Codes for Schedulers

Schedulers use exit codes to determine success/failure. Use sys.exit() with meaningful codes:

import os
import sys

# logger is configured as in the logging setup above

# Define exit codes
EXIT_SUCCESS = 0
EXIT_GENERAL_ERROR = 1
EXIT_CONFIG_ERROR = 2
EXIT_DATA_QUALITY_ERROR = 3
EXIT_CONNECTION_ERROR = 4

def main():
    try:
        # Load configuration
        if not os.path.exists('credentials.json'):
            logger.error("Missing credentials.json")
            sys.exit(EXIT_CONFIG_ERROR)
        
        # Extract
        logger.info("Extracting data...")
        df = extract_data()
        
        # Validate
        if len(df) == 0:
            logger.error("No data extracted - aborting")
            sys.exit(EXIT_DATA_QUALITY_ERROR)
        
        # Transform
        logger.info("Running SQLMesh transformations...")
        run_sqlmesh()
        
        # Load
        logger.info("Loading to SQL Server...")
        load_to_sql_server()
        
        logger.info("Pipeline completed successfully")
        sys.exit(EXIT_SUCCESS)
        
    except ConnectionError as e:
        logger.error(f"Connection failed: {e}")
        sys.exit(EXIT_CONNECTION_ERROR)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(EXIT_GENERAL_ERROR)

if __name__ == "__main__":
    main()

Sample Scheduler Configuration (cron)

# Run daily at 6 AM, log output, alert on failure
0 6 * * * /usr/bin/python3 /path/to/run_pipeline.py >> /var/log/etl.log 2>&1 || mail -s "ETL Failed" alerts@company.com

Sample Log Output

2024-01-15 06:00:01 - INFO - Starting ETL pipeline...
2024-01-15 06:00:02 - INFO - Extracting data from yfinance...
2024-01-15 06:00:05 - INFO - Extracted 2012 rows for ['AAPL', 'MSFT']
2024-01-15 06:00:06 - INFO - Running SQLMesh transformations...
2024-01-15 06:00:08 - INFO - SQLMesh plan applied successfully
2024-01-15 06:00:09 - INFO - Loading to SQL Server...
2024-01-15 06:00:12 - INFO - Loaded 2012 rows to dbo.Dim_Stock_Metrics
2024-01-15 06:00:12 - INFO - Pipeline completed successfully

Further Engineering Considerations

There are a few more topics worth considering as you move toward production. These go beyond the scope of this example, but we wanted to mention them briefly.

1. Error Handling and Observability

While dlt and SQLMesh provide solid foundations, production-grade pipelines typically need more comprehensive error handling and observability.

  • Centralized Logging: Beyond basic file logging, integrate with centralized logging systems for easier searching, aggregation, and analysis of logs across multiple pipeline runs.
  • Alerting: Configure alerts based on log errors, audit failures, or unexpected data volumes.
  • Monitoring Dashboards: Build dashboards to visualize pipeline health, execution times, data volumes, and data quality metrics over time. This helps in proactive identification of issues and performance bottlenecks.
  • Idempotency and Retries: Design pipelines to be idempotent where possible, allowing safe retries without duplicating data or side effects. dlt’s write_disposition and SQLMesh’s transactional updates aid this. Implement retry mechanisms with exponential backoff for transient errors (e.g., network issues, API rate limits).
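
As a sketch of the retry idea (orchestrators such as Airflow or Prefect provide equivalents out of the box):

import logging
import random
import time

logger = logging.getLogger(__name__)

def with_retries(fn, max_attempts: int = 4, base_delay: float = 2.0):
    """Call fn(), retrying transient errors with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except (ConnectionError, TimeoutError) as exc:  # treat only these as transient
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, 1)
            logger.warning("Attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay)
            time.sleep(delay)

# Hypothetical usage: with_retries(lambda: load_pipeline.run(duckdb_source))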

2. Schema Evolution Strategy

dlt’s automatic schema evolution is a helpful feature, but managing it thoughtfully becomes important when transformations and downstream consumers are involved.

  • Graceful Changes: While dlt handles schema additions, consider how schema changes (e.g., column renaming, type changes) in the raw layer propagate. SQLMesh’s column-level lineage can help identify affected downstream models.
  • Versioned External Models: For significant schema changes in external sources, SQLMesh allows versioning of external model definitions. This lets you gracefully transition dependents without breaking existing queries.
  • Impact Analysis: Before deploying a schema change, use SQLMesh’s plan command to visualize the impact on dependent models. This helps prevent unexpected issues in the transformation layer.
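
As a sketch, the impact analysis can target an isolated environment instead of prod, reusing the subprocess approach from earlier; the environment name my_dev is arbitrary:

import subprocess

# `sqlmesh plan <env>` compares the local models against that environment and lists
# affected downstream models; with --auto-apply it builds the environment as views,
# without touching prod data.
result = subprocess.run(
    ["sqlmesh", "plan", "my_dev", "--auto-apply"],
    cwd=SQLMESH_PROJECT_DIR,
    capture_output=True,
    text=True,
)
print(result.stdout)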

3. Cost/Resource Management

While DuckDB is efficient for local development and moderate data volumes, larger-scale deployments may require thinking about cost and resource tradeoffs.

  • Compute vs. Storage: For cloud environments, understand the trade-offs between compute and storage costs. DuckDB is compute-bound locally; in a cloud data warehouse, query complexity and data scanned directly impact costs.
  • Incremental Processing: SQLMesh’s incremental models are critical for cost optimization. By only processing new or changed data, you significantly reduce compute resources and execution time compared to full table rebuilds.
  • Resource Allocation: Fine-tune resource allocation (CPU, memory) for pipeline execution environments, especially when running on orchestrators like Airflow.
  • Cloud-Native Alternatives: If the DuckDB file grows excessively large or requires distributed processing, consider migrating to cloud-native data warehouses or data lake solutions that offer scalable compute and storage.

4. Testability

Beyond SQLMesh audits, it is recommended to consider a broader testing strategy for production pipelines:

  • Unit Tests for dlt Resources: Write Python unit tests for your custom dlt resources (yfinance_eod_prices in this example). Mock external dependencies (like the yfinance API) to ensure the resource logic works correctly under various conditions; a minimal sketch follows after this list.
  • SQLMesh Unit Tests: SQLMesh supports SQL unit tests for models. These tests define expected outputs for specific input data, ensuring transformation logic is correct and remains so after changes.
  • Integration Tests: Test the full pipeline flow (Extract -> Transform -> Load) in a controlled environment with representative data. This ensures all components work together seamlessly.
  • Virtual Data Environments (VDEs) for Development: Leverage SQLMesh VDEs to create isolated environments for feature development. This allows developers to test changes to models without impacting production data or other developers’ work. Changes can be validated in a VDE before merging to a shared environment.
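
To illustrate the first point, here is a pytest-style sketch that mocks the yfinance call; it assumes the resource lives in a hypothetical module named pipeline:

from datetime import date
from unittest.mock import patch

import pandas as pd

import pipeline  # hypothetical module containing yfinance_eod_prices and its `yf` import

# Mock yf.download so the resource's row shaping and NaN filtering can be tested
# without network access. Iterating a dlt resource evaluates the underlying generator.
def test_yfinance_eod_prices_yields_clean_rows():
    fake = pd.DataFrame(
        {"Open": [10.0], "High": [11.0], "Low": [9.5], "Close": [10.5], "Volume": [1000]},
        index=pd.to_datetime(["2024-01-02"]),
    )
    with patch("pipeline.yf.download", return_value=fake):
        rows = list(pipeline.yfinance_eod_prices(["AAPL"], "2024-01-01", "2024-01-03"))
    assert rows == [{
        "ticker": "AAPL",
        "date": date(2024, 1, 2),
        "open_price": 10.0,
        "high_price": 11.0,
        "low_price": 9.5,
        "close_price": 10.5,
        "volume": 1000,
    }]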

By considering these aspects, you can evolve a pipeline like this from a working example into something more suitable for production use.

References