Databases: Storing and Retrieving Data
Welcome to Databases! Think of databases as organized filing cabinets for your application's data. They store information persistently and allow you to retrieve, update, and delete data efficiently.
SQLAlchemy Basics
SQLAlchemy is the most popular ORM (Object-Relational Mapper) for Python; the Flask-SQLAlchemy extension integrates it with Flask:
pip install flask-sqlalchemy
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
# Build the Flask application and bind Flask-SQLAlchemy to a local SQLite file.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI='sqlite:///app.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)
class User(db.Model):
    """Minimal user model: unique username and email plus a creation timestamp."""

    id = db.Column(db.Integer, primary_key=True)

    # Both identifying fields are mandatory and must be unique.
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    # Default is the SQL expression produced by ``db.func.now()``.
    created_at = db.Column(db.DateTime, default=db.func.now())

    def __repr__(self):
        """Return a short debug representation containing the username."""
        return f'<User {self.username}>'
# Create the tables for every model registered on ``db``.
# ``create_all`` is called inside an application context.
with app.app_context():
    db.create_all()
Database Models
Models represent your data structure:
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class User(db.Model):
    """Account stored in the ``users`` table, owning posts and comments."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Werkzeug's current default password hashes can exceed 128 characters,
    # so the column leaves headroom to avoid truncation or insert errors.
    password_hash = db.Column(db.String(256))
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    # ``posts.user_id`` and ``comments.user_id`` are NOT NULL, so deleting a
    # user must delete the dependent rows rather than null out their foreign
    # keys; the cascade makes ``db.session.delete(user)`` do exactly that.
    posts = db.relationship(
        'Post', backref='author', lazy=True, cascade='all, delete-orphan'
    )
    comments = db.relationship(
        'Comment', backref='user', lazy=True, cascade='all, delete-orphan'
    )

    def __repr__(self):
        """Return a short debug representation containing the username."""
        return f'<User {self.username}>'
class Post(db.Model):
    """Blog post stored in the ``posts`` table and owned by one user."""

    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    published = db.Column(db.Boolean, default=False)

    # Timestamps: set on insert; ``updated_at`` is refreshed on every update.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Owning user (required).
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

    # Comments are deleted together with the post.
    comments = db.relationship(
        'Comment',
        backref='post',
        lazy=True,
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        """Return a short debug representation containing the title."""
        return f'<Post {self.title}>'
class Comment(db.Model):
    """Comment stored in the ``comments`` table, linked to a user and a post."""

    __tablename__ = 'comments'

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Both parents are required.
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), nullable=False)

    def __repr__(self):
        """Return a short debug representation containing the primary key."""
        return f'<Comment {self.id}>'
CRUD Operations
Create (INSERT)
@app.route('/create_user')
def create_user():
    """Insert a fixed demo user and report its generated primary key."""
    demo_user = User(username='john_doe', email='john@example.com')

    # Stage the row, then write it to the database.
    session = db.session
    session.add(demo_user)
    session.commit()

    return f'User {demo_user.username} created with ID: {demo_user.id}'
@app.route('/create_post/<int:user_id>')
def create_post(user_id):
    """Create a fixed demo post for the given user (404 if the user is missing)."""
    owner = User.query.get_or_404(user_id)

    # Assigning ``author`` links the post to the user through the relationship.
    post_fields = {
        'title': 'My First Post',
        'content': 'This is the content of my first post.',
        'author': owner,
    }
    new_post = Post(**post_fields)

    db.session.add(new_post)
    db.session.commit()
    return f'Post "{new_post.title}" created!'
Read (SELECT)
@app.route('/users')
def get_users():
    """Demonstrate common read queries and render the full user list.

    Only ``users`` is passed to the template; the other queries are shown
    purely as examples of the query API.
    """
    # Get all users
    users = User.query.all()

    # Get user by ID (primary-key lookup; ``Query.get`` is legacy in
    # SQLAlchemy 2.0, ``Session.get`` is its replacement)
    user = db.session.get(User, 1)

    # Get user by username
    user = User.query.filter_by(username='john_doe').first()

    # Get users with conditions
    active_users = User.query.filter_by(is_active=True).all()

    # Order by creation date, newest first, at most ten rows
    recent_users = User.query.order_by(User.created_at.desc()).limit(10).all()

    # Complex queries: posts that have at least one comment (inner join)
    posts_with_comments = Post.query.join(Comment).all()

    return render_template('users.html', users=users)
@app.route('/search')
def search_posts():
    """Render posts whose title or content contains the ``q`` query parameter.

    With an empty or missing ``q`` every post is listed.
    """
    query = request.args.get('q', '')
    if query:
        # autoescape=True makes '%' and '_' typed by the user match literally
        # instead of acting as LIKE wildcards (otherwise "%" matches every row).
        posts = Post.query.filter(
            db.or_(
                Post.title.contains(query, autoescape=True),
                Post.content.contains(query, autoescape=True),
            )
        ).all()
    else:
        posts = Post.query.all()
    return render_template('posts.html', posts=posts)
Update (UPDATE)
@app.route('/update_user/<int:user_id>')
def update_user(user_id):
    """Overwrite a user's email, username and active flag with demo values."""
    user = User.query.get_or_404(user_id)

    # Attribute assignments on a loaded object are written out by the commit.
    demo_values = (
        ('email', 'new_email@example.com'),
        ('username', 'jane_doe'),
        ('is_active', False),
    )
    for attribute, value in demo_values:
        setattr(user, attribute, value)

    db.session.commit()
    return f'User {user.username} updated!'
