Build a “Chat with Your Data” App in Streamlit
streamlit
openai
postgreSQL
chat interface
One-page Streamlit app: ask in natural language → generate safe PostgreSQL → run → visualize with Plotly → summarize insights with OpenAI. Includes data context so the model knows your schema and rules.
Build a “Chat with Your Data” App in Streamlit
What you’ll ship
One-page Streamlit app: ask in natural language → generate safe PostgreSQL → run → visualize with Plotly → summarize insights with OpenAI. Includes data context so the model knows your schema and rules.
Prereqs
- Python 3.11+
- Reachable PostgreSQL
- OpenAI API key
Pin versions in requirements.txt for reproducibility.
Install and scaffold
mkdir nlq-sql-app && cd nlq-sql-app
python -m venv .venv
# Windows (PowerShell): .\.venv\Scripts\Activate.ps1
# macOS/Linux:
source .venv/bin/activate
pip install --upgrade pip
pip install streamlit psycopg2-binary pandas numpy plotly openai python-dotenv httpx
Recommended repo layout:
nlq-sql-app/
app.py
data_context.py
viz.py
insights.py
.env
requirements.txt
Procfile
runtime.txt
Configure secrets
Use .env locally and st.secrets in hosted environments.
OPENAI_API_KEY=sk-...
DATABASE_URL=postgresql://user:password@host:port/dbname
Data Context: how the app knows your schema and rules
You will hydrate the prompt with real metadata and enforce guardrails before any SQL runs.
1) Introspect Postgres
Create data_context.py:
# data_context.py
import json
from contextlib import closing
from typing import Dict

import psycopg2
import psycopg2.extras as extras
# Introspection and generated queries are limited to these schemas.
ALLOWED_SCHEMAS = ["public"]  # narrow to what you trust
READ_ONLY = True  # enforce SELECT-only
def get_conn(db_url: str):
    """Open and return a new psycopg2 connection for *db_url*.

    Caller is responsible for closing the connection.
    """
    return psycopg2.connect(db_url)
def fetch_schema(db_url: str) -> Dict:
    """Collect tables, columns, types, PK/FK from information_schema.

    Only schemas in ALLOWED_SCHEMAS are introspected. Returns a dict:
      - "tables": {"schema.table": [{"name": col, "type": data_type}, ...]}
      - "pks":    {"schema.table": [pk_column, ...]}
      - "fks":    raw FK rows (table/column -> foreign table/column)
    """
    q_cols = """
        SELECT
          c.table_schema, c.table_name, c.column_name, c.data_type
        FROM information_schema.columns c
        WHERE c.table_schema = ANY(%s)
        ORDER BY c.table_schema, c.table_name, c.ordinal_position;
    """
    q_pks = """
        SELECT
          tc.table_schema, tc.table_name, kc.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kc
          ON kc.constraint_name = tc.constraint_name
         AND kc.table_schema = tc.table_schema
         AND kc.table_name = tc.table_name
        WHERE tc.constraint_type = 'PRIMARY KEY'
          AND tc.table_schema = ANY(%s);
    """
    q_fks = """
        SELECT
          tc.table_schema, tc.table_name, kcu.column_name,
          ccu.table_schema AS foreign_table_schema,
          ccu.table_name AS foreign_table_name,
          ccu.column_name AS foreign_column_name
        FROM information_schema.table_constraints AS tc
        JOIN information_schema.key_column_usage AS kcu
          ON tc.constraint_name = kcu.constraint_name
         AND tc.table_schema = kcu.table_schema
        JOIN information_schema.constraint_column_usage AS ccu
          ON ccu.constraint_name = tc.constraint_name
        WHERE tc.constraint_type = 'FOREIGN KEY'
          AND tc.table_schema = ANY(%s);
    """
    meta = {"tables": {}, "pks": {}, "fks": []}
    # FIX: psycopg2's "with conn" only scopes a transaction (commit/rollback)
    # and leaves the connection open, leaking one connection per call.
    # closing() guarantees conn.close() runs.
    with closing(get_conn(db_url)) as conn:
        with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
            # A Python list parameter adapts to a SQL ARRAY for "= ANY(%s)".
            cur.execute(q_cols, (ALLOWED_SCHEMAS,))
            for r in cur.fetchall():
                key = f"{r['table_schema']}.{r['table_name']}"
                meta["tables"].setdefault(key, []).append(
                    {"name": r["column_name"], "type": r["data_type"]}
                )
            cur.execute(q_pks, (ALLOWED_SCHEMAS,))
            for r in cur.fetchall():
                key = f"{r['table_schema']}.{r['table_name']}"
                meta["pks"].setdefault(key, []).append(r["column_name"])
            cur.execute(q_fks, (ALLOWED_SCHEMAS,))
            meta["fks"] = cur.fetchall()
    return meta
