PythonDuckDBPolarsStreamlitData ProductArchitecture

De Pandas au data product : construire un pipeline analytique moderne avec DuckDB et Streamlit

GP
Gaël Penessot
De Pandas au data product : construire un pipeline analytique moderne avec DuckDB et Streamlit

Vous avez un notebook Pandas qui tourne parfaitement sur votre machine.

Vous avez été voir les bonnes données, les bons calculs. Mais dès que vous essayez de le partager, de le planifier, ou de le faire tourner sur un fichier deux fois plus gros — ça casse.

Ce n'est pas un bug. C'est une limite architecturale.

Pandas est un outil d'exploration. Un data product est quelque chose d'autre : un pipeline reproductible, testable, déployable, qui tourne sans vous.

Dans cet article, je vous montre comment passer de l'un à l'autre avec le stack moderne : DuckDB + Polars + Streamlit.

Ce que fait Pandas vs ce que demande un data product

Voici le workflow typique avec Pandas :

import pandas as pd

df = pd.read_csv("ventes_2025.csv")           # 1. Charger tout en RAM
df['date'] = pd.to_datetime(df['date'])        # 2. Transformer
df_clean = df[df['ca'] > 0]                    # 3. Filtrer
result = df_clean.groupby('region')['ca'].sum() # 4. Agréger
result.to_csv("output.csv")                    # 5. Exporter

Ça fonctionne. Jusqu'à ce que :

  • Le fichier dépasse 2GB (MemoryError)
  • Le pipeline doit tourner chaque jour automatiquement
  • Une autre personne doit lire et modifier le code
  • On doit tester que les calculs sont corrects
  • On doit exposer les résultats à un non-développeur

Un data product résout ces cinq problèmes. Voyons comment.

Architecture d'un data product minimal

sources/          ← fichiers bruts (CSV, Parquet, API)
  ventes_2025.parquet
  clients.parquet

pipeline/
  01_ingest.sql   ← DuckDB : charger + valider
  02_transform.py ← Polars : transformer + enrichir
  03_mart.sql     ← DuckDB : agréger en tables analytiques

app/
  main.py         ← Streamlit : interface utilisateur

tests/
  test_pipeline.py ← vérifications automatiques

Chaque étape a une responsabilité unique. Chaque fichier est lisible seul. L'ensemble est testable et déployable.

Étape 1 : L'ingestion avec DuckDB

DuckDB remplace avantageusement pd.read_csv() pour l'ingestion : il lit directement les fichiers sans tout charger en RAM, valide les types, et supporte le SQL complet.

import duckdb

con = duckdb.connect('pipeline.duckdb')

# Charger le CSV et créer une vue — zéro copie en RAM
con.execute("""
    CREATE OR REPLACE VIEW ventes_raw AS
    SELECT *
    FROM read_csv(
        'sources/ventes_2025.csv',
        types={'date': 'DATE', 'ca': 'DOUBLE', 'region': 'VARCHAR'},
        nullstr='NA'
    )
""")

# Valider la qualité des données
quality_check = con.execute("""
    SELECT
        COUNT(*) as total_lignes,
        COUNT(*) FILTER (WHERE ca IS NULL) as ca_null,
        COUNT(*) FILTER (WHERE ca < 0)     as ca_negatif,
        MIN(date) as date_min,
        MAX(date) as date_max
    FROM ventes_raw
""").df()

print(quality_check)
# total_lignes  ca_null  ca_negatif  date_min    date_max
# 245 832       12       0           2025-01-01  2025-12-31

Si les checks échouent, arrêtez le pipeline ici — pas à l'étape 3 avec une erreur cryptique.

assert quality_check['ca_null'][0] < 100, "Trop de valeurs nulles en CA"
assert quality_check['ca_negatif'][0] == 0, "CA négatif détecté"

Convertir CSV → Parquet une seule fois

con.execute("""
    COPY (SELECT * FROM ventes_raw)
    TO 'sources/ventes_2025.parquet'
    (FORMAT PARQUET, COMPRESSION ZSTD)
""")

Un CSV de 800MB devient ~180MB en Parquet ZSTD, et les lectures ultérieures sont 5x plus rapides. Faites cette conversion une fois, utilisez Parquet pour tout le reste.

Étape 2 : La transformation avec Polars LazyFrame

