Build a “Chat with Your Data” App in Streamlit

    November 5, 2025
    10 min read
    streamlit
    openai
    postgresql
    chat interface


    What you’ll ship

    One-page Streamlit app: ask in natural language → generate safe PostgreSQL → run → visualize with Plotly → summarize insights with OpenAI. Includes data context so the model knows your schema and rules.


    Prereqs

    • Python 3.11+
    • Reachable PostgreSQL
    • OpenAI API key

    Pin versions in requirements.txt for reproducibility.
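    An illustrative requirements.txt (the pins below are examples that worked at the time of writing; check PyPI for current releases):

```text
streamlit==1.39.0
psycopg2-binary==2.9.9
pandas==2.2.3
numpy==2.1.2
plotly==5.24.1
statsmodels==0.14.4
openai==1.52.0
python-dotenv==1.0.1
httpx==0.27.2
```

    statsmodels is included because Plotly's `trendline="ols"` (used later in viz.py) depends on it.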


    Install and scaffold

    mkdir nlq-sql-app && cd nlq-sql-app
    python -m venv .venv
    # Windows (PowerShell): .\.venv\Scripts\Activate.ps1
    # macOS/Linux:
    source .venv/bin/activate
    pip install --upgrade pip
    pip install streamlit psycopg2-binary pandas numpy plotly statsmodels openai python-dotenv httpx
    

    Recommended repo layout:

    nlq-sql-app/
      app.py
      data_context.py
      viz.py
      insights.py
      .env
      requirements.txt
      Procfile
      runtime.txt
    

    Configure secrets

    Use .env locally and st.secrets in hosted environments.

    OPENAI_API_KEY=sk-...
    DATABASE_URL=postgresql://user:password@host:port/dbname
    
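    One way to bridge the two without branching your code is a small resolver. A minimal sketch; `get_secret` is a hypothetical helper, and `st.secrets` only resolves when the app runs under Streamlit with secrets configured:

```python
import os


def get_secret(name: str) -> str:
    """Prefer st.secrets when hosted; fall back to environment variables locally."""
    try:
        import streamlit as st
        if name in st.secrets:
            return st.secrets[name]
    except Exception:
        pass  # no secrets.toml, or not running under Streamlit
    value = os.getenv(name)
    if value is None:
        raise KeyError(f"Missing secret: {name}")
    return value
```

    Call python-dotenv's `load_dotenv()` once at startup so values from `.env` are visible to `os.getenv`.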

    Data Context: how the app knows your schema and rules

    You will hydrate the prompt with real metadata and enforce guardrails before any SQL runs.

    1) Introspect Postgres

    Create data_context.py:

    # data_context.py
    import psycopg2, psycopg2.extras as extras
    import json
    from typing import Dict
    
    ALLOWED_SCHEMAS = ["public"]          # narrow to what you trust
    READ_ONLY = True                        # enforce SELECT-only
    
    def get_conn(db_url: str):
        return psycopg2.connect(db_url)
    
    def fetch_schema(db_url: str) -> Dict:
        """Collect tables, columns, types, PK/FK."""
        q_cols = """
        SELECT
          c.table_schema, c.table_name, c.column_name, c.data_type
        FROM information_schema.columns c
        WHERE c.table_schema = ANY(%s)
        ORDER BY c.table_schema, c.table_name, c.ordinal_position;
        """
    
        q_pks = """
        SELECT
          tc.table_schema, tc.table_name, kc.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kc
          ON kc.constraint_name = tc.constraint_name
          AND kc.table_schema   = tc.table_schema
          AND kc.table_name     = tc.table_name
        WHERE tc.constraint_type = 'PRIMARY KEY'
          AND tc.table_schema = ANY(%s);
        """
    
        q_fks = """
        SELECT
          tc.table_schema, tc.table_name, kcu.column_name,
          ccu.table_schema AS foreign_table_schema,
          ccu.table_name   AS foreign_table_name,
          ccu.column_name  AS foreign_column_name
        FROM information_schema.table_constraints AS tc
        JOIN information_schema.key_column_usage AS kcu
          ON tc.constraint_name = kcu.constraint_name
          AND tc.table_schema    = kcu.table_schema
        JOIN information_schema.constraint_column_usage AS ccu
          ON ccu.constraint_name = tc.constraint_name
        WHERE tc.constraint_type = 'FOREIGN KEY'
          AND tc.table_schema = ANY(%s);
        """
    
        meta = {"tables": {}, "pks": {}, "fks": []}
        with get_conn(db_url) as conn:
            with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
                cur.execute(q_cols, (ALLOWED_SCHEMAS,))
                for r in cur.fetchall():
                    key = f"{r['table_schema']}.{r['table_name']}"
                    meta["tables"].setdefault(key, []).append(
                        {"name": r["column_name"], "type": r["data_type"]}
                    )
                cur.execute(q_pks, (ALLOWED_SCHEMAS,))
                for r in cur.fetchall():
                    key = f"{r['table_schema']}.{r['table_name']}"
                    meta["pks"].setdefault(key, []).append(r["column_name"])
                cur.execute(q_fks, (ALLOWED_SCHEMAS,))
                meta["fks"] = cur.fetchall()
    
        return meta
    
    def build_schema_context(meta: Dict) -> str:
        """Serialize into compact JSON for the LLM."""
        return json.dumps(meta, separators=(",", ":"))
    
    import re

    FORBIDDEN = ["insert", "update", "delete", "drop", "alter", "truncate", "grant", "create"]

    def validate_sql(sql: str, meta: Dict) -> None:
        s = sql.strip().rstrip(";").lower()
        if not s.startswith("select"):
            raise ValueError("Query must start with SELECT")
        if ";" in s:
            raise ValueError("Multiple statements are not allowed")
        # Word-boundary match so column names like updated_at don't trip the check
        if any(re.search(rf"\b{tok}\b", s) for tok in FORBIDDEN):
            raise ValueError("Only read-only queries are allowed")
        # Check that referenced tables exist in the allow-list (qualified or bare names)
        allowed_tables = set(meta["tables"].keys())
        allowed_tables |= {t.split(".", 1)[1] for t in allowed_tables}
        for table in re.findall(r"(?:from|join)\s+([a-zA-Z0-9_.]+)", s):
            if table not in allowed_tables:
                raise ValueError(f"Table not allowed or unknown: {table}")
    
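    Two more guardrails are cheap to add alongside validate_sql: cap result size and bound query time. A sketch (`enforce_limit` is a hypothetical helper; `SET statement_timeout` is a standard PostgreSQL session setting):

```python
import re


def enforce_limit(sql: str, max_rows: int = 1000) -> str:
    """Append a LIMIT clause unless the query already has one."""
    if re.search(r"\blimit\s+\d+\b", sql, re.IGNORECASE):
        return sql
    return f"{sql.rstrip().rstrip(';')} LIMIT {max_rows};"
```

    On the connection side, consider running `SET statement_timeout = '10s'` before executing generated SQL so a pathological query cannot hold the app hostage.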

    2) Prompt the model with context

    # in app.py
    import os
    from openai import OpenAI
    from data_context import fetch_schema, build_schema_context, validate_sql
    
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    DATABASE_URL = os.environ["DATABASE_URL"]
    
    schema_meta = fetch_schema(DATABASE_URL)
    schema_ctx = build_schema_context(schema_meta)
    
    def generate_sql(question: str):
        system = (
            "You are a SQL generator for PostgreSQL. "
            "Return ONLY a single SELECT statement. No comments. "
            "Use only tables and columns present in schema_json. "
            "Never guess column names. Prefer INNER JOIN on FK relationships. "
            "Apply the same time window across all joined tables."
        )
        user = f"schema_json: {schema_ctx}\nquestion: {question}"
        resp = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role":"system","content":system},{"role":"user","content":user}],
            temperature=0
        )
        sql = resp.choices[0].message.content.strip()
        if sql.startswith("```"):
            # Strip markdown fences such as ```sql ... ``` from the model output
            sql = sql.strip("`").removeprefix("sql").strip()
        validate_sql(sql, schema_meta)
        return sql
    

    Plotly: how charts are chosen and rendered

    Create viz.py:

    # viz.py
    import pandas as pd
    import plotly.express as px
    
    def choose_chart(df: pd.DataFrame):
        cols = df.columns.tolist()
        lower = [c.lower() for c in cols]
        if any("date" in c or "time" in c for c in lower):
            date_col = next(c for c in cols if "date" in c.lower() or "time" in c.lower())
            y = next((c for c in cols if pd.api.types.is_numeric_dtype(df[c])), None)
            if y:
                return px.line(df, x=date_col, y=y, title=f"{y} over time")
        cat = [c for c in cols if pd.api.types.is_string_dtype(df[c])]
        num = [c for c in cols if pd.api.types.is_numeric_dtype(df[c])]
        if cat and num:
            return px.bar(df, x=cat[0], y=num[0], title=f"{num[0]} by {cat[0]}")
        if len(num) >= 2:
            return px.scatter(df, x=num[0], y=num[1], trendline="ols", title=f"{num[1]} vs {num[0]}")
        return None
    
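    choose_chart keys entirely off column names and dtypes, so when no chart appears it helps to see what the heuristic saw. A quick debugging aid (`classify_columns` is a hypothetical helper, pandas only):

```python
import pandas as pd


def classify_columns(df: pd.DataFrame) -> dict:
    """Bucket columns the same way choose_chart does: temporal by name, numeric/string by dtype."""
    return {
        "temporal": [c for c in df.columns if "date" in c.lower() or "time" in c.lower()],
        "numeric": [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])],
        "string": [c for c in df.columns if pd.api.types.is_string_dtype(df[c])],
    }


df = pd.DataFrame({"order_date": ["2025-01-01", "2025-01-02"], "revenue": [120.0, 95.5]})
buckets = classify_columns(df)
```

    Here `order_date` lands in the temporal bucket and `revenue` in numeric, so choose_chart would produce a line chart.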

    Use in app.py:

    from viz import choose_chart
    # after df is shown
    fig = choose_chart(df)
    if fig:
        st.plotly_chart(fig, use_container_width=True)
    

    OpenAI: translate the data to insights

    Create insights.py:

    # insights.py
    import pandas as pd
    from openai import OpenAI
    
    def dataframe_preview(df: pd.DataFrame, max_rows: int = 20, max_chars: int = 800) -> str:
        csv = df.head(max_rows).to_csv(index=False)
        return csv[:max_chars]
    
    def generate_insights(client: OpenAI, df: pd.DataFrame, user_question: str, caveats: str = "") -> str:
        preview = dataframe_preview(df)
        prompt = f"""
    You are a data analyst. Write a crisp, factual summary in bullet points.
    Call out trends, outliers, and comparisons. Avoid speculation.
    If the sample is partial, say so.
    Question: {user_question}
    CSV sample (may be truncated):
    {preview}
    {f'Caveats: {caveats}' if caveats else ''}
    Return markdown only.
    """
        resp = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role":"user","content":prompt}],
            temperature=0.2
        )
        return resp.choices[0].message.content.strip()
    

    Use in app.py:

    from insights import generate_insights
    # after chart
    st.markdown("### AI Insights")
    st.markdown(generate_insights(client, df, question, caveats="Truncated preview used"))
    

    Full flow

    # app.py excerpt
    question = st.text_input("Ask a question")
    if st.button("Generate") and question:
        try:
            sql = generate_sql(question)   # uses data context
            st.code(sql, language="sql")
            df = query_df(sql)
            st.dataframe(df, use_container_width=True)
    
            from viz import choose_chart
            fig = choose_chart(df)
            if fig: st.plotly_chart(fig, use_container_width=True)
    
            from insights import generate_insights
            st.markdown("### AI Insights")
            st.markdown(generate_insights(client, df, question))
    
        except Exception as e:
            st.error(f"Failed: {e}")
    
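    The excerpt calls query_df, which hasn't been defined. A minimal sketch, parameterized by a connection factory so it isn't tied to psycopg2 (in app.py you'd bind it with `make_query_df(lambda: psycopg2.connect(DATABASE_URL))`):

```python
import pandas as pd


def make_query_df(connect):
    """Return a query_df bound to a DB-API connection factory."""
    def query_df(sql: str) -> pd.DataFrame:
        conn = connect()
        try:
            return pd.read_sql(sql, conn)
        finally:
            conn.close()
    return query_df
```

    Keeping the connection factory injectable also makes the function easy to exercise against SQLite in tests.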

    Run locally

    streamlit run app.py
    

    Deploy options

    Streamlit Community Cloud

    • Push to GitHub.
    • Configure secrets in the app’s settings.

    Railway / Heroku

    • Add OPENAI_API_KEY and DATABASE_URL as variables.
    • Procfile:
      web: streamlit run app.py --server.port ${PORT} --server.address 0.0.0.0
      

    Hardening checklist

    • Read-only DB user and schema allow-list.
    • Validate generated SQL against your metadata.
    • Enforce consistent time filters across joined tables.
    • Log generated SQL and latency.
    • Limit data sent to LLMs. Prefer previews and aggregates.
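    For the logging item, a minimal stdlib sketch (`run_logged` is a hypothetical wrapper; wire it around whatever executes your queries):

```python
import logging
import time

logger = logging.getLogger("nlq")


def run_logged(execute, sql: str):
    """Execute a query function, logging the SQL and wall-clock latency even on failure."""
    start = time.perf_counter()
    try:
        return execute(sql)
    finally:
        latency_ms = (time.perf_counter() - start) * 1000
        logger.info("sql=%r latency_ms=%.1f", sql, latency_ms)
```

    Because the log call sits in a `finally` block, failed queries are recorded with their latency too, which is exactly the signal you need when tuning the prompt or the guardrails.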