def build_schema_context(meta: Dict) -> str:
    """Serialize schema metadata as compact JSON for inclusion in the LLM prompt.

    Compact separators keep the token count down without losing information.
    """
    compact_json = json.dumps(meta, separators=(",", ":"))
    return compact_json
import re

# Keywords that mutate data or schema. Matched as whole words below so that
# column names like "updated_at" or "dropped_items" are not false positives
# (the old substring check rejected any query merely *containing* "update").
FORBIDDEN = ["insert", "update", "delete", "drop", "alter", "truncate"]
_FORBIDDEN_RE = re.compile(r"\b(?:" + "|".join(FORBIDDEN) + r")\b")

def validate_sql(sql: str, meta: Dict) -> None:
    """Raise ValueError unless *sql* is a single read-only SELECT on known tables.

    Guardrails:
      * no write/DDL keyword anywhere in the statement (whole-word match)
      * statement must start with SELECT
      * exactly one statement (no embedded semicolons)
      * every FROM/JOIN target must appear in the introspected allow-list
    """
    # One trailing ";" is harmless; strip it before the single-statement check.
    s = sql.strip().rstrip(";").strip().lower()
    if _FORBIDDEN_RE.search(s):
        raise ValueError("Only read-only queries are allowed")
    if not s.startswith("select"):
        raise ValueError("Query must start with SELECT")
    if ";" in s:  # anything left after stripping the trailing ";" is a 2nd statement
        raise ValueError("Only a single statement is allowed")
    # Check that referenced tables exist in allow-list
    allowed_tables = set(meta["tables"].keys())
    # also accept bare table names without the schema prefix
    allowed_tables |= {t.split(".")[1] for t in allowed_tables}
    for table in re.findall(r"(?:from|join)\s+([a-zA-Z0-9_.]+)", s):
        if table not in allowed_tables:
            raise ValueError(f"Table not allowed or unknown: {table}")
2) Prompt the model with context
# in app.py
from data_context import fetch_schema, build_schema_context, validate_sql
# Introspect once at startup and reuse for every question.
schema_meta = fetch_schema(DATABASE_URL)
schema_ctx = build_schema_context(schema_meta)

def _strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```sql) if present."""
    import re
    cleaned = text.strip()
    cleaned = re.sub(r"^```[A-Za-z]*\s*", "", cleaned)
    cleaned = re.sub(r"\s*```$", "", cleaned)
    return cleaned.strip()

def generate_sql(question: str):
    """Turn a natural-language *question* into a validated SELECT statement.

    The schema JSON is injected into the prompt so the model only uses real
    tables/columns; the result is validated before being returned.
    Raises ValueError when validation rejects the generated SQL.
    """
    system = (
        "You are a SQL generator for PostgreSQL. "
        "Return ONLY a single SELECT statement. No comments. "
        "Use only tables and columns present in schema_json. "
        "Never guess column names. Prefer INNER JOIN on FK relationships. "
        "Apply the same time window across all joined tables."
    )
    user = f"schema_json: {schema_ctx}\nquestion: {question}"
    resp = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role":"system","content":system},{"role":"user","content":user}],
        temperature=0
    )
    # FIX: .strip("`") only removed backticks, so a ```sql fence left the
    # literal "sql" tag glued to the query and validation then failed.
    sql = _strip_code_fence(resp.choices[0].message.content)
    validate_sql(sql, schema_meta)
    return sql