Polars LazyFrame est l'outil idéal pour la transformation : il planifie les opérations sans les exécuter, applique le predicate pushdown (ne lit que les colonnes/lignes nécessaires), et parallélise automatiquement.

import polars as pl

# LazyFrame : rien n'est exécuté encore
lf = pl.scan_parquet("sources/ventes_2025.parquet")

# Définir les transformations (plan d'exécution)
transformed = (
    lf
    .filter(pl.col("ca") > 0)                               # filtrer les anomalies
    .with_columns([
        pl.col("date").dt.month().alias("mois"),
        pl.col("date").dt.year().alias("annee"),
        (pl.col("ca") * 1.2).alias("ca_ttc"),               # ajout TVA
        pl.col("region").str.to_uppercase().alias("region"), # normaliser
    ])
    .with_columns([
        pl.col("ca")
          .sum()
          .over("region")
          .alias("ca_total_region")                          # window function
    ])
    .with_columns([
        (pl.col("ca") / pl.col("ca_total_region") * 100)
          .alias("part_region_pct")
    ])
)

# Exécuter et persister
df = transformed.collect()
df.write_parquet("pipeline/ventes_enrichies.parquet")

print(f"Lignes transformées : {len(df):,}")
print(f"CA total : {df['ca'].sum():,.0f}€")

Le scan_parquet + LazyFrame permet à Polars de ne lire que les colonnes utilisées dans la chaîne — sur un fichier de 50 colonnes, si vous n'en utilisez que 8, vous ne lisez que 16% des données.

Étape 3 : Les tables analytiques avec DuckDB

Une fois les données enrichies, DuckDB crée les agrégats finaux qui alimenteront l'interface :

con = duckdb.connect('pipeline.duckdb')

# Table analytique : CA par région et mois
con.execute("""
    CREATE OR REPLACE TABLE mart_ca_region AS
    SELECT
        region,
        annee,
        mois,
        SUM(ca)      as ca_ht,
        SUM(ca_ttc)  as ca_ttc,
        COUNT(*)     as nb_ventes,
        AVG(ca)      as panier_moyen,
        SUM(ca) OVER (
            PARTITION BY region, annee
            ORDER BY mois
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) as ca_cumule_ytd
    FROM read_parquet('pipeline/ventes_enrichies.parquet')
    GROUP BY region, annee, mois
    ORDER BY region, annee, mois
""")

# Table analytique : top produits
con.execute("""
    CREATE OR REPLACE TABLE mart_top_produits AS
    WITH ranked AS (
        SELECT
            produit,
            SUM(ca) as ca_total,
            COUNT(*) as nb_ventes,
            RANK() OVER (ORDER BY SUM(ca) DESC) as rang
        FROM read_parquet('pipeline/ventes_enrichies.parquet')
        GROUP BY produit
    )
    SELECT * FROM ranked WHERE rang <= 20
""")

con.close()

Ces tables sont pré-calculées — l'interface Streamlit les lit en millisecondes, sans recalculer à chaque interaction utilisateur.

Étape 4 : L'interface Streamlit

L'interface ne fait qu'une chose : lire les tables analytiques et afficher. Zéro logique métier ici.

# app/main.py
import streamlit as st
import duckdb
import plotly.express as px

st.set_page_config(page_title="Dashboard Ventes 2025", layout="wide")

@st.cache_resource
def get_connection():
    return duckdb.connect('pipeline/pipeline.duckdb', read_only=True)

@st.cache_data(ttl=3600)
def load_ca_region():
    con = get_connection()
    return con.execute("SELECT * FROM mart_ca_region").df()

# Sidebar : filtres
st.sidebar.title("Filtres")
regions = st.sidebar.multiselect(
    "Régions",
    options=["IDF", "AURA", "PACA", "HDF", "BFC"],
    default=["IDF", "AURA"]
)

# Données filtrées
df = load_ca_region()
df_filtered = df[df['region'].isin(regions)] if regions else df

# KPIs
col1, col2, col3 = st.columns(3)
col1.metric("CA Total HT", f"{df_filtered['ca_ht'].sum():,.0f}€")
col2.metric("Nb Ventes", f"{df_filtered['nb_ventes'].sum():,}")
col3.metric("Panier Moyen", f"{df_filtered['ca_ht'].sum() / df_filtered['nb_ventes'].sum():,.0f}€")

