
In this notebook, we are going to query some Parquet files with the following SQL engines:

  • DuckDB: an in-process SQL OLAP database management system. We are going to use its Python Client API (MIT license).
  • Tableau Hyper: an in-memory data engine. We are going to interact with this engine using the tableauhyperapi Python package (proprietary license).

Both of these tools are optimized for online analytical processing (OLAP): we do not want to modify the data, but to run queries that process a large amount of it. DuckDB and Tableau Hyper both rely on a vectorized engine and parallel processing, which is well suited to the columnar storage format of Parquet files. This is very well described in this post from the DuckDB team. Here is a quote from that blog post:

DuckDB will read the Parquet files in a streaming fashion, which means you can perform queries on large Parquet files that do not fit in your main memory.
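
As a minimal illustration, here is a sketch of how a Parquet file can be queried directly from the DuckDB Python client; the file path is just a placeholder:

import duckdb

parquet_path = "/path/to/edges.parquet"  # placeholder path

connection = duckdb.connect()
# the Parquet file is scanned directly; no prior import into a DuckDB table is needed
edge_count = connection.query(
    f"SELECT COUNT(*) FROM read_parquet('{parquet_path}')"
).fetchone()[0]
connection.close()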

The Tableau Hyper engine is also able to read Parquet files, using the external keyword:

External data can be read directly in a SQL query using the set returning function external. In this case, no Hyper table is involved, so such a query can even be used if no database is attached to the current session.
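
A minimal sketch of such a query with the tableauhyperapi package, again with a placeholder file path, could look like this:

from tableauhyperapi import Connection, HyperProcess, Telemetry

parquet_path = "/path/to/edges.parquet"  # placeholder path

with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    # no database is attached to the session: external() reads the file directly
    with Connection(endpoint=hyper.endpoint) as connection:
        edge_count = connection.execute_scalar_query(
            f"SELECT COUNT(*) FROM external('{parquet_path}', FORMAT => 'parquet')"
        )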

The Parquet files correspond to a very specific use case: they all describe road networks from the US or Europe. The US road networks were imported in a previous post: Download some benchmark road networks for Shortest Paths algorithms. The European networks were downloaded from this web page and converted to Parquet files. We are only going to use the edge table, not the node coordinates one. The SQL queries in this notebook are also quite specific, in the sense that they relate to graph theory. Here is what we are going to compute:

  1. occurrence of parallel edges
  2. vertex and edge counts
  3. count of connected vertices
  4. count of vertices with one incoming and one outgoing edge
  5. degree distribution

For each query and SQL engine, we measure the elapsed time. We do not measure memory consumption in this post.

Notes:

  • The Parquet files are not compressed.
  • Both engines usually make use of their own optimized file format, e.g. .hyper files for Tableau Hyper. However, they both support direct querying of CSV or Parquet files.
  • We are going to use DuckDB and Tableau Hyper with their default configuration (a sketch of how the DuckDB settings could be changed follows this list).
  • Most of the SQL queries could probably be optimized; however, we believe that they are efficient enough for the comparison purposes of this short post.
  • In all the elapsed time bar charts, lower is better.
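
For reference, here is a sketch of how the DuckDB configuration could be adjusted if one did not want the defaults; the values shown are arbitrary examples, not settings used in this post:

import duckdb

connection = duckdb.connect()
# example settings only, the defaults are kept everywhere in this post
connection.execute("SET threads TO 4")          # limit the number of worker threads
connection.execute("SET memory_limit = '8GB'")  # cap the memory used by the engine
connection.close()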

Imports

Note that both tools are very easy to install:

$ pip install duckdb  
$ pip install tableauhyperapi  

Here are the imports:

import os
from time import perf_counter

import duckdb
import pandas as pd
from pandas.testing import assert_frame_equal
from tableauhyperapi import Connection, HyperProcess, Telemetry

pd.set_option("display.precision", 2)  # Pandas float number display
TELEMETRY = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU  # not sending telemetry data to Tableau
FS = (12, 6)  # figure size
ALPHA = 0.8  # figure transparency

Package versions:

Python         : 3.10.6
duckdb         : 0.5.1
tableauhyperapi: 0.0.15530
pandas         : 1.5.0

System information:

OS             : Linux
Architecture   : 64bit
CPU            : Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
CPU cores      : 8
RAM            : 32GB
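
This kind of environment information can be gathered with a small snippet along the following lines. This is only a sketch based on the standard library (importlib.metadata and platform), not necessarily how the listing above was produced; the RAM amount would require an extra package such as psutil:

import os
import platform
from importlib.metadata import version

print(f"Python         : {platform.python_version()}")
for pkg in ["duckdb", "tableauhyperapi", "pandas"]:
    print(f"{pkg:<15}: {version(pkg)}")
print(f"OS             : {platform.system()}")
print(f"Architecture   : {platform.architecture()[0]}")
print(f"CPU            : {platform.processor()}")  # may be empty on some systems
print(f"CPU cores      : {os.cpu_count()}")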

Apache Parquet files

names = ["NY", "BAY", "COL", "FLA", "NW", "NE", "CAL", "LKS", "E", "W", "CTR", "USA"]

stats = {}
parquet_graph_file_paths = {}
parquet_file_sizes = []
for name in names:
    parquet_graph_file_path = f"/home/francois/Data/Disk_1/DIMACS_road_networks/{name}/USA-road-t.{name}.gr.parquet"
    stats[name] = {}
    stats[name]["parquet_file_size_MB"] = (
        os.path.getsize(parquet_graph_file_path) * 1.0e-6
    )
    parquet_graph_file_paths[name] = parquet_graph_file_path

names_osmr = ["osm-bawu", "osm-ger", "osm-eur"]
names += names_osmr
for name in names_osmr:
    parquet_graph_file_path = (
        f"/home/francois/Data/Disk_1/OSMR/{name}/{name}.gr.parquet"
    )
    stats[name] = {}
    stats[name]["parquet_file_size_MB"] = (
        os.path.getsize(parquet_graph_file_path) * 1.0e-6
    )
    parquet_graph_file_paths[name] = parquet_graph_file_path
ordered_names = ["NY", "BAY", "COL", "FLA", "NW", "NE", "CAL", "osm-bawu", "LKS", "E", "W", "CTR", "osm-ger", "USA", "osm-eur"]
stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df.sort_values(by="parquet_file_size_MB", ascending=True, inplace=True)
stats_df
parquet_file_size_MB
NY 9.11
BAY 10.14
COL 13.32
FLA 31.86
NW 34.43
NE 46.36
CAL 56.40
LKS 82.57
E 105.59
osm-bawu 106.99
W 184.64
CTR 428.21
USA 702.06
osm-ger 730.79
osm-eur 6112.20

First query: parallel edges

In this first query, we want to check whether there are parallel edges in the graph. There should not be any in the US networks, because we removed parallel edges when we created the Parquet files in a previous post. When we imported the European networks, we created the Parquet files by chunks, so we can only guarantee that there is no parallel edge within each chunk, not overall. Here is the query:

query_1 = """
SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END 
FROM (
    SELECT source, target, COUNT(*)
    FROM graph_edges
    GROUP BY source, target
    HAVING COUNT(*) > 1
)"""

We expect this query to return 0 for each graph.

DuckDB

res_duckdb = {}

for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]

    connection = duckdb.connect()

    # query
    start = perf_counter()
    query = query_1.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    duplicates = connection.query(query).fetchone()[0]
    elapsed_time_s = perf_counter() - start

    connection.close()

    res_duckdb[name] = {}
    res_duckdb[name]["duplicates"] = duplicates

    assert duplicates == 0

    stats[name]["query_1_DuckDB"] = elapsed_time_s
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:

        parquet_graph_file_path = parquet_graph_file_paths[name]

        with Connection(endpoint=hyper.endpoint) as connection:

            # query
            start = perf_counter()
            query = query_1.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            duplicates = connection.execute_scalar_query(query)
            elapsed_time_s = perf_counter() - start

        res_hyper[name] = {}
        res_hyper[name]["duplicates"] = duplicates

        assert duplicates == 0

        stats[name]["query_1_Hyper"] = elapsed_time_s
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_1")]
query_1_df = stats_df[cols]
ax = query_1_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_1", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

Figure: Query_1 elapsed time per network (log scale)

Results

There is no parallel edge in any of the networks.

Second query: vertex and edge counts

query_2 = "SELECT COUNT(*), MAX(source), MAX(target) FROM graph_edges"

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]

    connection = duckdb.connect()

    # query
    start = perf_counter()
    query = query_2.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    res = connection.query(query).fetchall()[0]
    elapsed_time_s = perf_counter() - start

    connection.close()

    edge_count = res[0]
    # vertex ids are assumed to start at 0 and be contiguous: vertex count = largest id + 1
    vertex_count = max(res[1:3]) + 1

    stats[name]["vertex_count"] = vertex_count
    stats[name]["edge_count"] = edge_count
    stats[name]["query_2_DuckDB"] = elapsed_time_s

    res_duckdb[name] = {}
    res_duckdb[name]["vertex_count"] = vertex_count
    res_duckdb[name]["edge_count"] = edge_count
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        parquet_graph_file_path = parquet_graph_file_paths[name]

        with Connection(endpoint=hyper.endpoint) as connection:

            # query
            start = perf_counter()
            query = query_2.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            res = connection.execute_list_query(query)[0]
            elapsed_time_s = perf_counter() - start

        edge_count = res[0]
        vertex_count = max(res[1:3]) + 1

        stats[name]["query_2_Hyper"] = elapsed_time_s

        res_hyper[name] = {}
        res_hyper[name]["vertex_count"] = vertex_count
        res_hyper[name]["edge_count"] = edge_count
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_2")]
query_2_df = stats_df[cols]
ax = query_2_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_2", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

Figure: Query_2 elapsed time per network (log scale)

Results

stats_df[["vertex_count", "edge_count"]]
vertex_count edge_count
NY 264346 730100
BAY 321270 794830
COL 435666 1042400
FLA 1070376 2687902
NW 1207945 2820774
NE 1524453 3868020
CAL 1890815 4630444
osm-bawu 3064263 6183798
LKS 2758119 6794808
E 3598623 8708058
W 6262104 15119284
CTR 14081816 33866826
osm-ger 20690320 41791542
USA 23947347 57708624
osm-eur 173789185 347997111

Third query: count of connected vertices

Some vertices of the graph may be isolated, meaning that their degree is 0. We want to count the connected (i.e. non-isolated) vertices of the graph.

query_3 = """
WITH edges AS (
    SELECT source, target 
    FROM graph_edges)
SELECT COUNT(*) 
FROM (
    SELECT source AS node 
    FROM edges     
        UNION     
    SELECT target AS node 
    FROM edges)"""
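
Note that the UNION (as opposed to UNION ALL) removes duplicates, so the outer COUNT(*) counts distinct endpoints. An equivalent formulation that makes the deduplication explicit is sketched below; the name query_3_alt is hypothetical and this variant is not benchmarked in this post:

# equivalent sketch (not benchmarked): explicit deduplication with COUNT(DISTINCT ...)
query_3_alt = """
SELECT COUNT(DISTINCT node)
FROM (
    SELECT source AS node
    FROM graph_edges
        UNION ALL
    SELECT target AS node
    FROM graph_edges)"""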

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]

    connection = duckdb.connect()

    # query
    start = perf_counter()
    query = query_3.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    connected_vertices = connection.query(query).fetchone()[0]
    elapsed_time_s = perf_counter() - start

    connection.close()

    stats[name]["connected_vertices"] = connected_vertices
    stats[name]["mean_degree"] = stats[name]["edge_count"] / connected_vertices
    stats[name]["query_3_DuckDB"] = elapsed_time_s

    res_duckdb[name] = {}
    res_duckdb[name]["connected_vertices"] = connected_vertices
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        parquet_graph_file_path = parquet_graph_file_paths[name]

        with Connection(endpoint=hyper.endpoint) as connection:

            # query
            start = perf_counter()
            query = query_3.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            connected_vertices = connection.execute_scalar_query(query)
            elapsed_time_s = perf_counter() - start

        stats[name]["query_3_Hyper"] = elapsed_time_s

        res_hyper[name] = {}
        res_hyper[name]["connected_vertices"] = connected_vertices
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_3")]
query_3_df = stats_df[cols]
ax = query_3_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_3", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

Figure: Query_3 elapsed time per network (log scale)

We observe that the elapsed times for the largest network, osm-eur, differ a lot between DuckDB and Tableau Hyper:

Engine          Elapsed time (s)
DuckDB          38.71
Tableau Hyper   357.47

Results

There is no isolated vertex in any of the networks.

stats_df[["vertex_count", "connected_vertices", "mean_degree"]]
vertex_count connected_vertices mean_degree
NY 264346 264346 2.76
BAY 321270 321270 2.47
COL 435666 435666 2.39
FLA 1070376 1070376 2.51
NW 1207945 1207945 2.34
NE 1524453 1524453 2.54
CAL 1890815 1890815 2.45
osm-bawu 3064263 3064263 2.02
LKS 2758119 2758119 2.46
E 3598623 3598623 2.42
W 6262104 6262104 2.41
CTR 14081816 14081816 2.41
osm-ger 20690320 20690320 2.02
USA 23947347 23947347 2.41
osm-eur 173789185 173789185 2.00

Fourth query: count of vertices with one incoming and one outgoing edge

We count the vertices with exactly one incoming and one outgoing edge (in-degree = out-degree = 1). In the following, we refer to these vertices as in-out vertices.

query_4 = """
    WITH TPARQUET AS (
        SELECT source, target 
        FROM graph_edges),
    TSOURCE AS (
        SELECT source AS node, COUNT(*) 
        FROM TPARQUET 
        GROUP BY source HAVING COUNT(*)=1),
    TTARGET AS (
        SELECT target AS node, COUNT(*) 
        FROM TPARQUET GROUP BY target HAVING COUNT(*)=1),
    TJOIN AS (
        SELECT s.node 
        FROM TSOURCE s 
        INNER JOIN TTARGET t ON s.node = t.node)
    SELECT COUNT(*) FROM TJOIN"""

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]

    connection = duckdb.connect()

    # query
    start = perf_counter()
    query = query_4.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    inout_vertices = connection.query(query).fetchone()[0]
    elapsed_time_s = perf_counter() - start

    connection.close()

    stats[name]["inout_vertices"] = inout_vertices
    stats[name]["query_4_DuckDB"] = elapsed_time_s

    res_duckdb[name] = {}
    res_duckdb[name]["inout_vertices"] = inout_vertices
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        parquet_graph_file_path = parquet_graph_file_paths[name]

        with Connection(endpoint=hyper.endpoint) as connection:

            # query
            start = perf_counter()
            query = query_4.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            inout_vertices = connection.execute_scalar_query(query)
            elapsed_time_s = perf_counter() - start

        stats[name]["query_4_Hyper"] = elapsed_time_s

        res_hyper[name] = {}
        res_hyper[name]["inout_vertices"] = inout_vertices
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_4")]
query_4_df = stats_df[cols]
ax = query_4_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_4", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

Figure: Query_4 elapsed time per network (log scale)

Results

stats_df["ratio"] = stats_df.inout_vertices / stats_df.vertex_count
stats_df[["inout_vertices", "vertex_count", "ratio"]]
inout_vertices vertex_count ratio
NY 41169 264346 0.16
BAY 73109 321270 0.23
COL 82537 435666 0.19
FLA 210849 1070376 0.20
NW 282638 1207945 0.23
NE 288695 1524453 0.19
CAL 373947 1890815 0.20
osm-bawu 398242 3064263 0.13
LKS 478263 2758119 0.17
E 764838 3598623 0.21
W 1209550 6262104 0.19
CTR 2787565 14081816 0.20
osm-ger 2874055 20690320 0.14
USA 4762005 23947347 0.20
osm-eur 20309942 173789185 0.12

Fifth query: degree distribution

query_5 = """
    CREATE TEMP TABLE t_edges AS 
        SELECT source, target 
        FROM graph_edges;
    CREATE TEMP TABLE t_nodes AS
        SELECT source AS node 
        FROM t_edges     
            UNION ALL
        SELECT target AS node 
        FROM t_edges;
    CREATE TEMP TABLE t_deg AS
        SELECT COUNT(*) AS deg
        FROM t_nodes
        GROUP BY node;
    SELECT deg, COUNT(*) AS n_occ 
    FROM t_deg
    GROUP BY deg
    ORDER BY deg ASC;"""
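
Because t_nodes receives one row per edge endpoint (one for the source and one for the target), the degrees computed here are total degrees (in-degree plus out-degree), and the distribution weighted by degree must sum to twice the edge count. A small sanity-check sketch, with hypothetical variable names, could be:

def check_degree_distribution(degree_counts, edge_count):
    """degree_counts: list of (degree, vertex_count) tuples, as returned by query_5."""
    total_endpoints = sum(deg * n_occ for deg, n_occ in degree_counts)
    assert total_endpoints == 2 * edge_count

# hypothetical usage, combining the results of query_5 and query_2:
# check_degree_distribution(res, stats[name]["edge_count"])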

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]

    connection = duckdb.connect()
    
    # query
    start = perf_counter()
    query = query_5.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")

    queries = query.removesuffix(";").split(";")
    for sq in queries[:-1]:
        connection.execute(sq)
    sq = queries[-1]
    res = connection.query(sq).fetchall()

    elapsed_time_s = perf_counter() - start

    stats[name]["query_5_DuckDB"] = elapsed_time_s

    connection.close()

    res_duckdb[name] = {}
    for item in res:
        degree = item[0]
        vertex_count = item[1]
        res_duckdb[name]["degree_" + str(degree).zfill(3)] = vertex_count

res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")
res_duckdb_df = res_duckdb_df.sort_index(axis=1)
res_duckdb_df = res_duckdb_df.fillna(0).astype(int)

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        with Connection(endpoint=hyper.endpoint) as connection:

            parquet_graph_file_path = parquet_graph_file_paths[name]

            # query
            start = perf_counter()
            query = query_5.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )

            queries = query.removesuffix(";").split(";")
            for sq in queries[:-1]:
                connection.execute_command(sq)
            sq = queries[-1]
            res = connection.execute_list_query(sq)

            elapsed_time_s = perf_counter() - start

        for item in res:
            degree = item[0]
            vertex_count = item[1]
            stats[name]["degree_" + str(degree).zfill(3)] = vertex_count
        stats[name]["query_5_Hyper"] = elapsed_time_s

        res_hyper[name] = {}
        for item in res:
            degree = item[0]
            vertex_count = item[1]
            res_hyper[name]["degree_" + str(degree).zfill(3)] = vertex_count
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")
res_hyper_df = res_hyper_df.sort_index(axis=1)
res_hyper_df = res_hyper_df.fillna(0).astype(int)

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_5")]
query_5_df = stats_df[cols]
ax = query_5_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_5", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

Figure: Query_5 elapsed time per network (log scale)

Results

degree_cols = sorted([c for c in stats_df.columns if c.startswith("degree_")])
stats_df[degree_cols] = stats_df[degree_cols].fillna(0).astype(int)
degrees = 100 *  stats_df[degree_cols].div(stats_df["vertex_count"], axis=0)
cols = degrees.columns
degrees.columns = [int(c.split('_')[-1]) for c in cols]
tot = degrees.sum(axis=0)
degrees = degrees[list(tot[tot > 0.01].index.values)]
degrees['total'] = degrees.sum(axis=1)
styler = degrees.style.background_gradient(axis=1, cmap='YlOrRd')
styler = styler.format(precision=2)
styler 
  2 3 4 5 6 7 8 10 12 total
NY 15.57 0.00 15.27 0.00 47.11 0.00 21.56 0.43 0.06 100.00
BAY 22.76 0.00 20.30 0.00 43.94 0.00 12.81 0.17 0.02 100.00
COL 18.95 0.00 33.71 0.00 36.59 0.00 10.66 0.09 0.01 100.00
FLA 19.70 0.00 22.73 0.00 44.49 0.00 12.94 0.13 0.01 100.00
NW 23.40 0.00 28.99 0.00 38.47 0.00 9.01 0.13 0.01 100.00
NE 18.94 0.00 21.90 0.00 45.95 0.00 12.96 0.23 0.03 100.00
CAL 19.78 0.00 27.53 0.00 40.89 0.00 11.66 0.14 0.01 100.00
osm-bawu 13.00 0.60 72.27 0.58 12.26 0.07 1.21 0.01 0.00 100.00
LKS 17.34 0.00 32.01 0.00 37.75 0.00 12.77 0.12 0.01 100.00
E 21.25 0.00 26.19 0.00 42.07 0.00 10.30 0.16 0.02 100.00
W 19.32 0.00 30.72 0.00 39.33 0.00 10.49 0.13 0.01 100.00
CTR 19.80 0.00 31.11 0.00 38.01 0.00 10.97 0.10 0.01 100.00
osm-ger 13.89 0.60 70.33 0.66 13.17 0.08 1.26 0.01 0.00 100.00
USA 19.89 0.00 30.27 0.00 38.97 0.00 10.75 0.12 0.01 100.00
osm-eur 11.69 0.84 76.03 0.50 9.80 0.05 1.09 0.01 0.00 100.00

Conclusion

Apache Parquet is a column-oriented data file format designed for efficient data storage and retrieval, and it is widespread in the data analysis ecosystem. Being able to query Parquet files directly with an efficient SQL engine is really convenient. Both DuckDB and Tableau Hyper are amazing tools that, among other capabilities, allow Parquet files to be queried efficiently. We only scratched the surface of this feature in this post, with a very specific use case. We observed similar timings for most of the queries. We did not measure memory usage; however, we noticed that it is important to write SQL queries that are somewhat optimized regarding memory consumption when dealing with large datasets and “large” queries. It is also advisable to specify a temp directory for the engine, so that it can write temporary data there (the temp_directory setting in DuckDB).
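
For example, a temporary directory could be set in DuckDB along these lines; the path is a placeholder and this setting was not used in this post:

import duckdb

connection = duckdb.connect()
# placeholder path: let the engine spill temporary data to disk in this directory
connection.execute("SET temp_directory = '/path/to/duckdb_tmp'")
connection.close()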