@app.route('/publish_post/<int:post_id>')
def publish_post(post_id):
    """Mark a post as published unless it already is (404 if it is missing)."""
    post = Post.query.get_or_404(post_id)

    # Guard clause: nothing to write when the flag is already set.
    if post.published:
        return 'Post already published!'

    post.published = True
    db.session.commit()
    return f'Post "{post.title}" published!'
Delete (DELETE)
@app.route('/delete_user/<int:user_id>')
def delete_user(user_id):
    """Delete the user with the given id (404 if it does not exist).

    Related posts and comments are removed along with the user only when the
    ``User`` relationships declare a delete cascade. Their ``user_id`` columns
    are NOT NULL, so without a cascade the commit is expected to fail with an
    integrity error instead of orphaning the rows.
    """
    user = User.query.get_or_404(user_id)
    # Read the name before the row is deleted so the response does not depend
    # on the state of the instance after the commit.
    username = user.username
    db.session.delete(user)
    db.session.commit()
    return f'User {username} deleted!'
@app.route('/delete_comment/<int:comment_id>')
def delete_comment(comment_id):
    """Delete a single comment by primary key (404 if it does not exist)."""
    doomed = Comment.query.get_or_404(comment_id)

    session = db.session
    session.delete(doomed)
    session.commit()

    return 'Comment deleted!'
Relationships and Joins
One-to-Many Relationships
class Author(db.Model):
    """The "one" side of the one-to-many example: an author with many books."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)

    # ``backref`` also adds an ``author`` attribute on ``Book``.
    books = db.relationship('Book', backref='author', lazy=True)
class Book(db.Model):
    """The "many" side of the one-to-many example: each book has one author."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)

    # Required link to the owning author row.
    author_id = db.Column(
        db.Integer,
        db.ForeignKey('author.id'),
        nullable=False,
    )
# Usage
@app.route('/author_books/<int:author_id>')
def author_books(author_id):
    """Render an author's books, showing two equivalent ways to fetch them."""
    author = Author.query.get_or_404(author_id)

    # Option 1: follow the relationship attribute.
    books = author.books

    # Option 2: filter the Book table directly; this result is what is rendered.
    books = Book.query.filter_by(author_id=author_id).all()

    return render_template(
        'author_books.html',
        author=author,
        books=books,
    )
