March 27, 2026 -- 10 min read

SQLAlchemy: Python's Database Toolkit That Scales with You

Learn SQLAlchemy from scratch -- Core vs ORM, models, CRUD, relationships, queries, Alembic migrations, and async patterns for production Python apps.

sqlalchemy python orm database tutorial

Most Python developers start with raw SQL strings. You write cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)), it works, and life is good. Then the project grows. You need joins across five tables, migrations when schemas change, and suddenly those string queries are a maintenance nightmare.

SQLAlchemy fixes this. It's not just an ORM -- it's a complete database toolkit that gives you exactly as much abstraction as you want. You can write raw SQL through it, use the expression language for composable queries, or go full ORM with mapped classes. Most serious Python projects land on SQLAlchemy eventually, and for good reason.

Core vs ORM: Two Libraries in One

SQLAlchemy has two distinct layers, and understanding this split saves you a lot of confusion:

SQLAlchemy Core is a SQL expression language. You write Python code that generates SQL. No classes mapping to tables, no identity maps, no sessions. Just a Pythonic way to construct and execute SQL statements. SQLAlchemy ORM builds on top of Core. It maps Python classes to database tables, tracks changes to objects, handles relationships, and manages a "unit of work" that batches your changes into efficient SQL.

You don't have to pick one. Many projects use the ORM for most operations and drop down to Core for complex reporting queries or bulk operations where ORM overhead isn't worth it.
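To make the Core layer concrete, here's a minimal sketch of a query written purely at that level -- no mapped classes, no session -- using an in-memory SQLite database:

```python
from sqlalchemy import (
    Column, Integer, MetaData, String, Table, create_engine, insert, select,
)

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()

# A plain Table object -- no mapped class, no identity map
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String(50)),
)
metadata.create_all(engine)

# engine.begin() commits the transaction on exit
with engine.begin() as conn:
    conn.execute(insert(users), [{"username": "alice"}, {"username": "bob"}])

with engine.connect() as conn:
    rows = conn.execute(select(users.c.username).order_by(users.c.username)).all()
    names = [row.username for row in rows]
    print(names)  # ['alice', 'bob']
```

Same composable expression language as the ORM uses under the hood, just operating on `Table` objects and raw connections.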

Setting Up: Engine and Connection

Everything starts with an Engine. The engine manages the connection pool and serves as the source for all database interactions.

from sqlalchemy import create_engine

# SQLite (great for development)
engine = create_engine("sqlite:///app.db", echo=True)

# PostgreSQL
engine = create_engine(
    "postgresql://user:password@localhost:5432/mydb",
    pool_size=10,
    max_overflow=20,
)

# MySQL
engine = create_engine("mysql+pymysql://user:password@localhost/mydb")

The echo=True flag logs all generated SQL to stdout -- invaluable during development, terrible in production. The connection URL follows the format dialect+driver://user:password@host:port/database.

Notice you never open or close connections manually. The engine handles connection pooling automatically. When a query finishes, the connection goes back to the pool instead of being destroyed.
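To see the pool in action, you can check a connection out explicitly. A minimal sketch -- text() wraps a literal SQL string so you can run raw SQL through the engine:

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

# The with block borrows a connection from the pool and
# returns it automatically on exit -- no manual close()
with engine.connect() as conn:
    value = conn.execute(text("SELECT 40 + 2")).scalar()
    print(value)  # 42
```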

Defining Models with the ORM

The modern way to define models uses the DeclarativeBase and Mapped types (SQLAlchemy 2.0 style):

from datetime import datetime
from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship,
)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(String(120), unique=True)
    bio: Mapped[str | None] = mapped_column(Text, default=None)
    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now()
    )

    # Relationship to posts
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}')>"

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    body: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")

    def __repr__(self) -> str:
        return f"<Post(id={self.id}, title='{self.title}')>"

A few things to notice:

  • Mapped[str] means the column is NOT NULL. Mapped[str | None] makes it nullable.
  • mapped_column() replaces the old Column() syntax. It integrates with type checkers.
  • relationship() defines the Python-side link between objects. back_populates keeps both sides in sync.
  • ForeignKey("users.id") is the actual database constraint.
Create the tables:

Base.metadata.create_all(engine)

This inspects the database and issues CREATE TABLE for every model extending Base whose table doesn't exist yet; it never alters tables that are already there. Fine for development. For production, you want Alembic (covered later).

Sessions and CRUD Operations

The Session is where all ORM magic happens. It tracks which objects you've added, modified, or deleted, and flushes those changes to the database when you commit.

from sqlalchemy.orm import Session

# Create a session
with Session(engine) as session:
    # CREATE
    user = User(username="alice", email="alice@example.com")
    session.add(user)
    session.commit()

    # The user now has an id assigned by the database
    print(user.id)  # 1

Using with ensures the session closes properly even if an exception occurs. Here's the full CRUD:

with Session(engine) as session:
    # CREATE -- add multiple
    users = [
        User(username="bob", email="bob@example.com"),
        User(username="charlie", email="charlie@example.com"),
    ]
    session.add_all(users)
    session.commit()

    # READ -- get by primary key
    user = session.get(User, 1)  # SELECT * FROM users WHERE id = 1
    print(user.username)  # "alice"

    # UPDATE
    user.email = "newalice@example.com"
    session.commit()  # SQLAlchemy tracks the change automatically

    # DELETE
    session.delete(user)
    session.commit()

The session tracks changes through something called the "identity map." When you modify user.email, SQLAlchemy knows the object is dirty. On commit(), it generates the appropriate UPDATE statement. You never write the SQL yourself.

Querying and Filtering

SQLAlchemy 2.0 uses select() statements instead of the old session.query() pattern:

from sqlalchemy import select

from sqlalchemy import func, or_

with Session(engine) as session:
    # Get all users
    stmt = select(User)
    users = session.scalars(stmt).all()

    # Filter with where()
    stmt = select(User).where(User.username == "alice")
    alice = session.scalars(stmt).first()

    # Multiple conditions (AND)
    stmt = select(User).where(
        User.email.like("%@example.com"),
        User.created_at >= datetime(2026, 1, 1),
    )
    results = session.scalars(stmt).all()

    # OR conditions
    stmt = select(User).where(
        or_(User.username == "alice", User.username == "bob")
    )

    # Ordering and limiting
    stmt = (
        select(User)
        .order_by(User.created_at.desc())
        .limit(10)
        .offset(20)
    )

    # Count
    stmt = select(func.count()).select_from(User)
    count = session.scalar(stmt)

The key difference from raw SQL: these queries are composable. You can build a base query and add filters conditionally:

def get_users(
    session: Session,
    username: str | None = None,
    email_domain: str | None = None,
    limit: int = 50,
) -> list[User]:
    stmt = select(User)

    if username:
        stmt = stmt.where(User.username.ilike(f"%{username}%"))
    if email_domain:
        stmt = stmt.where(User.email.like(f"%@{email_domain}"))

    stmt = stmt.limit(limit)
    return session.scalars(stmt).all()

Try doing that cleanly with string concatenation.

Relationships: One-to-Many and Many-to-Many

We already saw one-to-many (User has many Posts). Let's add many-to-many for tags:

from sqlalchemy import Table, Column, Integer, ForeignKey

# Association table -- no mapped class needed
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)

class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)

    posts: Mapped[list["Post"]] = relationship(
        secondary=post_tags, back_populates="tags"
    )

Then update the Post model:

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    body: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[list["Tag"]] = relationship(
        secondary=post_tags, back_populates="posts"
    )

Using it:

with Session(engine) as session:
    python_tag = Tag(name="python")
    tutorial_tag = Tag(name="tutorial")

    post = Post(
        title="SQLAlchemy Guide",
        body="...",
        author_id=1,
        tags=[python_tag, tutorial_tag],
    )
    session.add(post)
    session.commit()

    # Access tags through the relationship
    for tag in post.tags:
        print(tag.name)

    # Find all posts with a specific tag
    stmt = select(Post).where(Post.tags.any(Tag.name == "python"))
    python_posts = session.scalars(stmt).all()

The secondary parameter tells SQLAlchemy about the association table. It handles the INSERT and DELETE on post_tags automatically when you modify the tags list.

Eager Loading: Avoiding the N+1 Problem

This is the most common performance trap with any ORM:

# BAD -- N+1 queries
with Session(engine) as session:
    users = session.scalars(select(User)).all()
    for user in users:
        print(user.posts)  # Each access fires a separate SELECT

If you have 100 users, that's 101 queries (1 for users + 100 for posts). Fix it with eager loading:

from sqlalchemy.orm import joinedload, selectinload

# joinedload -- single query with JOIN
stmt = select(User).options(joinedload(User.posts))
users = session.scalars(stmt).unique().all()

# selectinload -- two queries (usually better for collections)
stmt = select(User).options(selectinload(User.posts))
users = session.scalars(stmt).all()

joinedload does a SQL JOIN, which can produce duplicate rows for collections (hence .unique()). selectinload fires a second query with WHERE id IN (...), which is usually more efficient for one-to-many relationships.

Migrations with Alembic

create_all() is fine for prototyping, but it can't modify existing tables. Alembic is the migration tool built for SQLAlchemy.

pip install alembic
alembic init alembic

This creates an alembic/ directory and alembic.ini. Configure the database URL in alembic.ini:

sqlalchemy.url = postgresql://user:password@localhost/mydb

Point Alembic at your models in alembic/env.py:

from myapp.models import Base

target_metadata = Base.metadata

Now generate and run migrations:

# Auto-generate a migration by comparing models to the database
alembic revision --autogenerate -m "add bio column to users"

# Apply the migration
alembic upgrade head

# Rollback one step
alembic downgrade -1

# See current version
alembic current

The auto-generated migration looks like this:

def upgrade() -> None:
    op.add_column("users", sa.Column("bio", sa.Text(), nullable=True))

def downgrade() -> None:
    op.drop_column("users", "bio")

Always review auto-generated migrations. Alembic can't detect everything (renamed columns look like a drop + add, for example).

Async SQLAlchemy

For async frameworks like FastAPI, SQLAlchemy supports async sessions:

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

# Note the async driver (asyncpg for PostgreSQL)
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/mydb"
)
async_session = async_sessionmaker(engine, class_=AsyncSession)

async def get_user(user_id: int) -> User | None:
    async with async_session() as session:
        stmt = select(User).where(User.id == user_id)
        result = await session.execute(stmt)
        return result.scalar_one_or_none()

async def create_user(username: str, email: str) -> User:
    async with async_session() as session:
        user = User(username=username, email=email)
        session.add(user)
        await session.commit()
        await session.refresh(user)
        return user

The API is almost identical to sync SQLAlchemy. The main differences:

  • Use create_async_engine instead of create_engine
  • Use AsyncSession instead of Session
  • Await session.execute(), session.commit(), session.refresh()
  • Relationship lazy loading doesn't work in async (you must use eager loading)

That last point is important. In async mode, accessing user.posts without eager loading raises an error instead of silently firing a query. This is actually a good thing -- it forces you to be explicit about your data loading.

Common Patterns for Production

Session dependency injection (FastAPI example):

from fastapi import Depends, FastAPI, HTTPException

app = FastAPI()

async def get_session():
    async with async_session() as session:
        yield session

@app.get("/users/{user_id}")
async def read_user(
    user_id: int,
    session: AsyncSession = Depends(get_session),
):
    user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404)
    return user

Soft deletes:

class SoftDeleteMixin:
    deleted_at: Mapped[datetime | None] = mapped_column(default=None)

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

class User(SoftDeleteMixin, Base):
    __tablename__ = "users"
    # ...

Timestamps mixin:

class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        onupdate=func.now(),
    )

Bulk operations (bypass ORM overhead):

from sqlalchemy import insert, update

# Bulk insert -- much faster than add() in a loop
with Session(engine) as session:
    session.execute(
        insert(User),
        [
            {"username": f"user_{i}", "email": f"user_{i}@example.com"}
            for i in range(10000)
        ],
    )
    session.commit()

# Bulk update
with Session(engine) as session:
    session.execute(
        update(User)
        .where(User.created_at < datetime(2025, 1, 1))
        .values(bio="Legacy user")
    )
    session.commit()

Common Mistakes

Forgetting to commit. The session batches changes. If you add() without commit(), nothing is saved. If an exception occurs, changes are rolled back when the session closes.

Not handling IntegrityError. Unique constraints, foreign keys -- violations throw IntegrityError. Catch it:

from sqlalchemy.exc import IntegrityError

try:
    session.add(User(username="alice", email="alice@example.com"))
    session.commit()
except IntegrityError:
    session.rollback()
    # Handle duplicate

Using the session after an error without rollback. After an exception, the session is in a broken state. You must call session.rollback() before using it again.

Lazy loading in async. As mentioned, it doesn't work. Use selectinload or joinedload in every async query that touches relationships.

Not indexing foreign keys. Most databases don't index foreign key columns automatically (MySQL's InnoDB does; PostgreSQL and SQLite don't), and SQLAlchemy won't add one for you. Add index=True:
author_id: Mapped[int] = mapped_column(
    ForeignKey("users.id"), index=True
)

What's Next

SQLAlchemy has depth. Once you're comfortable with the basics, explore:

  • Hybrid properties for computed attributes that work both in Python and SQL
  • Events for hooking into the ORM lifecycle (before insert, after update, etc.)
  • Custom types for mapping database columns to complex Python objects
  • Partitioning strategies for multi-tenant applications
  • Connection pooling tuning for high-traffic services

The SQLAlchemy documentation is genuinely excellent -- one of the best in the Python ecosystem. Use it.

If you're building Python projects and want more practical guides like this, check out CodeUp for tutorials that skip the fluff and get to the code.
