An Example ETL Pipeline with dlt + SQLMesh + DuckDB
In this post, we walk through building a basic ETL (Extract-Transform-Load) pipeline. It is a toy example, intentionally simplified, but it helped us explore how three modern Python tools work together.
The stack we used: dlt (data load tool), SQLMesh, and DuckDB.
A note before we begin: I’m familiar with DuckDB and SQLGlot, the SQL parser that SQLMesh uses under the hood, but I’m a newcomer to both dlt and SQLMesh, so take this as a learning notebook.
Outline
- About the Tools
- Setup & Configuration
- Extract: dlt + yfinance
- SQLMesh Project Setup
- Transform: Run SQLMesh Pipeline
- Verify: Query Transformed Data
- Lineage
- Load: dlt to SQL Server
- Production Deployment: Logging & Return Codes
- Further Engineering Considerations
- References
Here is a basic diagram showing the pipeline architecture:
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ EXTRACT │ │ TRANSFORM │ │ LOAD │
│ │ │ │ │ │
│ dlt pipeline │──▶───│ SQLMesh Models │──▶───│ dlt pipeline │
│ (yfinance │ │ (DuckDB Engine) │ │ (DuckDB → │
│ → DuckDB) │ │ │ │ SQL Server) │
│ │ │ raw → marts │ │ │
└─────────────────┘ └──────────────────┘ └─────────────────┘
About the Tools
Here’s a brief overview of each tool.
dlt (data load tool)
dlt is an open-source Python library designed to simplify data loading. What we found particularly useful is how it automates schema inference, data normalization, and incremental loading. Some key features:
- Declarative pipelines: You define sources as Python generators using the @dlt.resource decorator
- Schema evolution: It automatically handles schema changes as your data evolves
- Multiple connectors: Works with DuckDB, SQL Server, BigQuery, Snowflake, and many others
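As a small aside on the incremental-loading point above, here is a hedged sketch of dlt's incremental pattern, simplified from the dlt docs; fetch_rows is a hypothetical stand-in for a real API call:
import dlt

def fetch_rows(since: str):
    # Hypothetical stand-in for a real API call; yields rows newer than `since`.
    yield {"id": 1, "updated_at": "2024-06-01"}

@dlt.resource(write_disposition="append")
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2020-01-01"),
):
    # dlt records the highest "updated_at" seen and supplies it on the next run
    yield from fetch_rows(since=updated_at.last_value)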
SQLMesh
SQLMesh is a data transformation framework that brings software engineering practices to SQL development. We appreciated how it helped us think about transformations more systematically:
- Version control: Track changes to your SQL models over time
- Column-level lineage: See exactly which source columns affect which outputs, helpful for debugging
- Virtual Data Environments: Test changes without duplicating data
DuckDB
DuckDB is an embedded analytical database optimized for OLAP (Online Analytical Processing) workloads, meaning analytical queries over large datasets rather than transactional operations:
- In-process: No separate server to install or manage
- Fast: Uses columnar storage with vectorized execution
- Handles relatively large data: Out-of-core processing lets it work with datasets larger than available RAM
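To make the "in-process" point concrete, here is a minimal sketch: DuckDB runs inside the Python process, with no server to start, and the database can be a file path or ":memory:":
import duckdb

# In-process: the database engine runs inside this Python process.
con = duckdb.connect(":memory:")  # or a file path for persistence
con.execute("CREATE TABLE demo AS SELECT range AS i FROM range(5)")
print(con.execute("SELECT sum(i) FROM demo").fetchone())  # (10,)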
Python Requirements
duckdb
sqlmesh
polars
pandas # for Yahoo Finance data resource
yfinance
pyodbc
sqlalchemy
dlt[duckdb,mssql]
duckdb-engine
Setup & Configuration
Let’s start by importing the necessary libraries and defining the configuration. We are using stock market data as the example dataset; it’s freely available via Yahoo Finance and has enough complexity to demonstrate the transformation capabilities.
import json
import os
import subprocess
import warnings
from datetime import date
import dlt
import duckdb
import pandas as pd
import polars as pl
import yfinance as yf
from sqlglot import lineage
from sqlmesh import Context
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Configuration
DUCKDB_FILE = "./financial_etl_dlt.duckdb"
SQLMESH_PROJECT_DIR = "./dlt_sqlmesh_project"
CREDENTIALS_FILE = "./credentials.json"
# Stock configuration
TICKERS = ["AAPL", "MSFT", "GOOGL", "AMZN", "META", "NVDA", "TSLA"]
START_DATE = "2020-01-01"
END_DATE = date.today().isoformat()
# SQL Server target configuration
TARGET_CONN_KEY = "ms_target_01" # key in credentials JSON file
TARGET_SCHEMA = "dbo"
TARGET_TABLE = "Dim_Stock_Metrics"
print("Configuration loaded successfully!")
print(f" - DuckDB file: {DUCKDB_FILE}")
print(f" - SQLMesh project: {SQLMESH_PROJECT_DIR}")
print(f" - Tickers: {TICKERS}")
print(f" - Date range: {START_DATE} to {END_DATE}")
Configuration loaded successfully!
- DuckDB file: ./financial_etl_dlt.duckdb
- SQLMesh project: ./dlt_sqlmesh_project
- Tickers: ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META', 'NVDA', 'TSLA']
- Date range: 2020-01-01 to 2025-11-27
Extract: dlt + yfinance
With the configuration in place, we can move to the Extract phase. Here, we use dlt to pull stock data from Yahoo Finance and load it directly into DuckDB. The data includes OHLCV values: Open, High, Low, Close (prices), and Volume.
Creating a Custom dlt Resource
dlt uses the @dlt.resource decorator to define data sources. Each resource is a Python generator that yields data rows:
@dlt.resource(table_name="my_table")
def my_source():
yield {"col1": "value1", "col2": 123}
Key parameters:
- table_name: Name of the destination table
- write_disposition: How to handle existing data (replace, append, merge)
- columns: Optional schema hints for data types
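For example, a merge (upsert) disposition keyed on (ticker, date) might look like this; a hedged sketch, with fetch_prices() standing in for any generator of row dicts:
import dlt

def fetch_prices():
    # Hypothetical stand-in for a real extraction step.
    yield {"ticker": "AAPL", "date": "2024-01-02", "close_price": 185.64}

@dlt.resource(
    table_name="eod_prices_raw",
    write_disposition="merge",       # upsert instead of replace/append
    primary_key=["ticker", "date"],  # key used to deduplicate on merge
)
def eod_prices_merge():
    yield from fetch_prices()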
# Define a custom dlt resource for Yahoo Finance data
@dlt.resource(
table_name="eod_prices_raw",
write_disposition="replace",
columns={
"ticker": {"data_type": "text"},
"date": {"data_type": "date"},
"open_price": {"data_type": "double"},
"high_price": {"data_type": "double"},
"low_price": {"data_type": "double"},
"close_price": {"data_type": "double"},
"volume": {"data_type": "bigint"},
},
)
def yfinance_eod_prices(tickers: list[str], start_date: str, end_date: str):
"""
Extract End-of-Day stock prices from Yahoo Finance.
This is a custom dlt resource that yields OHLCV data for multiple tickers.
dlt will automatically:
- Infer the schema from the yielded dictionaries
- Handle batching and loading to the destination
- Track load metadata (_dlt_load_id, _dlt_id)
"""
print(f"Downloading OHLCV data for {len(tickers)} tickers...")
# Download data for all tickers at once
data = yf.download(
tickers, start=start_date, end=end_date, progress=False, auto_adjust=True
)
if data.empty:
raise ValueError("No data returned from Yahoo Finance")
# Process each ticker
rows_yielded = 0
for ticker in tickers:
# Handle multi-ticker vs single-ticker response format
if len(tickers) > 1:
ticker_data = data.xs(ticker, level=1, axis=1)
else:
ticker_data = data
# Yield each row with OHLCV data
for date_idx, row in ticker_data.iterrows():
if pd.notna(row["Close"]) and pd.notna(row["Volume"]):
yield {
"ticker": ticker,
"date": date_idx.date(),
"open_price": float(row["Open"]),
"high_price": float(row["High"]),
"low_price": float(row["Low"]),
"close_price": float(row["Close"]),
"volume": int(row["Volume"]),
}
rows_yielded += 1
print(f"Yielded {rows_yielded} rows with OHLCV data")
# Create and run the dlt extract pipeline
print("Running dlt extract pipeline...")
print("=" * 50)
# Create pipeline
extract_pipeline = dlt.pipeline(
pipeline_name="financial_extract",
destination=dlt.destinations.duckdb(DUCKDB_FILE),
dataset_name="raw",
)
# Run the pipeline
load_info = extract_pipeline.run(
yfinance_eod_prices(tickers=TICKERS, start_date=START_DATE, end_date=END_DATE)
)
print(f"\nLoad completed!")
print(f" - Pipeline: {extract_pipeline.pipeline_name}")
print(f" - Destination: {DUCKDB_FILE}")
print(f" - Dataset/Schema: raw")
print(f"\nLoad info:")
print(load_info)
Running dlt extract pipeline...
==================================================
Downloading OHLCV data for 7 tickers...
Yielded 10395 rows with OHLCV data
Load completed!
- Pipeline: financial_extract
- Destination: ./financial_etl_dlt.duckdb
- Dataset/Schema: raw
Load info:
Pipeline financial_extract load step completed in 0.53 seconds
1 load package(s) were loaded to destination duckdb and into dataset raw
The duckdb destination used duckdb:////home/francois/Workspace/posts/pipeline_duckdb/./financial_etl_dlt.duckdb location to store data
Load package 1764251419.3941712 is LOADED and contains no failed jobs
# Verify the extracted data in DuckDB
print("Verifying extracted OHLCV data in DuckDB...")
print("=" * 50)
with duckdb.connect(DUCKDB_FILE) as con:
# Check what tables exist
tables = con.execute(
"""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema = 'raw'
"""
).fetchall()
print(f"Tables in 'raw' schema: {tables}")
# Query the data with all OHLCV columns
df_raw = con.execute(
"""
SELECT ticker, date, open_price, high_price, low_price, close_price, volume
FROM raw.eod_prices_raw
ORDER BY ticker, date
LIMIT 10
"""
).pl()
print("\nFirst 10 rows of extracted OHLCV data:")
df_raw
Verifying extracted OHLCV data in DuckDB...
==================================================
Tables in 'raw' schema: [('raw', 'eod_prices_raw'), ('raw', '_dlt_loads'), ('raw', '_dlt_pipeline_state'), ('raw', '_dlt_version')]
First 10 rows of extracted OHLCV data:
| ticker | date | open_price | high_price | low_price | close_price | volume |
|---|---|---|---|---|---|---|
| str | date | f64 | f64 | f64 | f64 | i64 |
| "AAPL" | 2020-01-02 | 71.476615 | 72.528597 | 71.223274 | 72.468277 | 135480400 |
| "AAPL" | 2020-01-03 | 71.69616 | 72.523746 | 71.53933 | 71.763718 | 146322800 |
| "AAPL" | 2020-01-06 | 70.885487 | 72.374177 | 70.634554 | 72.335571 | 118387200 |
| "AAPL" | 2020-01-07 | 72.34522 | 72.600975 | 71.775804 | 71.995369 | 108872000 |
| "AAPL" | 2020-01-08 | 71.698574 | 73.455087 | 71.698574 | 73.153488 | 132079200 |
| "AAPL" | 2020-01-09 | 74.13066 | 74.900342 | 73.879735 | 74.707321 | 170108400 |
| "AAPL" | 2020-01-10 | 74.941371 | 75.440821 | 74.374363 | 74.876221 | 140644800 |
| "AAPL" | 2020-01-13 | 75.192313 | 76.502459 | 75.074081 | 76.475914 | 121532000 |
| "AAPL" | 2020-01-14 | 76.41317 | 76.623082 | 75.320175 | 75.443222 | 161954400 |
| "AAPL" | 2020-01-15 | 75.242981 | 76.12365 | 74.688034 | 75.119926 | 121923600 |
dlt Metadata Tables
One nice aspect of dlt is that it automatically creates metadata tables to track loads:
| Table | Purpose |
|---|---|
| _dlt_loads | Tracks each pipeline run with timestamps and status |
| _dlt_version | Schema version information |
Each data row also gets:
- _dlt_load_id: Unique identifier for the load batch
- _dlt_id: Unique identifier for each row
# Explore dlt metadata
print("dlt Load Metadata:")
print("=" * 50)
with duckdb.connect(DUCKDB_FILE) as con:
# Show load history
loads = con.execute(
"""
SELECT load_id, schema_name, status, inserted_at
FROM raw._dlt_loads
ORDER BY inserted_at DESC
LIMIT 5
"""
).pl()
print("Recent loads:")
loads
dlt Load Metadata:
==================================================
Recent loads:
| load_id | schema_name | status | inserted_at |
|---|---|---|---|
| str | str | i64 | datetime[μs, Europe/Paris] |
| "1764251419.3941712" | "financial_extract" | 0 | 2025-11-27 14:50:21.208674 CET |
| "1764250753.3403895" | "financial_extract" | 0 | 2025-11-27 14:39:14.771508 CET |
| "1764249716.0756714" | "financial_extract" | 0 | 2025-11-27 14:21:57.290876 CET |
| "1764249510.5065134" | "financial_extract" | 0 | 2025-11-27 14:18:31.900797 CET |
| "1764248385.3157668" | "financial_extract" | 0 | 2025-11-27 13:59:46.452635 CET |
SQLMesh Project Setup
Now that we have raw data in DuckDB, we need to set up SQLMesh for the transformation step. Before running any transformations, you need to initialize a SQLMesh project. This is typically a one-time setup using the sqlmesh init command:
sqlmesh init duckdb
This creates a project directory with the following structure:
dlt_sqlmesh_project/
├── config.py # Database connection & project settings
├── external_models.yaml # Schema definitions for external tables (dlt tables)
├── models/ # SQL transformation models
│ └── marts/
│ └── stock_metrics.sql
├── seeds/ # Static CSV data (optional)
├── audits/ # Custom audit definitions (optional)
├── macros/ # Reusable SQL macros (optional)
├── tests/ # Unit tests (optional)
├── logs/ # Execution logs
└── .cache/ # SQLMesh internal cache
Key Configuration Files
Let’s examine the three essential files that define our SQLMesh project.
1. config.py - Database connection configuration
FILE: dlt_sqlmesh_project/config.py
from sqlmesh.core.config import Config, DuckDBConnectionConfig, ModelDefaultsConfig
config = Config(
gateways={
"duckdb_local": {
"connection": DuckDBConnectionConfig(database="../financial_etl_dlt.duckdb"),
"state_connection": DuckDBConnectionConfig(database="../financial_etl_dlt.duckdb"),
}
},
default_gateway="duckdb_local",
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
)
Key settings:
- connection: Points to the DuckDB file created by dlt
- state_connection: Where SQLMesh stores its metadata (same file)
- dialect: SQL dialect for parsing/generating queries
2. external_models.yaml - Schema for tables NOT managed by SQLMesh (i.e., dlt tables)
FILE: dlt_sqlmesh_project/external_models.yaml
- name: raw.eod_prices_raw
description: Raw OHLCV stock prices loaded by dlt from Yahoo Finance
columns:
ticker: text
date: date
open_price: double
high_price: double
low_price: double
close_price: double
volume: bigint
_dlt_load_id: text
_dlt_id: text
Purpose:
- Tells SQLMesh about tables it doesn’t manage (external sources)
- dlt creates raw.eod_prices_raw: SQLMesh needs to know its schema
- OHLCV = Open, High, Low, Close, Volume (standard financial data)
- Includes dlt metadata columns (_dlt_load_id, _dlt_id)
- Equivalent to sources in dbt
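Rather than writing this file entirely by hand, SQLMesh can generate it by inspecting the warehouse for tables that models reference but SQLMesh does not manage; a sketch, assuming the create_external_models CLI command behaves as in current SQLMesh docs:
import subprocess

# Generate/refresh external_models.yaml from referenced-but-unmanaged tables
# (here, the dlt-created raw.eod_prices_raw).
subprocess.run(["sqlmesh", "create_external_models"], cwd=SQLMESH_PROJECT_DIR, check=True)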
3. models/marts/stock_metrics.sql - The transformation model with technical indicators
FILE: dlt_sqlmesh_project/models/marts/stock_metrics.sql
MODEL (
name marts.stock_metrics,
kind INCREMENTAL_BY_TIME_RANGE (time_column trade_date, batch_size 30),
start '2020-01-01',
cron '@daily',
grain [ticker, trade_date],
audits (...)
);
-- Technical indicators calculated using SQL window functions:
-- 1. SIMPLE MOVING AVERAGES (single column: close_price)
sma_20_day = AVG(close_price) OVER (... ROWS BETWEEN 19 PRECEDING AND CURRENT ROW)
sma_50_day = AVG(close_price) OVER (... ROWS BETWEEN 49 PRECEDING AND CURRENT ROW)
-- 2. BOLLINGER BANDS (single column: close_price)
bollinger_upper = SMA20 + 2 * STDDEV(close_price)
bollinger_lower = SMA20 - 2 * STDDEV(close_price)
-- 3. RSI - Relative Strength Index (single column: close_price)
rsi_14_day = 100 - (100 / (1 + avg_gain / avg_loss))
-- 4. MACD - Moving Average Convergence Divergence (single column: close_price)
macd_line = SMA_12_day - SMA_26_day
-- 5. ATR - Average True Range [MULTI-COLUMN: high, low, close]
true_range = GREATEST(
high - low, -- Intraday range
ABS(high - prev_close), -- Gap up
ABS(low - prev_close) -- Gap down
)
atr_14_day = AVG(true_range) OVER 14 days
-- 6. DAILY METRICS [MULTI-COLUMN]
daily_return_pct = (close - prev_close) / prev_close * 100
price_range_pct = (high - low) / close * 100 -- Uses high, low, close
volume_ratio = volume / AVG(volume) OVER 20 days
Key features:
- ATR uses 3 columns (high_price, low_price, close_price)
- price_range_pct also uses multiple columns
- WINDOW clauses for efficient computation
- CTEs for intermediate calculations (gains, losses, true_range)
Transform: Run SQLMesh Pipeline
With the project configured, we can now run SQLMesh to transform the raw data. This is where we found SQLMesh particularly helpful: it handles the complexity of incremental processing and keeps track of what has already been computed.
The sqlmesh plan --auto-apply command will:
- Detect the external table (raw.eod_prices_raw) created by dlt
- Execute our transformation model (marts.stock_metrics)
- Create the output table with technical indicators like SMA (Simple Moving Average, an average of prices over a rolling window, commonly used in financial analysis)
- Run data quality audits automatically
# Run SQLMesh to apply transformations
print("Running SQLMesh transformation plan...")
print("=" * 50)
try:
# Run sqlmesh plan with auto-apply
result = subprocess.run(
["sqlmesh", "plan", "--auto-apply"],
cwd=SQLMESH_PROJECT_DIR,
capture_output=True,
text=True,
check=True,
)
print(result.stdout)
if result.stderr:
# Filter out deprecation warnings
stderr_lines = [
l
for l in result.stderr.split("\n")
if "DeprecationWarning" not in l and l.strip()
]
if stderr_lines:
print("Info:")
print("\n".join(stderr_lines))
print("\nSQLMesh transformation completed successfully!")
except subprocess.CalledProcessError as e:
print("SQLMesh execution failed!")
print(f"stdout: {e.stdout}")
print(f"stderr: {e.stderr}")
raise
Running SQLMesh transformation plan...
==================================================
Summary of differences from `prod`:
Metadata Updated:
- marts.stock_metrics
audits (
NOT_NULL('columns' = (ticker, trade_date, close_price, volume)),
- UNIQUE_COMBINATION_OF_COLUMNS('columns' = (ticker, trade_date))
+ UNIQUE_COMBINATION_OF_COLUMNS('columns' = (ticker, trade_date)),
+ VALID_RSI_RANGE(),
+ VALID_OHLC_PRICES(),
+ POSITIVE_VOLUME(),
+ VALID_ATR()
),
Metadata Updated: marts.stock_metrics
SKIP: No physical layer updates to perform
[ 1/72] marts.stock_metrics [insert 2020-01-01 - 2020-01-30, audits passed 6] 0.04s
...
[72/72] marts.stock_metrics [insert 2025-10-31 - 2025-11-26, audits passed 6] 0.03s
Auditing models ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 72/72 • 0:00:02
Model batches executed
SKIP: No model batches to execute
Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
Virtual layer updated
SQLMesh transformation completed successfully!
A note on the output: The prod environment represents the current “deployed” state. When we run the plan, it compares our model code against prod and shows a diff of what changed, similar to how Git works. In this case, we added new audits, so SQLMesh detected a metadata change but recognized that the underlying transformation logic was unchanged. That’s why it skipped reprocessing the data (No physical layer updates) and only ran the new audits against the existing data.
One concept that helped us understand SQLMesh better is its two-layer architecture:
Physical Layer (Versioned Tables) Virtual Layer (Views)
┌────────────────────────────────────┐ ┌─────────────────────────┐
│ sqlmesh__marts.marts__stock_metrics│ │ marts.stock_metrics │
│ __[fingerprint] │◄───│ (VIEW pointing to │
└────────────────────────────────────┘ │ physical table) │
└─────────────────────────┘
Benefits:
- Instant environment creation: Just create views, no data copy
- Instant promotion: Swap view pointers, no recomputation
- Easy rollbacks: Point to previous version
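You can peek at both layers directly in the DuckDB file; a quick sketch (the exact physical schema naming, e.g. sqlmesh__marts, may vary by SQLMesh version):
import duckdb

with duckdb.connect(DUCKDB_FILE) as con:
    rows = con.execute(
        """
        SELECT table_schema, table_name, table_type
        FROM information_schema.tables
        WHERE table_schema LIKE '%marts%'
        ORDER BY table_schema, table_name
        """
    ).fetchall()
    # Expect the versioned physical table(s) plus the marts.stock_metrics VIEW
    for schema, name, table_type in rows:
        print(f"{schema}.{name} ({table_type})")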
Verify: Query Transformed Data
Before loading the data to its final destination, it’s worth verifying that the transformation worked as expected. Let’s query the transformed data from DuckDB and look at some of the technical indicators.
# Connect to DuckDB and query the transformed data with technical indicators
print("Querying transformed data with technical indicators...")
print("=" * 50)
with duckdb.connect(DUCKDB_FILE) as con:
# Query the marts table - show key technical indicators
query = """
SELECT
ticker,
trade_date,
ROUND(close_price, 2) AS close,
ROUND(sma_20_day, 2) AS sma20,
ROUND(sma_50_day, 2) AS sma50,
ROUND(rsi_14_day, 1) AS rsi,
ROUND(macd_line, 2) AS macd,
ROUND(atr_14_day, 2) AS atr,
ROUND(bollinger_upper, 2) AS bb_upper,
ROUND(bollinger_lower, 2) AS bb_lower
FROM marts.stock_metrics
WHERE ticker = 'AAPL'
ORDER BY trade_date DESC
LIMIT 15
"""
df_transformed = con.execute(query).pl()
print("Sample AAPL data with technical indicators (most recent 15 rows):")
df_transformed
Querying transformed data with technical indicators...
==================================================
Sample AAPL data with technical indicators (most recent 15 rows):
| ticker | trade_date | close | sma20 | sma50 | rsi | macd | atr | bb_upper | bb_lower |
|---|---|---|---|---|---|---|---|---|---|
| str | date | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 |
| "AAPL" | 2025-11-26 | 277.55 | 271.13 | 271.13 | 63.0 | 1.02 | 5.95 | 277.91 | 264.34 |
| "AAPL" | 2025-11-25 | 276.97 | 270.77 | 270.77 | 61.6 | 0.7 | 6.14 | 276.98 | 264.56 |
| "AAPL" | 2025-11-24 | 275.92 | 270.41 | 270.41 | 60.3 | 0.33 | 6.11 | 275.95 | 264.86 |
| "AAPL" | 2025-11-21 | 271.49 | 270.06 | 270.06 | 55.1 | 0.14 | 5.95 | 274.98 | 265.14 |
| "AAPL" | 2025-11-20 | 266.25 | 269.97 | 269.97 | 41.4 | 0.1 | 5.73 | 275.0 | 264.94 |
| … | … | … | … | … | … | … | … | … | … |
| "AAPL" | 2025-11-12 | 273.47 | 270.49 | 270.49 | 63.0 | 0.0 | 5.46 | 275.11 | 265.87 |
| "AAPL" | 2025-11-11 | 275.25 | 270.12 | 270.12 | 73.1 | 0.0 | 5.64 | 274.45 | 265.79 |
| "AAPL" | 2025-11-10 | 269.43 | 269.39 | 269.39 | 43.6 | 0.0 | 5.52 | 270.72 | 268.05 |
| "AAPL" | 2025-11-07 | 268.21 | 269.38 | 269.38 | 26.7 | 0.0 | 5.4 | 270.84 | 267.92 |
| "AAPL" | 2025-11-06 | 269.51 | 269.61 | 269.61 | 39.2 | 0.0 | 5.38 | 270.63 | 268.6 |
Lineage
We can also explore column lineage using the SQLMesh and SQLGlot Python APIs:
def get_source_columns(node):
"""Recursively extract source column names from lineage node."""
sources = []
for downstream in node.downstream:
if not downstream.downstream:
# Leaf node - actual source column
sources.append(downstream.name)
else:
sources.extend(get_source_columns(downstream))
return sources
# Initialize SQLMesh context
ctx = Context(paths=[SQLMESH_PROJECT_DIR])
# Get the stock_metrics model
model = ctx.get_model("marts.stock_metrics")
print("Column Lineage for marts.stock_metrics")
print("=" * 60)
# Compute actual lineage using SQLGlot
print(f"\n{'Output Column':20} {'Type':10} {'Source Columns'}")
print("-" * 60)
for col_name, col_type in model.columns_to_types.items():
result = lineage.lineage(col_name, model.query)
sources = list(dict.fromkeys(get_source_columns(result))) # dedupe, preserve order
source_str = ", ".join(sources) if sources else "N/A"
print(f"{col_name:20} {str(col_type):10} {source_str}")
Column Lineage for marts.stock_metrics
============================================================
Output Column Type Source Columns
------------------------------------------------------------
ticker TEXT eod_prices_raw.ticker
trade_date DATE eod_prices_raw.date
open_price DOUBLE eod_prices_raw.open_price
high_price DOUBLE eod_prices_raw.high_price
low_price DOUBLE eod_prices_raw.low_price
close_price DOUBLE eod_prices_raw.close_price
volume BIGINT eod_prices_raw.volume
sma_20_day DOUBLE eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
sma_50_day DOUBLE eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
bollinger_upper DOUBLE eod_prices_raw.close_price
bollinger_lower DOUBLE eod_prices_raw.close_price
rsi_14_day DOUBLE eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
macd_line DOUBLE eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
sma_9_day DOUBLE eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
atr_14_day DOUBLE eod_prices_raw.low_price, eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker, eod_prices_raw.high_price
daily_return_pct DOUBLE eod_prices_raw.close_price, eod_prices_raw.date, eod_prices_raw.ticker
price_range_pct DOUBLE eod_prices_raw.high_price, eod_prices_raw.close_price, eod_prices_raw.low_price
volume_ratio DOUBLE eod_prices_raw.volume
volume_sma_20 BIGINT eod_prices_raw.volume
SQLMesh Audits
One feature we found particularly useful is SQLMesh’s audits: these are data quality checks that run automatically after model execution. SQLMesh supports two types:
1. Built-in Audits (defined inline in the MODEL):
audits (
not_null(columns := (ticker, trade_date)),
unique_combination_of_columns(columns := (ticker, trade_date))
)
2. Custom Audits (defined in audits/ directory):
dlt_sqlmesh_project/
└── audits/
├── valid_rsi_range.sql # RSI must be 0-100
├── valid_ohlc_prices.sql # High >= Low, etc.
├── positive_volume.sql # Volume > 0
└── valid_atr.sql # ATR >= 0
Custom audits return rows that violate the condition. If any rows are returned, the audit fails.
Example Custom Audit: valid_ohlc_prices.sql
AUDIT (
name valid_ohlc_prices,
dialect duckdb
);
/*
* OHLC Price Sanity Check (Multi-Column Audit)
* Validates relationships between Open, High, Low, Close prices:
* - High must be >= Low
* - High must be >= Open and Close
* - Low must be <= Open and Close
* Returns rows that violate these constraints.
*/
SELECT
ticker,
trade_date,
open_price,
high_price,
low_price,
close_price,
CASE
WHEN high_price < low_price THEN 'high < low'
WHEN high_price < open_price THEN 'high < open'
WHEN high_price < close_price THEN 'high < close'
WHEN low_price > open_price THEN 'low > open'
WHEN low_price > close_price THEN 'low > close'
END AS violation_reason
FROM @this_model
WHERE high_price < low_price
OR high_price < open_price
OR high_price < close_price
OR low_price > open_price
OR low_price > close_price
Note: @this_model is a macro that refers to the model being audited.
Let’s run all the audits:
# Run SQLMesh audits to validate data quality
print("Running data quality audits...")
print("=" * 50)
result = subprocess.run(
["sqlmesh", "audit"],
cwd=SQLMESH_PROJECT_DIR,
capture_output=True,
text=True,
)
print(result.stdout)
if result.returncode == 0:
print("\nAll audits passed!")
else:
print("\nAudit failures detected:")
print(result.stderr)
Running data quality audits...
==================================================
Found 6 audits.
not_null on model marts.stock_metrics ✅ PASS.
unique_combination_of_columns on model marts.stock_metrics ✅ PASS.
valid_rsi_range on model marts.stock_metrics ✅ PASS.
valid_ohlc_prices on model marts.stock_metrics ✅ PASS.
positive_volume on model marts.stock_metrics ✅ PASS.
valid_atr on model marts.stock_metrics ✅ PASS.
Finished with 0 audit errors and 0 audits skipped.
Done.
All audits passed!
Load: dlt to SQL Server
With our data transformed and validated, we can move to the final Load phase. Here, we use dlt again, this time to export the transformed data from DuckDB to SQL Server. This demonstrates how dlt can work bidirectionally, not just for ingestion but also for exporting data to downstream systems.
dlt sql_database Source
dlt provides a sql_database source that can read from any SQLAlchemy-compatible database, including DuckDB:
from dlt.sources.sql_database import sql_database
source = sql_database(
credentials="duckdb:///path/to/db.duckdb",
schema="marts",
table_names=["stock_metrics"]
)
dlt mssql Destination
dlt supports SQL Server as a destination:
pipeline = dlt.pipeline(
destination=dlt.destinations.mssql(credentials={...})
)
Prerequisites:
- Microsoft ODBC Driver 17 or 18 for SQL Server
- Valid SQL Server credentials
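The credential-loading cell below assumes a credentials.json keyed by connection name, roughly shaped like this (hypothetical values; adjust to your environment):
import json

example_creds = {
    "ms_target_01": {
        "info": {
            "server": "localhost",
            "port": 1433,
            "database": "FINANCIAL_MART",
            "username": "etl_user",  # hypothetical
            "password": "********",
        }
    }
}
# Written to a separate example file so the real credentials.json is untouched.
with open("credentials.example.json", "w") as f:
    json.dump(example_creds, f, indent=2)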
# Load credentials
def load_credentials(creds_file: str, conn_key: str) -> dict:
"""Load SQL Server credentials from JSON file."""
with open(creds_file, "r") as f:
creds = json.load(f)
return creds[conn_key]["info"]
print("Loading SQL Server credentials...")
try:
creds = load_credentials(CREDENTIALS_FILE, TARGET_CONN_KEY)
print(f" Server: {creds['server']}:{creds['port']}")
print(f" Database: {creds['database']}")
print(f" Target: {TARGET_SCHEMA}.{TARGET_TABLE}")
creds_loaded = True
except Exception as e:
print(f"Warning: Could not load credentials - {e}")
print("\nSQL Server load will be skipped.")
creds_loaded = False
Loading SQL Server credentials...
Server: localhost:1433
Database: FINANCIAL_MART
Target: dbo.Dim_Stock_Metrics
# Create and run the dlt load pipeline (DuckDB -> SQL Server)
if creds_loaded:
print("Running dlt load pipeline (DuckDB -> SQL Server)...")
print("=" * 50)
try:
# Import the sql_database source
from dlt.sources.sql_database import sql_database
# Create source from DuckDB
# Read from the marts.stock_metrics table
duckdb_source = sql_database(
credentials=f"duckdb:///{DUCKDB_FILE}",
schema="marts",
table_names=["stock_metrics"],
)
# Create pipeline to SQL Server
# Note: We need to pass driver options for self-signed certificates
load_pipeline = dlt.pipeline(
pipeline_name="financial_load",
destination=dlt.destinations.mssql(
credentials={
"database": creds["database"],
"username": creds["username"],
"password": creds["password"],
"host": creds["server"],
"port": creds["port"],
"driver": "ODBC Driver 18 for SQL Server",
"query": {"TrustServerCertificate": "yes"},
}
),
dataset_name=TARGET_SCHEMA,
)
# Run the pipeline
load_info = load_pipeline.run(duckdb_source, write_disposition="replace")
print(f"\nLoad completed!")
print(f" - Source: {DUCKDB_FILE} (marts.stock_metrics)")
print(f" - Destination: SQL Server ({creds['database']})")
print(f"\nLoad info:")
print(load_info)
except Exception as e:
print(f"\nLoad failed: {e}")
print("\nTroubleshooting tips:")
print(" 1. Verify SQL Server is running and accessible")
print(" 2. Check ODBC Driver 17/18 is installed")
print(" 3. Verify the target database exists")
else:
print("\nSkipping SQL Server load (no valid credentials).")
print("The transformed data is available in DuckDB at:")
print(f" {DUCKDB_FILE} -> marts.stock_metrics")
Running dlt load pipeline (DuckDB -> SQL Server)...
==================================================
/home/francois/miniconda3/envs/pipeline313/lib/python3.13/site-packages/duckdb_engine/__init__.py:184: DuckDBEngineWarning: duckdb-engine doesn't yet support reflection on indices
warnings.warn(
Load completed!
- Source: ./financial_etl_dlt.duckdb (marts.stock_metrics)
- Destination: SQL Server (FINANCIAL_MART)
Load info:
Pipeline financial_load load step completed in 6.88 seconds
1 load package(s) were loaded to destination mssql and into dataset dbo
The mssql destination used mssql://migadmin:***@localhost:1433/financial_mart location to store data
Load package 1764251428.6438282 is LOADED and contains no failed jobs
We can check that the data is in SQL Server:
SELECT TOP 10 * FROM dbo.stock_metrics ORDER BY trade_date DESC;
| | ticker | trade_date | close_price | volume | sma_50_day | _dlt_load_id | _dlt_id | open_price | high_price | low_price | ... | bollinger_upper | bollinger_lower | rsi_14_day | macd_line | sma_9_day | atr_14_day | daily_return_pct | price_range_pct | volume_ratio | volume_sma_20 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | TSLA | 2025-11-26 | 426.579987 | 63299400 | 425.9326 | 1764251428.6438282 | dp7lDd+JOoBe9g | 423.950012 | 426.940002 | 416.890015 | ... | 473.2135 | 378.6517 | 43.15 | -14.1993 | 407.6211 | 21.3321 | 1.7120 | 2.3559 | 0.72 | 88165984 |
| 1 | GOOGL | 2025-11-26 | 319.950012 | 51290800 | 290.8753 | 1764251428.6438282 | DfvXqKHwDRHjuQ | 320.679993 | 324.500000 | 316.790009 | ... | 319.7867 | 261.9638 | 69.13 | 4.6406 | 298.8444 | 12.1707 | -1.0790 | 2.4097 | 1.12 | 45707632 |
| 2 | META | 2025-11-26 | 633.609985 | 15186100 | 617.5516 | 1764251428.6438282 | j/1Q7cXe0XMQ9Q | 637.690002 | 638.359985 | 631.630005 | ... | 652.9907 | 582.1125 | 56.86 | -8.2399 | 607.3067 | 17.8543 | -0.4102 | 1.0622 | 0.62 | 24326500 |
| 3 | MSFT | 2025-11-26 | 485.500000 | 25697600 | 497.4609 | 1764251428.6438282 | w+FErTh2jGGz1g | 486.309998 | 488.309998 | 481.200012 | ... | 526.8620 | 468.0599 | 43.31 | -5.6150 | 486.8763 | 11.8285 | 1.7841 | 1.4645 | 0.98 | 26267837 |
| 4 | AMZN | 2025-11-26 | 229.160004 | 38435400 | 236.8532 | 1764251428.6438282 | idQBkg7Rc/3mEw | 230.740005 | 231.750000 | 228.770004 | ... | 260.0106 | 213.6957 | 36.48 | -6.3015 | 226.1933 | 6.5457 | -0.2221 | 1.3004 | 0.70 | 55171516 |
| 5 | NVDA | 2025-11-26 | 180.259995 | 183181100 | 189.3247 | 1764251428.6438282 | hNEoZkDqEOIfQA | 181.630005 | 182.910004 | 178.240005 | ... | 206.1015 | 172.5480 | 43.50 | -4.4397 | 182.7556 | 9.0200 | 1.3722 | 2.5907 | 0.83 | 221997200 |
| 6 | AAPL | 2025-11-26 | 277.549988 | 33413600 | 271.1275 | 1764251428.6438282 | KXA3vllpset5WQ | 276.959991 | 279.529999 | 276.630005 | ... | 277.9141 | 264.3409 | 63.02 | 1.0158 | 271.5611 | 5.9532 | 0.2094 | 1.0449 | 0.67 | 49662732 |
| 7 | TSLA | 2025-11-25 | 419.399994 | 71915600 | 425.8967 | 1764251428.6438282 | h4GeCQBQU+CrHg | 414.420013 | 420.480011 | 405.950012 | ... | 474.5472 | 377.2461 | 35.78 | -12.6092 | 404.8889 | 22.9257 | 0.3878 | 3.4645 | 0.80 | 89547461 |
| 8 | GOOGL | 2025-11-25 | 323.440002 | 88632100 | 289.2600 | 1764251428.6438282 | frSIcFsmz+UMgw | 326.209991 | 328.829987 | 317.649994 | ... | 315.2424 | 263.2776 | 71.99 | 3.7683 | 294.2467 | 12.1350 | 1.5255 | 3.4566 | 1.95 | 45397456 |
| 9 | META | 2025-11-25 | 636.219971 | 25213000 | 616.6595 | 1764251428.6438282 | 0bqKg0SIKNKX/g | 624.000000 | 637.049988 | 618.299988 | ... | 652.2370 | 581.0819 | 50.11 | -7.5019 | 604.6711 | 18.6593 | 3.7795 | 2.9471 | 1.02 | 24834300 |
10 rows × 21 columns
Production Deployment: Logging & Return Codes
The pipeline above works well for development, but deploying to production with a scheduler (cron, Airflow, Prefect, etc.) requires a few additional considerations. We found that proper logging and return codes are essential for monitoring and debugging issues.
Why This Matters
| Aspect | Development | Production |
|---|---|---|
| Output | print() statements | Structured log files |
| Exit behavior | Exceptions crash the script | Return codes signal status |
| Monitoring | Manual inspection | Automated alerts on failures |
Logging Setup
For production, replace print() with Python’s logging module (or a third-party library such as loguru):
import logging
import sys
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('pipeline.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Usage
logger.info("Starting ETL pipeline...")
logger.warning("Missing data for ticker: XYZ")
logger.error("Failed to connect to SQL Server")
Log Levels:
- DEBUG: Detailed diagnostic info (row counts, query times)
- INFO: General progress updates (“Loaded 2012 rows”)
- WARNING: Non-critical issues (“Retrying connection…”)
- ERROR: Failures that stop the pipeline
- CRITICAL: System-level failures
Return Codes for Schedulers
Schedulers use exit codes to determine success/failure. Use sys.exit() with meaningful codes:
import sys
# Define exit codes
EXIT_SUCCESS = 0
EXIT_GENERAL_ERROR = 1
EXIT_CONFIG_ERROR = 2
EXIT_DATA_QUALITY_ERROR = 3
EXIT_CONNECTION_ERROR = 4
def main():
try:
# Load configuration
if not os.path.exists('credentials.json'):
logger.error("Missing credentials.json")
sys.exit(EXIT_CONFIG_ERROR)
# Extract
logger.info("Extracting data...")
df = extract_data()
# Validate
if len(df) == 0:
logger.error("No data extracted - aborting")
sys.exit(EXIT_DATA_QUALITY_ERROR)
# Transform
logger.info("Running SQLMesh transformations...")
run_sqlmesh()
# Load
logger.info("Loading to SQL Server...")
load_to_sql_server()
logger.info("Pipeline completed successfully")
sys.exit(EXIT_SUCCESS)
except ConnectionError as e:
logger.error(f"Connection failed: {e}")
sys.exit(EXIT_CONNECTION_ERROR)
except Exception as e:
logger.error(f"Unexpected error: {e}")
sys.exit(EXIT_GENERAL_ERROR)
if __name__ == "__main__":
main()
Sample Scheduler Configuration (cron)
# Run daily at 6 AM, log output, alert on failure
0 6 * * * /usr/bin/python3 /path/to/run_pipeline.py >> /var/log/etl.log 2>&1 || mail -s "ETL Failed" alerts@company.com
Sample Log Output
2024-01-15 06:00:01 - INFO - Starting ETL pipeline...
2024-01-15 06:00:02 - INFO - Extracting data from yfinance...
2024-01-15 06:00:05 - INFO - Extracted 2012 rows for ['AAPL', 'MSFT']
2024-01-15 06:00:06 - INFO - Running SQLMesh transformations...
2024-01-15 06:00:08 - INFO - SQLMesh plan applied successfully
2024-01-15 06:00:09 - INFO - Loading to SQL Server...
2024-01-15 06:00:12 - INFO - Loaded 2012 rows to dbo.Dim_Stock_Metrics
2024-01-15 06:00:12 - INFO - Pipeline completed successfully
Further Engineering Considerations
There are a few more topics worth considering as you move toward production. These go beyond the scope of this example, but we wanted to mention them briefly.
1. Error Handling and Observability
While dlt and SQLMesh provide solid foundations, production-grade pipelines typically need more comprehensive error handling and observability.
- Centralized Logging: Beyond basic file logging, integrate with centralized logging systems for easier searching, aggregation, and analysis of logs across multiple pipeline runs.
- Alerting: Configure alerts based on log errors, audit failures, or unexpected data volumes.
- Monitoring Dashboards: Build dashboards to visualize pipeline health, execution times, data volumes, and data quality metrics over time. This helps in proactive identification of issues and performance bottlenecks.
- Idempotency and Retries: Design pipelines to be idempotent where possible, allowing safe retries without duplicating data or side effects. dlt’s write_disposition and SQLMesh’s transactional updates aid this. Implement retry mechanisms with exponential backoff for transient errors (e.g., network issues, API rate limits); a sketch follows this list.
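Here is a generic retry helper with exponential backoff and jitter, as mentioned in the idempotency point above (a sketch, not tied to any particular library):
import logging
import random
import time

logger = logging.getLogger(__name__)

def with_retries(fn, max_attempts: int = 4, base_delay: float = 2.0):
    """Call fn(), retrying transient failures with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except (ConnectionError, TimeoutError) as exc:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, 1)
            logger.warning("Attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay)
            time.sleep(delay)

# Usage (hypothetical): wrap the flaky steps of the pipeline, e.g.
# with_retries(lambda: load_pipeline.run(duckdb_source, write_disposition="replace"))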
2. Schema Evolution Strategy
dlt’s automatic schema evolution is a helpful feature, but managing it thoughtfully becomes important when transformations and downstream consumers are involved.
- Graceful Changes: While dlt handles schema additions, consider how schema changes (e.g., column renaming, type changes) in the raw layer propagate. SQLMesh’s column-level lineage can help identify affected downstream models.
- Versioned External Models: For significant schema changes in external sources, SQLMesh allows versioning of external model definitions. This lets you gracefully transition dependents without breaking existing queries.
- Impact Analysis: Before deploying a schema change, use SQLMesh’s plan command to visualize the impact on dependent models. This helps prevent unexpected issues in the transformation layer.
3. Cost/Resource Management
While DuckDB is efficient for local development and moderate data volumes, larger-scale deployments may require thinking about cost and resource tradeoffs.
- Compute vs. Storage: For cloud environments, understand the trade-offs between compute and storage costs. DuckDB is compute-bound locally; in a cloud data warehouse, query complexity and data scanned directly impact costs.
- Incremental Processing: SQLMesh’s incremental models are critical for cost optimization. By only processing new or changed data, you significantly reduce compute resources and execution time compared to full table rebuilds.
- Resource Allocation: Fine-tune resource allocation (CPU, memory) for pipeline execution environments, especially when running on orchestrators like Airflow.
- Cloud-Native Alternatives: If the DuckDB file grows excessively large or requires distributed processing, consider migrating to cloud-native data warehouses or data lake solutions that offer scalable compute and storage.
4. Testability
Beyond SQLMesh audits, it’s worth considering a broader testing strategy for production pipelines:
- Unit Tests for dlt Resources: Write Python unit tests for your custom dlt resources (yfinance_eod_prices in this example). Mock external dependencies (like the yfinance API) to ensure the resource logic works correctly under various conditions; see the sketch after this list.
- SQLMesh Unit Tests: SQLMesh supports SQL unit tests for models. These tests define expected outputs for specific input data, ensuring transformation logic is correct and remains so after changes.
- Integration Tests: Test the full pipeline flow (Extract -> Transform -> Load) in a controlled environment with representative data. This ensures all components work together seamlessly.
- Virtual Data Environments (VDEs) for Development: Leverage SQLMesh VDEs to create isolated environments for feature development. This allows developers to test changes to models without impacting production data or other developers’ work. Changes can be validated in a VDE before merging to a shared environment.
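As an example of the first point, here is a hedged sketch of a pytest unit test for the yfinance_eod_prices resource, mocking yf.download so no network call is made. It assumes the resource lives in a hypothetical module named pipeline.py; dlt resources are iterable, so we can exercise the generator directly:
import pandas as pd

# Hypothetical test file: tests/test_extract.py, run with pytest.
def test_yfinance_eod_prices_yields_rows(monkeypatch):
    import pipeline  # hypothetical module containing the dlt resource

    fake = pd.DataFrame(
        {"Open": [1.0], "High": [2.0], "Low": [0.5], "Close": [1.5], "Volume": [100]},
        index=pd.to_datetime(["2024-01-02"]),
    )
    # Replace the network call with a canned single-ticker frame
    monkeypatch.setattr(pipeline.yf, "download", lambda *args, **kwargs: fake)

    rows = list(pipeline.yfinance_eod_prices(["AAPL"], "2024-01-01", "2024-01-31"))
    assert rows[0]["ticker"] == "AAPL"
    assert rows[0]["close_price"] == 1.5
    assert rows[0]["volume"] == 100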
By considering these aspects, you can evolve a pipeline like this from a working example into something more suitable for production use.