Many-to-Many Relationships
# Association table for the many-to-many relationship.
# Each row pairs one student with one course; the pair is the primary key.
student_course = db.Table(
    'student_course',
    db.Column(
        'student_id', db.Integer, db.ForeignKey('student.id'), primary_key=True
    ),
    db.Column(
        'course_id', db.Integer, db.ForeignKey('course.id'), primary_key=True
    ),
)
class Student(db.Model):
    """Student who can be enrolled in any number of courses."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)

    # Linked through ``student_course``; the backref adds ``Course.students``.
    courses = db.relationship(
        'Course',
        secondary=student_course,
        backref='students',
    )
class Course(db.Model):
    """Course identified by a unique short code."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    code = db.Column(db.String(10), unique=True, nullable=False)
# Usage
@app.route('/enroll_student/<int:student_id>/<int:course_id>')
def enroll_student(student_id, course_id):
    """Enroll a student in a course (404 if either record is missing).

    Enrolling twice is reported instead of attempted: the association table
    uses (student_id, course_id) as its primary key, so a second identical
    row would fail at commit time.
    """
    student = Student.query.get_or_404(student_id)
    course = Course.query.get_or_404(course_id)

    if course in student.courses:
        return f'{student.name} is already enrolled in {course.name}!'

    # Appending to the relationship adds a row to the association table on commit.
    student.courses.append(course)
    db.session.commit()
    return f'{student.name} enrolled in {course.name}!'
@app.route('/student_courses/<int:student_id>')
def student_courses(student_id):
    """Render the course page for one student (404 if the student is missing)."""
    template_context = {'student': Student.query.get_or_404(student_id)}
    return render_template('student_courses.html', **template_context)
Database Migrations
Use Flask-Migrate for schema changes:
pip install flask-migrate
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
# Application, database handle and migration support wired together.
app = Flask(__name__)
app.config.update(SQLALCHEMY_DATABASE_URI='sqlite:///app.db')
db = SQLAlchemy(app)
migrate = Migrate(app, db)

# Command-line workflow:
#
# Initialize migrations
#   flask db init
# Create migration
#   flask db migrate -m "Add new column"
# Apply migration
#   flask db upgrade
# Rollback migration
#   flask db downgrade
Database Configuration
Different database configurations:
# The assignments below are alternatives: each one overrides the previous,
# so keep only the line that matches your environment.

# SQLite (development)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'

# PostgreSQL
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://username:password@localhost/dbname'

# MySQL
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://username:password@localhost/dbname'

# Environment variable
import os
# Fall back to the development SQLite file when DATABASE_URL is not set;
# a bare ``os.environ.get`` would store None as the database URI.
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///app.db')
Query Optimization
Eager Loading
# Lazy loading (N+1 queries problem): one query for the users,
# then one more each time ``user.posts`` is first accessed.
users = User.query.all()
for user in users:
    print(user.posts)  # Separate query for each user

# Eager loading (single query with join): posts are fetched up front.
eager_posts = db.joinedload(User.posts)
users = User.query.options(eager_posts).all()
for user in users:
    print(user.posts)  # No additional queries
Pagination
@app.route('/posts')
def posts():
    """Render one page of posts, ten per page, selected by ``?page=``."""
    per_page = 10
    page = request.args.get('page', 1, type=int)

    # error_out=False: paginate is asked not to abort on an invalid page.
    pagination = Post.query.paginate(
        page=page,
        per_page=per_page,
        error_out=False,
    )

    return render_template(
        'posts.html',
        posts=pagination.items,
        pagination=pagination,
    )
Indexing
class Post(db.Model):
    """Excerpt of the Post model showing single-column and composite indexes."""

    # ... other columns ...

    # Single-column indexes for frequently queried columns.
    title = db.Column(db.String(200), index=True)
    created_at = db.Column(db.DateTime, index=True)

    # Composite index over (user_id, created_at); ``user_id`` is expected to
    # be among the columns elided above.
    __table_args__ = (
        db.Index('idx_post_user_date', 'user_id', 'created_at'),
    )
Practical Examples
Example 1: Blog Application
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
# Blog application and its SQLite-backed database handle.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI='sqlite:///blog.db',
    # NOTE(review): placeholder value; load a real secret from the
    # environment before deploying.
    SECRET_KEY='your-secret-key',
)
db = SQLAlchemy(app)
class Post(db.Model):
    """Blog entry with a required title and body and a creation timestamp."""

    id = db.Column(db.Integer, primary_key=True)

    # Required text fields.
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)

    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def __repr__(self):
        """Return a short debug representation containing the title."""
        return f'<Post {self.title}>'
