De Pandas au data product : construire un pipeline analytique moderne avec DuckDB et Streamlit

Vous avez un notebook Pandas qui tourne parfaitement sur votre machine.
Vous avez été voir les bonnes données, les bons calculs. Mais dès que vous essayez de le partager, de le planifier, ou de le faire tourner sur un fichier deux fois plus gros — ça casse.
Ce n'est pas un bug. C'est une limite architecturale.
Pandas est un outil d'exploration. Un data product est quelque chose d'autre : un pipeline reproductible, testable, déployable, qui tourne sans vous.
Dans cet article, je vous montre comment passer de l'un à l'autre avec le stack moderne : DuckDB + Polars + Streamlit.
Ce que fait Pandas vs ce que demande un data product
Voici le workflow typique avec Pandas :
import pandas as pd
df = pd.read_csv("ventes_2025.csv") # 1. Charger tout en RAM
df['date'] = pd.to_datetime(df['date']) # 2. Transformer
df_clean = df[df['ca'] > 0] # 3. Filtrer
result = df_clean.groupby('region')['ca'].sum() # 4. Agréger
result.to_csv("output.csv") # 5. Exporter
Ça fonctionne. Jusqu'à ce que :
- Le fichier dépasse 2GB (MemoryError)
- Le pipeline doit tourner chaque jour automatiquement
- Une autre personne doit lire et modifier le code
- On doit tester que les calculs sont corrects
- On doit exposer les résultats à un non-développeur
Un data product résout ces cinq problèmes. Voyons comment.
Architecture d'un data product minimal
sources/ ← fichiers bruts (CSV, Parquet, API)
ventes_2025.parquet
clients.parquet
pipeline/
01_ingest.sql ← DuckDB : charger + valider
02_transform.py ← Polars : transformer + enrichir
03_mart.sql ← DuckDB : agréger en tables analytiques
app/
main.py ← Streamlit : interface utilisateur
tests/
test_pipeline.py ← vérifications automatiques
Chaque étape a une responsabilité unique. Chaque fichier est lisible seul. L'ensemble est testable et déployable.
Étape 1 : L'ingestion avec DuckDB
DuckDB remplace avantageusement pd.read_csv() pour l'ingestion : il lit directement les fichiers sans tout charger en RAM, valide les types, et supporte le SQL complet.
import duckdb
con = duckdb.connect('pipeline.duckdb')
# Charger le CSV et créer une vue — zéro copie en RAM
con.execute("""
CREATE OR REPLACE VIEW ventes_raw AS
SELECT *
FROM read_csv(
'sources/ventes_2025.csv',
types={'date': 'DATE', 'ca': 'DOUBLE', 'region': 'VARCHAR'},
nullstr='NA'
)
""")
# Valider la qualité des données
quality_check = con.execute("""
SELECT
COUNT(*) as total_lignes,
COUNT(*) FILTER (WHERE ca IS NULL) as ca_null,
COUNT(*) FILTER (WHERE ca < 0) as ca_negatif,
MIN(date) as date_min,
MAX(date) as date_max
FROM ventes_raw
""").df()
print(quality_check)
# total_lignes ca_null ca_negatif date_min date_max
# 245 832 12 0 2025-01-01 2025-12-31
Si les checks échouent, arrêtez le pipeline ici — pas à l'étape 3 avec une erreur cryptique.
assert quality_check['ca_null'][0] < 100, "Trop de valeurs nulles en CA"
assert quality_check['ca_negatif'][0] == 0, "CA négatif détecté"
Convertir CSV → Parquet une seule fois
con.execute("""
COPY (SELECT * FROM ventes_raw)
TO 'sources/ventes_2025.parquet'
(FORMAT PARQUET, COMPRESSION ZSTD)
""")
Un CSV de 800MB devient ~180MB en Parquet ZSTD, et les lectures ultérieures sont 5x plus rapides. Faites cette conversion une fois, utilisez Parquet pour tout le reste.
Étape 2 : La transformation avec Polars LazyFrame
Polars LazyFrame est l'outil idéal pour la transformation : il planifie les opérations sans les exécuter, applique le predicate pushdown (ne lit que les colonnes/lignes nécessaires), et parallélise automatiquement.
import polars as pl
# LazyFrame : rien n'est exécuté encore
lf = pl.scan_parquet("sources/ventes_2025.parquet")
# Définir les transformations (plan d'exécution)
transformed = (
lf
.filter(pl.col("ca") > 0) # filtrer les anomalies
.with_columns([
pl.col("date").dt.month().alias("mois"),
pl.col("date").dt.year().alias("annee"),
(pl.col("ca") * 1.2).alias("ca_ttc"), # ajout TVA
pl.col("region").str.to_uppercase().alias("region"), # normaliser
])
.with_columns([
pl.col("ca")
.sum()
.over("region")
.alias("ca_total_region") # window function
])
.with_columns([
(pl.col("ca") / pl.col("ca_total_region") * 100)
.alias("part_region_pct")
])
)
# Exécuter et persister
df = transformed.collect()
df.write_parquet("pipeline/ventes_enrichies.parquet")
print(f"Lignes transformées : {len(df):,}")
print(f"CA total : {df['ca'].sum():,.0f}€")
Le scan_parquet + LazyFrame permet à Polars de ne lire que les colonnes utilisées dans la chaîne — sur un fichier de 50 colonnes, si vous n'en utilisez que 8, vous ne lisez que 16% des données.
Étape 3 : Les tables analytiques avec DuckDB
Une fois les données enrichies, DuckDB crée les agrégats finaux qui alimenteront l'interface :
con = duckdb.connect('pipeline.duckdb')
# Table analytique : CA par région et mois
con.execute("""
CREATE OR REPLACE TABLE mart_ca_region AS
SELECT
region,
annee,
mois,
SUM(ca) as ca_ht,
SUM(ca_ttc) as ca_ttc,
COUNT(*) as nb_ventes,
AVG(ca) as panier_moyen,
SUM(ca) OVER (
PARTITION BY region, annee
ORDER BY mois
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as ca_cumule_ytd
FROM read_parquet('pipeline/ventes_enrichies.parquet')
GROUP BY region, annee, mois
ORDER BY region, annee, mois
""")
# Table analytique : top produits
con.execute("""
CREATE OR REPLACE TABLE mart_top_produits AS
WITH ranked AS (
SELECT
produit,
SUM(ca) as ca_total,
COUNT(*) as nb_ventes,
RANK() OVER (ORDER BY SUM(ca) DESC) as rang
FROM read_parquet('pipeline/ventes_enrichies.parquet')
GROUP BY produit
)
SELECT * FROM ranked WHERE rang <= 20
""")
con.close()
Ces tables sont pré-calculées — l'interface Streamlit les lit en millisecondes, sans recalculer à chaque interaction utilisateur.
Étape 4 : L'interface Streamlit
L'interface ne fait qu'une chose : lire les tables analytiques et afficher. Zéro logique métier ici.
# app/main.py
import streamlit as st
import duckdb
import plotly.express as px
st.set_page_config(page_title="Dashboard Ventes 2025", layout="wide")
@st.cache_resource
def get_connection():
return duckdb.connect('pipeline/pipeline.duckdb', read_only=True)
@st.cache_data(ttl=3600)
def load_ca_region():
con = get_connection()
return con.execute("SELECT * FROM mart_ca_region").df()
# Sidebar : filtres
st.sidebar.title("Filtres")
regions = st.sidebar.multiselect(
"Régions",
options=["IDF", "AURA", "PACA", "HDF", "BFC"],
default=["IDF", "AURA"]
)
# Données filtrées
df = load_ca_region()
df_filtered = df[df['region'].isin(regions)] if regions else df
# KPIs
col1, col2, col3 = st.columns(3)
col1.metric("CA Total HT", f"{df_filtered['ca_ht'].sum():,.0f}€")
col2.metric("Nb Ventes", f"{df_filtered['nb_ventes'].sum():,}")
col3.metric("Panier Moyen", f"{df_filtered['ca_ht'].sum() / df_filtered['nb_ventes'].sum():,.0f}€")
# Graphe CA cumulé
fig = px.line(
df_filtered,
x="mois", y="ca_cumule_ytd", color="region",
title="CA cumulé YTD par région",
labels={"ca_cumule_ytd": "CA cumulé (€)", "mois": "Mois"}
)
st.plotly_chart(fig, use_container_width=True)
Notez @st.cache_resource pour la connexion DuckDB (un seul objet partagé) et @st.cache_data(ttl=3600) pour les données (rechargées toutes les heures). Sans ces deux décorateurs, chaque interaction utilisateur relit les fichiers — votre app serait inutilisable en production.
Étape 5 : Tests automatiques
Un data product sans tests n'est pas un data product. C'est un script.
# tests/test_pipeline.py
import duckdb
import polars as pl
import pytest
def test_ingestion_aucune_valeur_nulle():
con = duckdb.connect('pipeline.duckdb')
nulls = con.execute(
"SELECT COUNT(*) FROM ventes_raw WHERE ca IS NULL"
).fetchone()[0]
assert nulls == 0, f"{nulls} valeurs nulles en CA"
def test_transformation_ca_toujours_positif():
df = pl.read_parquet("pipeline/ventes_enrichies.parquet")
negatifs = df.filter(pl.col("ca") < 0).height
assert negatifs == 0, f"{negatifs} lignes avec CA négatif"
def test_mart_ca_region_somme_coherente():
con = duckdb.connect('pipeline.duckdb')
ca_raw = con.execute("SELECT SUM(ca) FROM ventes_raw").fetchone()[0]
ca_mart = con.execute("SELECT SUM(ca_ht) FROM mart_ca_region").fetchone()[0]
# La somme doit être égale (à 1€ près pour les arrondis)
assert abs(ca_raw - ca_mart) < 1, f"Somme incohérente : {ca_raw} vs {ca_mart}"
Ces 3 tests vérifient l'intégrité de bout en bout. Lancez-les à chaque exécution du pipeline :
pytest tests/ -v
Orchestrer le tout
Un Makefile simple suffit pour la plupart des projets :
pipeline:
python pipeline/01_ingest.py
python pipeline/02_transform.py
python pipeline/03_mart.py
pytest tests/ -v
app:
streamlit run app/main.py
deploy:
git push origin main # déclenche le déploiement Streamlit Cloud
Pour une exécution planifiée (cron), ajoutez une GitHub Action :
# .github/workflows/daily-pipeline.yml
on:
schedule:
- cron: '0 6 * * *' # chaque jour à 6h UTC
jobs:
run-pipeline:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install duckdb polars
- run: make pipeline
Comparaison avant / après
| Critère | Notebook Pandas | Data Product DuckDB+Polars+Streamlit |
|---|---|---|
| Fichiers > 2GB | MemoryError | Sans problème (out-of-core) |
| Reproductibilité | "ça marchait hier" | Tests automatiques |
| Partage | "ouvre le notebook" | URL Streamlit |
| Planification | lancement manuel | GitHub Actions cron |
| Maintenabilité | spaghetti cells | modules séparés |
| Temps exécution | 45s (2.4M lignes) | 8s (même dataset) |
Ce que vous apprenez dans nos formations
Ce pipeline utilise exactement les outils enseignés dans SQL Mastery et Streamlit Unleashed :
-
SQL Mastery couvre DuckDB, les window functions (
SUM() OVER,RANK(),LAG()), les CTEs, et la gestion des fichiers CSV/Parquet en SQL pur. -
Streamlit Unleashed couvre
st.cache_resource,st.cache_data, l'intégration DuckDB dans Streamlit, le déploiement Streamlit Cloud et Railway, et les patterns d'architecture pour les apps data en production.
Les deux formations sont complémentaires : SQL Mastery pour la couche analytique, Streamlit Unleashed pour la couche interface.

Approfondir avec mon livre
"Business Intelligence avec Python" - Le guide complet pour maîtriser l'analyse de données
Voir sur Amazon →