Plotly: how charts are chosen and rendered
Create viz.py:
# viz.py
import pandas as pd
import plotly.express as px
def choose_chart(df: pd.DataFrame):
    """Pick a sensible Plotly figure for *df*, or return None if nothing fits.

    Heuristics, in priority order:
      1. a column whose name contains "date"/"time" + a numeric column -> line
      2. a string column + a numeric column                            -> bar
      3. at least two numeric columns                                  -> scatter
    """
    # Guard: an empty result set has nothing to plot.
    if df is None or df.empty:
        return None
    cols = df.columns.tolist()
    lower = [c.lower() for c in cols]
    if any("date" in c or "time" in c for c in lower):
        date_col = next(c for c in cols if "date" in c.lower() or "time" in c.lower())
        y = next((c for c in cols if pd.api.types.is_numeric_dtype(df[c])), None)
        if y:
            return px.line(df, x=date_col, y=y, title=f"{y} over time")
    cat = [c for c in cols if pd.api.types.is_string_dtype(df[c])]
    num = [c for c in cols if pd.api.types.is_numeric_dtype(df[c])]
    if cat and num:
        return px.bar(df, x=cat[0], y=num[0], title=f"{num[0]} by {cat[0]}")
    if len(num) >= 2:
        # FIX: trendline="ols" requires the optional statsmodels package,
        # which is not in this tutorial's install list and would raise
        # ImportError at chart time; omit the trendline.
        return px.scatter(df, x=num[0], y=num[1], title=f"{num[1]} vs {num[0]}")
    return None
Use in app.py:
from viz import choose_chart
# after df is shown
# choose_chart returns None when no heuristic matches, so only render
# a figure when one was actually produced.
fig = choose_chart(df)
if fig:
    st.plotly_chart(fig, use_container_width=True)
OpenAI: translate the data to insights
Create insights.py:
# insights.py
import pandas as pd
from openai import OpenAI
def dataframe_preview(df: pd.DataFrame, max_rows: int = 20, max_chars: int = 800) -> str:
    """Render the first *max_rows* rows of *df* as CSV, capped at *max_chars* chars.

    Keeps the sample sent to the LLM small and bounded.
    """
    head_csv = df.head(max_rows).to_csv(index=False)
    return head_csv[:max_chars]
def generate_insights(client: OpenAI, df: pd.DataFrame, user_question: str, caveats: str = "") -> str:
    """Ask the OpenAI chat API for a bullet-point summary of *df*.

    A truncated CSV preview (not the full frame) is sent to limit tokens and
    data exposure. Returns the model's markdown text, stripped of whitespace.
    """
    preview = dataframe_preview(df)
    # Prompt lines are flush-left on purpose: indenting them would embed the
    # indentation inside the f-string the model receives.
    prompt = f"""
You are a data analyst. Write a crisp, factual summary in bullet points.
Call out trends, outliers, and comparisons. Avoid speculation.
If the sample is partial, say so.
Question: {user_question}
CSV sample (may be truncated):
{preview}
{f'Caveats: {caveats}' if caveats else ''}
Return markdown only.
"""
    # Low temperature keeps the summary factual and repeatable.
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role":"user","content":prompt}],
        temperature=0.2
    )
    return resp.choices[0].message.content.strip()
Use in app.py:
from insights import generate_insights
# after chart
# The caveat string is surfaced to the model so the summary can mention
# that only a truncated sample was analyzed.
st.markdown("### AI Insights")
st.markdown(generate_insights(client, df, question, caveats="Truncated preview used"))
Full flow
# app.py excerpt
question = st.text_input("Ask a question")
if st.button("Generate") and question:
    try:
        # 1) NL -> SQL, validated against the introspected schema.
        sql = generate_sql(question)  # uses data context
        st.code(sql, language="sql")
        # 2) Execute and show the raw result set.
        df = query_df(sql)
        st.dataframe(df, use_container_width=True)
        # 3) Auto-pick a chart; choose_chart returns None when nothing fits.
        from viz import choose_chart
        fig = choose_chart(df)
        if fig: st.plotly_chart(fig, use_container_width=True)
        # 4) Natural-language summary of the (previewed) result.
        from insights import generate_insights
        st.markdown("### AI Insights")
        st.markdown(generate_insights(client, df, question))
    except Exception as e:
        # Broad catch is deliberate here: any failure (generation, validation,
        # query, rendering) is surfaced in the UI instead of crashing the app.
        st.error(f"Failed: {e}")
Run locally
streamlit run app.py
Deploy options
Streamlit Community Cloud
- Push to GitHub.
- Configure secrets in the app’s settings.
Railway / Heroku
- Add
OPENAI_API_KEY and DATABASE_URL as environment variables. Procfile: web: streamlit run app.py --server.port ${PORT} --server.address 0.0.0.0
Hardening checklist
- Read-only DB user and schema allow-list.
- Validate generated SQL against your metadata.
- Enforce consistent time filters across joined tables.
- Log generated SQL and latency.
- Limit data sent to LLMs. Prefer previews and aggregates.