@app.route('/')
def index():
    """Render the home page with all posts, newest first."""
    newest_first = Post.created_at.desc()
    return render_template(
        'index.html',
        posts=Post.query.order_by(newest_first).all(),
    )
@app.route('/post/<int:post_id>')
def post(post_id):
    """Render a single post (404 if it does not exist)."""
    found = Post.query.get_or_404(post_id)
    return render_template('post.html', post=found)
@app.route('/create', methods=['GET', 'POST'])
def create_post():
    """Show the creation form (GET) or store a submitted post (POST)."""
    # GET: just show the empty form.
    if request.method != 'POST':
        return render_template('create.html')

    title = request.form.get('title')
    content = request.form.get('content')

    # Both fields are mandatory; re-render the form with an error otherwise.
    if not (title and content):
        flash('Title and content are required!', 'error')
        return render_template('create.html')

    new_post = Post(title=title, content=content)
    db.session.add(new_post)
    db.session.commit()
    flash('Post created successfully!', 'success')
    return redirect(url_for('index'))
@app.route('/edit/<int:post_id>', methods=['GET', 'POST'])
def edit_post(post_id):
    """Show the edit form (GET) or apply submitted changes (POST).

    Responds with 404 if the post does not exist. As in ``create_post``,
    both fields are required: a missing field would otherwise store ``None``
    in a NOT NULL column and fail at commit.
    """
    post = Post.query.get_or_404(post_id)
    if request.method == 'POST':
        title = request.form.get('title')
        content = request.form.get('content')
        if title and content:
            post.title = title
            post.content = content
            db.session.commit()
            flash('Post updated successfully!', 'success')
            return redirect(url_for('post', post_id=post.id))
        # Invalid submission: nothing was modified, so the form is shown
        # again with the stored values.
        flash('Title and content are required!', 'error')
    return render_template('edit.html', post=post)
@app.route('/delete/<int:post_id>', methods=['POST'])
def delete_post(post_id):
    """Delete a post (POST only) and return to the index page."""
    target = Post.query.get_or_404(post_id)

    session = db.session
    session.delete(target)
    session.commit()

    flash('Post deleted successfully!', 'success')
    return redirect(url_for('index'))
# Script entry point: create the tables, then start the development server.
if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)
Example 2: E-commerce Product Catalog
class Category(db.Model):
    """Product category; one category groups many products."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)

    # The backref also adds a ``category`` attribute on ``Product``.
    products = db.relationship('Product', backref='category', lazy=True)
class Product(db.Model):
    """Catalog item that belongs to exactly one category."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False)
    description = db.Column(db.Text)

    # NOTE(review): Float can introduce rounding error for monetary values;
    # consider a fixed-precision type. Confirm before changing the schema.
    price = db.Column(db.Float, nullable=False)
    stock = db.Column(db.Integer, default=0)

    # Required link to the owning category.
    category_id = db.Column(
        db.Integer,
        db.ForeignKey('category.id'),
        nullable=False,
    )
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
@app.route('/products')
def products():
    """Render the product list, optionally narrowed by ``?category=<id>``."""
    category_id = request.args.get('category', type=int)

    # A missing or falsy category id means "show everything".
    product_query = Product.query
    if category_id:
        product_query = product_query.filter_by(category_id=category_id)
    matching_products = product_query.all()

    all_categories = Category.query.all()
    return render_template(
        'products.html',
        products=matching_products,
        categories=all_categories,
        selected_category=category_id,
    )
@app.route('/product/<int:product_id>')
def product_detail(product_id):
    """Render the detail page for one product (404 if it does not exist)."""
    context = {'product': Product.query.get_or_404(product_id)}
    return render_template('product_detail.html', **context)