# Graphe CA cumulé
fig = px.line(
    df_filtered,
    x="mois", y="ca_cumule_ytd", color="region",
    title="CA cumulé YTD par région",
    labels={"ca_cumule_ytd": "CA cumulé (€)", "mois": "Mois"}
)
st.plotly_chart(fig, use_container_width=True)

Notez @st.cache_resource pour la connexion DuckDB (un seul objet partagé) et @st.cache_data(ttl=3600) pour les données (rechargées toutes les heures). Sans ces deux décorateurs, chaque interaction utilisateur relit les fichiers — votre app serait inutilisable en production.

Étape 5 : Tests automatiques

Un data product sans tests n'est pas un data product. C'est un script.

# tests/test_pipeline.py
import duckdb
import polars as pl
import pytest

def test_ingestion_aucune_valeur_nulle():
    con = duckdb.connect('pipeline.duckdb')
    nulls = con.execute(
        "SELECT COUNT(*) FROM ventes_raw WHERE ca IS NULL"
    ).fetchone()[0]
    assert nulls == 0, f"{nulls} valeurs nulles en CA"

def test_transformation_ca_toujours_positif():
    df = pl.read_parquet("pipeline/ventes_enrichies.parquet")
    negatifs = df.filter(pl.col("ca") < 0).height
    assert negatifs == 0, f"{negatifs} lignes avec CA négatif"

def test_mart_ca_region_somme_coherente():
    con = duckdb.connect('pipeline.duckdb')
    ca_raw = con.execute("SELECT SUM(ca) FROM ventes_raw").fetchone()[0]
    ca_mart = con.execute("SELECT SUM(ca_ht) FROM mart_ca_region").fetchone()[0]
    # La somme doit être égale (à 1€ près pour les arrondis)
    assert abs(ca_raw - ca_mart) < 1, f"Somme incohérente : {ca_raw} vs {ca_mart}"

Ces 3 tests vérifient l'intégrité de bout en bout. Lancez-les à chaque exécution du pipeline :

pytest tests/ -v

Orchestrer le tout

Un Makefile simple suffit pour la plupart des projets :

pipeline:
    python pipeline/01_ingest.py
    python pipeline/02_transform.py
    python pipeline/03_mart.py
    pytest tests/ -v

app:
    streamlit run app/main.py

deploy:
    git push origin main  # déclenche le déploiement Streamlit Cloud

Pour une exécution planifiée (cron), ajoutez une GitHub Action :

# .github/workflows/daily-pipeline.yml
on:
  schedule:
    - cron: '0 6 * * *' # chaque jour à 6h UTC
jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install duckdb polars
      - run: make pipeline

Comparaison avant / après

Critère Notebook Pandas Data Product DuckDB+Polars+Streamlit
Fichiers > 2GB MemoryError Sans problème (out-of-core)
Reproductibilité "ça marchait hier" Tests automatiques
Partage "ouvre le notebook" URL Streamlit
Planification lancement manuel GitHub Actions cron
Maintenabilité spaghetti cells modules séparés
Temps exécution 45s (2.4M lignes) 8s (même dataset)

Ce que vous apprenez dans nos formations

Ce pipeline utilise exactement les outils enseignés dans SQL Mastery et Streamlit Unleashed :

  • SQL Mastery couvre DuckDB, les window functions (SUM() OVER, RANK(), LAG()), les CTEs, et la gestion des fichiers CSV/Parquet en SQL pur.

  • Streamlit Unleashed couvre st.cache_resource, st.cache_data, l'intégration DuckDB dans Streamlit, le déploiement Streamlit Cloud et Railway, et les patterns d'architecture pour les apps data en production.

Les deux formations sont complémentaires : SQL Mastery pour la couche analytique, Streamlit Unleashed pour la couche interface.

Livre Business Intelligence avec Python

Approfondir avec mon livre

"Business Intelligence avec Python" - Le guide complet pour maîtriser l'analyse de données

Voir sur Amazon →

Formation recommandée

Streamlit Unleashed

Du prototype à la production : authentification, design system, performance. La formation complète pour construire des apps Streamlit professionnelles.

Voir la formation →

Ne manque rien de l'actualité data

Rejoins +1000 professionnels qui reçoivent chaque semaine mes analyses, conseils et découvertes data.

S'abonner gratuitement
Prochaine révision : Trimestre prochain