Incremental Sync

Sync only changed data using watermark-based incremental exports.

What is Incremental Sync?

LakeXpress tracks a “high watermark” (the highest value in a timestamp or numeric column) and only exports rows above that watermark on subsequent syncs, instead of exporting entire tables every time.
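
Conceptually, each incremental export reduces to a range filter on the tracked column. A minimal sketch of that filter (the table, column, and :last_watermark placeholder are illustrative, not literal LakeXpress output):

-- Full load on the first sync; afterwards, only rows above the stored watermark
SELECT *
FROM sales.orders
WHERE created_date > :last_watermark;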

When to Use Incremental Sync

Use incremental sync when:

  • Regularly updated tables: Source tables receive frequent inserts or updates
  • Frequent syncs: Exports run multiple times per day or on a schedule
  • Cost efficiency: You want to minimize network and compute usage
  • Time-series data: Tables have timestamp columns tracking record creation or modification

Examples:

  • Daily sales order updates to a data lake
  • Event log aggregation from production systems
  • Time-series metrics collection
  • Transaction processing pipelines

Supported Column Types

Type        Format                     Example                 Use Case
date        YYYY-MM-DD                 2025-01-08              Daily transactions, order dates
datetime    YYYY-MM-DD HH:MM:SS        2025-01-08 14:30:25     Precise time tracking
timestamp   Database timestamp type    2025-01-08T14:30:25Z    Creation/modification times
integer     Numeric sequence           1000001                 Monotonic ID columns, batch IDs

Configuration Syntax

Define incremental tables with --incremental_table:

./LakeXpress config create \
  ... \
  --incremental_table "schema.table:column:type"

Basic Syntax

schema.table:column:type

Parameters:

  • schema.table - Fully qualified table name
  • column - Column to track for watermark (should be indexed)
  • type - Column type: date, datetime, timestamp, or integer

Example:

--incremental_table "sales.orders:created_date:date"

Advanced Syntax (Optional)

schema.table:column:type[:direction][@start_value][!strategy]

Extended Parameters:

  • :direction - Include (:i, default) or exclude (:e)
  • @start_value - Override the initial watermark value
  • !strategy - Loading strategy: append (default) or upsert

Examples:

# Include direction (explicit)
--incremental_table "sales.orders:created_date:date:i"

# Exclude from incremental sync
--incremental_table "sales.returns:return_date:date:e"

# Set initial watermark
--incremental_table "sales.orders:created_date:date@2025-01-01"

# Use UPSERT/MERGE strategy (updates existing rows)
--incremental_table "sales.orders:created_date:date!upsert"

# Combined: direction + start value + upsert
--incremental_table "sales.orders:created_date:date:i@2025-01-01!upsert"

Data Loading Strategies

Append Strategy (Default)

Inserts new rows into the target table. Suitable when:

  • Your target handles duplicates (e.g., external tables)
  • You’re building an append-only data lake
  • You dedup at query time

--incremental_table "events.pageviews:event_time:timestamp!append"

Upsert Strategy (MERGE)

Uses MERGE to update existing rows and insert new ones. Requires:

  • Primary keys defined on the table (stored in the logging database)
  • A target platform supporting MERGE (Snowflake, Databricks, Fabric)
  • Managed/internal tables (not external tables)

--incremental_table "sales.orders:updated_at:datetime!upsert"

How it works:

  1. New data loads into a staging table
  2. MERGE runs: matching rows are updated, new rows are inserted (sketched below)
  3. Staging table is dropped
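
The MERGE in step 2 is roughly equivalent to the following sketch. The staging table name and the order_id/status/updated_at columns are hypothetical; the actual statement is generated from the primary keys recorded in the logging database:

-- Hypothetical MERGE generated for sales.orders (ANSI-style syntax,
-- accepted by Snowflake, Databricks, and Fabric)
MERGE INTO sales.orders AS t
USING sales.orders_staging AS s
  ON t.order_id = s.order_id
WHEN MATCHED THEN
  UPDATE SET status = s.status, updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (order_id, status, updated_at)
  VALUES (s.order_id, s.status, s.updated_at);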

Benefits:

  • No duplicate rows in target
  • Handles both inserts and updates
  • Target always reflects current state

Requirements:

  • Tables must have primary key columns defined in the source
  • Use --publish_method internal or --publish_method managed
  • Primary keys are automatically detected from the source database schema (see the introspection sketch below)
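
The detection in the last point relies on standard catalog metadata. A sketch of the kind of introspection involved, using information_schema (illustrative; not LakeXpress's actual query):

-- List primary key columns for sales.orders
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
  ON tc.constraint_name = kcu.constraint_name
 AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
  AND tc.table_schema = 'sales'
  AND tc.table_name = 'orders';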

Example with Snowflake:

./LakeXpress config create \
  -a credentials.json \
  --log_db_auth_id log_db_postgres \
  --source_db_auth_id source_postgres \
  --source_db_name ecommerce \
  --source_schema_name public \
  --fastbcp_dir_path ./FastBCP_linux-x64/latest/ \
  --target_storage_id s3_01 \
  --incremental_table "public.orders:updated_at:datetime!upsert" \
  --incremental_table "public.customers:updated_at:datetime!upsert" \
  --publish_target snowflake_prod \
  --publish_method internal \
  --n_jobs 4

Example with Databricks:

./LakeXpress config create \
  -a credentials.json \
  --log_db_auth_id log_db_sqlite \
  --source_db_auth_id source_mssql \
  --source_db_name sales \
  --source_schema_name dbo \
  --fastbcp_dir_path ./FastBCP_linux-x64/latest/ \
  --target_storage_id s3_01 \
  --incremental_table "dbo.products:modified_date:datetime!upsert" \
  --publish_target databricks_unity \
  --databricks_table_type managed \
  --n_jobs 4

Choosing a Strategy

Scenario                       Recommended Strategy
Append-only event logs         append
CDC (Change Data Capture)      upsert
Transaction history            append
Customer/product master data   upsert
Time-series metrics            append
Order status updates           upsert

Complete Example

Step 1: Create Configuration with Incremental Tables

./LakeXpress config create \
  -a credentials.json \
  --log_db_auth_id log_db_ms \
  --source_db_auth_id ds_04_pg \
  --source_db_name tpch \
  --source_schema_name tpch_1_incremental \
  --fastbcp_dir_path ./FastBCP_linux-x64/latest/ \
  --target_storage_id aws_s3_01 \
  --incremental_table "tpch_1_incremental.orders:o_orderdate:date" \
  --incremental_table "tpch_1_incremental.lineitem:l_shipdate:date" \
  --incremental_safety_lag 3600 \
  --generate_metadata \
  --n_jobs 4 \
  --fastbcp_p 2

Parameters:

  • --incremental_table "tpch_1_incremental.orders:o_orderdate:date" - Track orders by the o_orderdate column
  • --incremental_table "tpch_1_incremental.lineitem:l_shipdate:date" - Track lineitem by the l_shipdate column
  • --incremental_safety_lag 3600 - Re-export rows up to 1 hour before the watermark (handles late-arriving data)
  • --generate_metadata - Generate CDM metadata for exported tables

Step 2: Run First Sync

./LakeXpress sync

First sync behavior:

  • Exports all rows from orders and lineitem
  • Records the highest o_orderdate and l_shipdate as watermarks
  • Non-incremental tables are fully exported
  • Stores watermarks in the logging database

Step 3: Subsequent Syncs

./LakeXpress sync

Subsequent sync behavior:

  • Loads previous watermarks from the logging database
  • Exports only rows where o_orderdate > previous_watermark - safety_lag
  • Exports only rows where l_shipdate > previous_watermark - safety_lag
  • Updates watermarks with current highest values
  • Non-incremental tables are fully exported again

Watermark Tracking

LakeXpress maintains watermarks in the logging database:

First Sync (Full Load)

Table: orders
Column: o_orderdate
Watermark: NULL → 2025-12-31 (highest date in table)
Records exported: 1,500,000

Second Sync (Incremental)

Query: SELECT * FROM orders WHERE o_orderdate > 2025-12-31 - 1 hour
Expected records: 50,000 (new orders in last week)
Watermark updated: 2025-12-31 → 2026-01-05

Non-incremental tables

Tables not configured with --incremental_table are fully exported on each sync. Useful for small dimension or reference tables.

./LakeXpress config create \
  ... \
  --incremental_table "fact.sales:sale_date:date" \
  --n_jobs 4

  • fact.sales - Exports only new records since last watermark
  • All other tables in the schema - Fully exported on each sync

Safety Lag

The --incremental_safety_lag parameter handles late-arriving data:

./LakeXpress config create \
  ... \
  --incremental_table "events.raw_events:event_timestamp:datetime" \
  --incremental_safety_lag 3600 \
  ...

  • --incremental_safety_lag INT - Lag in seconds (default: 0)

Example with 1-hour lag:

Current time: 2025-01-08 14:00:00
Watermark: 2025-01-08 10:00:00
Query includes: WHERE event_timestamp > 2025-01-08 09:00:00
(1 hour before watermark)
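
As plain SQL, the same bound looks like this (illustrative; LakeXpress generates the equivalent filter internally):

-- Effective lower bound with --incremental_safety_lag 3600
SELECT *
FROM events.raw_events
WHERE event_timestamp > TIMESTAMP '2025-01-08 10:00:00' - INTERVAL '1 hour';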

When to use:

  • Asynchronous systems with delayed writes
  • Multi-region databases with replication lag
  • Event streams with out-of-order processing
  • Financial transactions with settlement delays

Real-World Scenarios

Scenario 1: Daily Order Processing

./LakeXpress config create \
  -a credentials.json \
  --log_db_auth_id log_db_postgres \
  --source_db_auth_id source_postgres \
  --source_db_name ecommerce \
  --source_schema_name public \
  --fastbcp_dir_path ./FastBCP_linux-x64/latest/ \
  --target_storage_id s3_01 \
  --incremental_table "public.orders:created_at:datetime" \
  --incremental_table "public.order_items:created_at:datetime" \
  --publish_target snowflake_prod \
  --n_jobs 4

# Run daily via cron
./LakeXpress sync

  • Day 1: Exports 1,000,000 orders (full load)
  • Day 2: Exports ~5,000 new orders (incremental)
  • Day 3: Exports ~4,800 new orders (incremental)
  • Other tables (customers, products) fully exported daily

Scenario 2: Event Log Aggregation

./LakeXpress config create \
  -a credentials.json \
  --log_db_auth_id log_db_ms \
  --source_db_auth_id source_postgres \
  --source_db_name analytics \
  --source_schema_name events \
  --fastbcp_dir_path ./FastBCP_linux-x64/latest/ \
  --target_storage_id aws_s3_01 \
  --incremental_table "events.pageviews:event_time:timestamp" \
  --incremental_table "events.clicks:event_time:timestamp" \
  --incremental_table "events.conversions:event_time:timestamp" \
  --incremental_safety_lag 600 \
  --sub_path production/events \
  --n_jobs 8 \
  --fastbcp_p 4

# Run every 10 minutes
./LakeXpress sync

  • Ingests events from multiple tables continuously
  • 10-minute safety lag handles processing delays

Scenario 3: Time-Series Metrics

./LakeXpress config create \
  -a credentials.json \
  --log_db_auth_id log_db_sqlite \
  --source_db_auth_id source_postgres \
  --source_db_name monitoring \
  --source_schema_name metrics \
  --fastbcp_dir_path ./FastBCP_linux-x64/latest/ \
  --target_storage_id azure_01 \
  --incremental_table "metrics.cpu_usage:recorded_at:timestamp" \
  --incremental_table "metrics.memory_usage:recorded_at:timestamp" \
  --incremental_table "metrics.disk_io:recorded_at:timestamp" \
  --incremental_safety_lag 300 \
  --n_jobs 4 \
  --generate_metadata

# Run every 5 minutes
./LakeXpress sync

  • High-frequency metric collection to Azure storage
  • Each sync captures the last 5+ minutes of data

Query Watermarks

Inspect tracked watermarks by querying the logging database:

-- View all incremental configurations
SELECT
    sync_id,
    config_name,
    source_table,
    incremental_column,
    last_watermark,
    updated_at
FROM sync_configurations
WHERE is_incremental = true
ORDER BY updated_at DESC;

-- View recent watermark updates
SELECT
    run_id,
    sync_id,
    source_table,
    previous_watermark,
    new_watermark,
    rows_exported,
    started_at,
    completed_at
FROM incremental_watermarks
ORDER BY completed_at DESC
LIMIT 10;

Performance Considerations

Column Selection

Choose the incremental column carefully:

# GOOD: Indexed column, never updates
--incremental_table "sales.orders:order_id:integer"

# GOOD: Updated on insert only
--incremental_table "sales.orders:created_date:date"

# CAUTION: May need resync if updated
--incremental_table "sales.orders:updated_at:datetime"

# AVOID: Not monotonic, so rows may be missed
--incremental_table "sales.orders:order_amount:integer"

Indexing

Create an index on the watermark column:

CREATE INDEX idx_orders_created_at ON orders(created_at);

Combining incremental and full exports

./LakeXpress config create \
  ... \
  --incremental_table "fact.transactions:txn_date:date" \
  --incremental_safety_lag 3600 \
  --n_jobs 4 \
  --fastbcp_p 8

  • Fact tables configured with --incremental_table sync incrementally
  • All other tables in the schema are fully exported (useful for dimension tables)

Troubleshooting

Issue: Missing Recent Data

Symptom: Data added to source appears 2-3 syncs later.

Solution: Reduce --incremental_safety_lag:

# Current: 1-hour lag
--incremental_safety_lag 3600

# Reduced: 10-minute lag
--incremental_safety_lag 600

Issue: Duplicate Rows After Resync

Symptom: Running the same sync twice produces duplicates.

Solutions:

  1. Use upsert strategy (recommended): Configure with !upsert to use MERGE:

--incremental_table "sales.orders:created_date:date!upsert"

  2. Deduplicate in queries: If using append mode, use ROW_NUMBER() or QUALIFY at query time (see the sketch after this list).

  3. Use external tables: External tables over Parquet files replace files on each sync rather than inserting rows.
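
A query-time dedup sketch for option 2, keeping the most recent row per key (order_id and updated_at are hypothetical column names; Snowflake and Databricks can express the same thing with QUALIFY):

-- Keep only the latest version of each order
SELECT *
FROM (
    SELECT o.*,
           ROW_NUMBER() OVER (
               PARTITION BY order_id
               ORDER BY updated_at DESC
           ) AS rn
    FROM sales.orders o
) dedup
WHERE rn = 1;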

Issue: Watermark Not Advancing

Symptom: Watermark stays at the same value across syncs.

Solution: Verify the incremental column has new values:

SELECT MAX(created_at) FROM orders;
-- If unchanged, the watermark is correct

Advanced Topics

Multiple Watermark Columns

Track changes in multiple columns with separate configurations:

# Configuration 1: Track by create date
./LakeXpress config create \
  ... \
  --incremental_table "sales.transactions:created_at:datetime"

# Configuration 2: Track by update date
./LakeXpress config create \
  ... \
  --incremental_table "sales.transactions:updated_at:datetime"

Run both syncs to capture creates and updates separately.

Resetting Watermarks

To do a full reload:

# Option 1: Delete and recreate the configuration
./LakeXpress config delete \
  -a credentials.json \
  --log_db_auth_id log_db_postgres \
  --sync_id 20251208-xxxxx

# Then create a new one
./LakeXpress config create ...

# Option 2: Override watermark on next sync
./LakeXpress sync --reset-watermarks
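
If you manage the logging database directly, watermarks can also be cleared there. A hypothetical sketch, assuming the sync_configurations schema shown under Query Watermarks (back up the logging database first):

-- Clear the stored watermark so the next sync performs a full load
UPDATE sync_configurations
SET last_watermark = NULL
WHERE sync_id = '20251208-xxxxx'
  AND source_table = 'sales.orders';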

Combining with Snowflake Publishing

./LakeXpress config create \
  ... \
  --incremental_table "sales.orders:order_date:date" \
  --incremental_table "sales.returns:return_date:date" \
  --target_storage_id s3_01 \
  --publish_target snowflake_prod \
  --publish_method internal \
  --publish_schema_pattern "{schema}_incremental" \
  --n_jobs 4

./LakeXpress sync

Creates Snowflake tables continuously updated with new data. Non-incremental tables are fully exported and published on each sync.

See Also