@app.route('/search')
def search():
    """Search products by text and optional price bounds.

    Query parameters:
        q: substring looked up in the product name and description.
        min_price / max_price: inclusive price bounds (each optional).
    """
    query = request.args.get('q', '')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)

    # Start from the unfiltered query and narrow it per supplied parameter.
    products_query = Product.query
    if query:
        # autoescape=True makes '%' and '_' typed by the user match literally
        # instead of acting as LIKE wildcards.
        products_query = products_query.filter(
            db.or_(
                Product.name.contains(query, autoescape=True),
                Product.description.contains(query, autoescape=True),
            )
        )
    # ``is not None`` keeps a bound of 0 effective.
    if min_price is not None:
        products_query = products_query.filter(Product.price >= min_price)
    if max_price is not None:
        products_query = products_query.filter(Product.price <= max_price)

    products = products_query.all()
    return render_template('search.html', products=products, query=query)
Example 3: User Management System
from werkzeug.security import generate_password_hash, check_password_hash
class User(db.Model):
    """Account with a hashed password and an admin flag."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Werkzeug's current default password hashes can exceed 128 characters,
    # so the column leaves headroom to avoid truncation or insert errors.
    password_hash = db.Column(db.String(256), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def set_password(self, password):
        """Store a hash of ``password``; the plain text is never kept."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    A submission is rejected, with a flashed message and a redirect back to
    the form, when a field is missing or the username/email is already taken.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')

        # Reject incomplete submissions up front: a missing password would
        # reach ``set_password(None)``, and a missing username or email
        # would hit a NOT NULL column at commit.
        if not (username and email and password):
            flash('Username, email and password are required!', 'error')
            return redirect(url_for('register'))

        if User.query.filter_by(username=username).first():
            flash('Username already exists!', 'error')
            return redirect(url_for('register'))
        if User.query.filter_by(email=email).first():
            flash('Email already registered!', 'error')
            return redirect(url_for('register'))

        user = User(username=username, email=email)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        flash('Registration successful!', 'success')
        return redirect(url_for('login'))
    return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or verify submitted credentials (POST)."""
    if request.method == 'POST':
        # Default to empty strings so a missing field is treated as a failed
        # login rather than passing None into the password check.
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            # In a real app, you'd use Flask-Login for session management
            flash('Login successful!', 'success')
            return redirect(url_for('dashboard'))
        # One message for both failure modes, so the response does not
        # reveal whether the username exists.
        flash('Invalid username or password!', 'error')
    return render_template('login.html')
Best Practices
1. Use Transactions
# Good - atomic operations: both rows are committed together or not at all.
# The session opens a transaction automatically on first use, so no explicit
# ``begin()`` call is needed.
from sqlalchemy.exc import SQLAlchemyError

try:
    user = User(username='john', email='john@example.com')
    db.session.add(user)
    # Flush sends the INSERT so the database assigns ``user.id``; before the
    # flush it is still None and the post would be created without an owner.
    db.session.flush()
    post = Post(title='Hello', content='World', user_id=user.id)
    db.session.add(post)
    db.session.commit()
except SQLAlchemyError:
    # Undo everything done since the transaction started, including the flush.
    db.session.rollback()
    flash('Error creating user and post!', 'error')
2. Handle Database Errors
from sqlalchemy.exc import IntegrityError
# Attempt the insert; a constraint violation surfaces as IntegrityError
# when the commit runs.
try:
    db.session.add(new_user)
    db.session.commit()
except IntegrityError:
    # Roll back so the session is usable again, then tell the user.
    db.session.rollback()
    flash('Username or email already exists!', 'error')
3. Use Connection Pooling
# For production: options handed to the SQLAlchemy engine.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = dict(
    pool_pre_ping=True,
    pool_recycle=300,
)
4. Database Backup
import shutil
import os
from datetime import datetime
def backup_database(db_path='app.db', backup_dir='backups'):
    """Copy the database file into a timestamped backup and return its path.

    Args:
        db_path: Path of the database file to copy. Defaults to ``'app.db'``.
        backup_dir: Directory that receives the backup; created (including
            missing parents) if it does not exist. Defaults to ``'backups'``.

    Returns:
        The path of the new backup, ``<backup_dir>/backup_<YYYYmmdd_HHMMSS>.db``.

    Raises:
        FileNotFoundError: If ``db_path`` does not exist.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(backup_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = os.path.join(backup_dir, f'backup_{timestamp}.db')

    # NOTE(review): this is a plain file copy; confirm nothing is writing to
    # the database while the backup runs.
    shutil.copy2(db_path, backup_path)
    return backup_path
5. Database Seeding
def seed_database():
    """Insert the sample categories and products, skipping any that exist.

    The function can be run repeatedly: rows are looked up by name first, and
    each product is linked to its category by name instead of assuming the
    categories received ids 1, 2 and 3.
    """
    # Create categories (get-or-create by name).
    categories = {}
    for name in ('Electronics', 'Books', 'Clothing'):
        category = Category.query.filter_by(name=name).first()
        if category is None:
            category = Category(name=name)
            db.session.add(category)
        categories[name] = category
    # Commit so newly added categories have database-assigned ids.
    db.session.commit()

    # Create sample products: (name, price, category name).
    sample_products = [
        ('Laptop', 999.99, 'Electronics'),
        ('Python Book', 29.99, 'Books'),
        ('T-Shirt', 19.99, 'Clothing'),
    ]
    for name, price, category_name in sample_products:
        if Product.query.filter_by(name=name).first() is None:
            db.session.add(
                Product(
                    name=name,
                    price=price,
                    category_id=categories[category_name].id,
                )
            )
    db.session.commit()
Practice Exercises
Exercise 1: Task Management System
Create a task management application with:
- Users can create, edit, delete tasks
- Tasks have title, description, due date, priority
- Mark tasks as complete/incomplete
- Filter tasks by status and priority
- User authentication
Exercise 2: Library Management System
Build a library system with:
- Books with title, author, ISBN, publication year
- Borrowers (library members)
- Borrowing records with due dates
- Overdue book tracking
- Book search and reservation
- Fine calculation for overdue books
Exercise 3: Social Media Platform
Create a simple social media app with:
- User profiles with bio and profile picture
- Posts with text and images
- Like and comment functionality
- Follow/unfollow users
- News feed with posts from followed users
- Search users and posts
Exercise 4: Inventory Management
Build an inventory system with:
- Products with name, description, price, stock level
- Categories and subcategories
- Suppliers and purchase orders
- Stock tracking and low stock alerts
- Sales orders and inventory updates
- Reports on stock levels and sales
Summary
Databases store and manage your application's data:
SQLAlchemy Setup:
from flask_sqlalchemy import SQLAlchemy
# Point the extension at the database, then create the handle.
app.config.update(SQLALCHEMY_DATABASE_URI='sqlite:///app.db')
db = SQLAlchemy(app)


class Model(db.Model):
    """Placeholder model; add the remaining columns below the primary key."""

    id = db.Column(db.Integer, primary_key=True)
    # ... columns ...


# Create the tables for all models inside an application context.
with app.app_context():
    db.create_all()
CRUD Operations:
# Create: build the object, stage it, commit.
obj = Model(field=value)
db.session.add(obj)
db.session.commit()

# Read: every row, or one row by primary key.
objects = Model.query.all()
obj = Model.query.get(id)

# Update: change attributes on a loaded object, then commit.
obj.field = new_value
db.session.commit()

# Delete: mark the object for deletion, then commit.
db.session.delete(obj)
db.session.commit()
Relationships:
# One-to-Many: the child row holds the foreign key to its parent.
class Parent(db.Model):
    children = db.relationship('Child', backref='parent')


class Child(db.Model):
    parent_id = db.Column(db.Integer, db.ForeignKey('parent.id'))


# Many-to-Many: an association table holds one foreign key per side.
association_table = db.Table(
    'association',
    db.Column('left_id', db.ForeignKey('left.id')),
    db.Column('right_id', db.ForeignKey('right.id')),
)


class Left(db.Model):
    rights = db.relationship('Right', secondary=association_table)
Key Concepts:
- Database models and schemas
- CRUD operations
- Relationships and joins
- Query optimization
- Database migrations
- Error handling and transactions
Next: Authentication - securing your application with user login and sessions!