Daily Tech Brief

Top startup stories in your inbox

Subscribe Free

© 2026 rakrisi Daily

Databases - Storing and Retrieving Data

Databases: Storing and Retrieving Data

Welcome to Databases! Think of databases as organized filing cabinets for your application’s data. They store information persistently and allow you to retrieve, update, and delete data efficiently.

SQLAlchemy Basics

SQLAlchemy is the most popular ORM (Object-Relational Mapper) for Python. In Flask applications it is typically used through the Flask-SQLAlchemy extension:

pip install flask-sqlalchemy
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

# Application object plus the two settings this tutorial configures for the
# database extension.
app = Flask(__name__)
# Database URI: a SQLite file named app.db.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# `db` is the handle used below for db.Model, db.Column and db.create_all().
db = SQLAlchemy(app)

class User(db.Model):
    """Minimal user model: integer primary key, unique username and email.

    ``username`` and ``email`` are both declared ``unique=True`` and
    ``nullable=False``; ``created_at`` defaults to ``db.func.now()``.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=db.func.now())

    def __repr__(self):
        """Return a short debug representation containing the username."""
        return f'<User {self.username}>'

# Create tables
# db.create_all() is called inside an application context.
with app.app_context():
    db.create_all()

Database Models

Models represent your data structure:

from datetime import datetime
from flask_sqlalchemy import SQLAlchemy

# Created without an app argument here.
# NOTE(review): presumably bound to the Flask app later (e.g. db.init_app(app)) - confirm.
db = SQLAlchemy()

class User(db.Model):
    """Application user who owns posts and comments.

    ``username`` and ``email`` are unique and required. ``created_at`` and
    ``updated_at`` default to ``datetime.utcnow``; ``updated_at`` is also
    refreshed on update through ``onupdate``.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # 256 rather than 128: Werkzeug's default (scrypt) password hashes are
    # longer than 128 characters and would not fit on databases that enforce
    # the declared length.
    password_hash = db.Column(db.String(256))
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    # The cascade makes deleting a user also delete that user's posts and
    # comments. Without it the delete would try to NULL the child rows'
    # user_id, which is declared nullable=False on both child tables.
    posts = db.relationship('Post', backref='author', lazy=True,
                            cascade='all, delete-orphan')
    comments = db.relationship('Comment', backref='user', lazy=True,
                               cascade='all, delete-orphan')

    def __repr__(self):
        """Return a short debug representation containing the username."""
        return f'<User {self.username}>'

class Post(db.Model):
    """Blog post stored in the ``posts`` table.

    Each post references its owner through ``user_id`` (foreign key to
    ``users.id``, required). ``published`` defaults to False.
    """

    __tablename__ = 'posts'
    
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    published = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    
    # Relationships
    # Comments are declared with cascade='all, delete-orphan' on this side.
    comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan')
    
    def __repr__(self):
        """Return a short debug representation containing the title."""
        return f'<Post {self.title}>'

class Comment(db.Model):
    """Comment stored in the ``comments`` table.

    A comment requires both an author (``user_id``) and a post (``post_id``);
    both foreign keys are declared ``nullable=False``.
    """

    __tablename__ = 'comments'
    
    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), nullable=False)
    
    def __repr__(self):
        """Return a short debug representation containing the comment id."""
        return f'<Comment {self.id}>'

CRUD Operations

Create (INSERT)

@app.route('/create_user')
def create_user():
    """Insert the sample user ``john_doe`` and report its new id.

    The username and email are fixed, so a second request would violate the
    unique constraints; that case is reported as a 409 response instead of
    surfacing as an unhandled IntegrityError.
    """
    from sqlalchemy.exc import IntegrityError

    # Create a new user
    new_user = User(username='john_doe', email='john@example.com')
    db.session.add(new_user)
    try:
        db.session.commit()
    except IntegrityError:
        # The failed transaction must be rolled back before the session can
        # be used again.
        db.session.rollback()
        return f'User {new_user.username} already exists!', 409

    return f'User {new_user.username} created with ID: {new_user.id}'

@app.route('/create_post/<int:user_id>')
def create_post(user_id):
    """Create a sample post for the user looked up via ``get_or_404``."""
    owner = User.query.get_or_404(user_id)

    # The post is linked to its owner through the `author` attribute.
    post_fields = {
        'title': 'My First Post',
        'content': 'This is the content of my first post.',
        'author': owner,
    }
    created = Post(**post_fields)

    db.session.add(created)
    db.session.commit()

    return f'Post "{created.title}" created!'

Read (SELECT)

@app.route('/users')
def get_users():
    """Demonstrate several read queries, then render the full user list.

    Only the "all users" result is passed to the template; the other query
    results are computed purely for illustration.
    """
    # Every user
    everyone = User.query.all()

    # Lookup by primary key
    by_id = User.query.get(1)

    # Lookup by username (first match)
    by_name = User.query.filter_by(username='john_doe').first()

    # Users whose is_active flag is True
    active = User.query.filter_by(is_active=True).all()

    # Ten rows, ordered by created_at descending
    newest_first = User.query.order_by(User.created_at.desc())
    latest_ten = newest_first.limit(10).all()

    # Posts joined with comments
    commented_posts = Post.query.join(Comment).all()

    return render_template('users.html', users=everyone)

@app.route('/search')
def search_posts():
    """Render posts whose title or content contains the ``q`` query parameter.

    With an empty or missing ``q`` every post is listed.
    """
    query = request.args.get('q', '')

    if query:
        # Search in title and content. autoescape=True makes "%" and "_"
        # typed by the user match literally instead of acting as LIKE
        # wildcards.
        posts = Post.query.filter(
            db.or_(
                Post.title.contains(query, autoescape=True),
                Post.content.contains(query, autoescape=True)
            )
        ).all()
    else:
        posts = Post.query.all()

    return render_template('posts.html', posts=posts)

Update (UPDATE)

@app.route('/update_user/<int:user_id>')
def update_user(user_id):
    """Apply a fixed set of sample changes to the user and commit them."""
    target = User.query.get_or_404(user_id)

    # Attribute assignments are applied in this order and committed together.
    sample_changes = {
        'email': 'new_email@example.com',
        'username': 'jane_doe',
        'is_active': False,
    }
    for attribute, value in sample_changes.items():
        setattr(target, attribute, value)

    db.session.commit()

    return f'User {target.username} updated!'

@app.route('/publish_post/<int:post_id>')
def publish_post(post_id):
    """Set the post's ``published`` flag unless it is already set."""
    article = Post.query.get_or_404(post_id)

    # Nothing to write when the flag is already set.
    if article.published:
        return 'Post already published!'

    article.published = True
    db.session.commit()
    return f'Post "{article.title}" published!'

Delete (DELETE)

@app.route('/delete_user/<int:user_id>')
def delete_user(user_id):
    """Delete the user looked up via ``get_or_404`` and confirm by name.

    Related posts and comments are removed together with the user only when
    the ``User.posts`` / ``User.comments`` relationships declare
    ``cascade='all, delete-orphan'``. Without that cascade the children's
    ``user_id`` columns (declared ``nullable=False``) cannot be cleared and
    the commit fails.
    """
    user = User.query.get_or_404(user_id)

    # Read the name before the row is deleted so the response does not depend
    # on the state of a deleted object.
    username = user.username

    db.session.delete(user)
    db.session.commit()

    return f'User {username} deleted!'

@app.route('/delete_comment/<int:comment_id>')
def delete_comment(comment_id):
    """Delete the comment looked up via ``get_or_404``."""
    doomed = Comment.query.get_or_404(comment_id)

    session = db.session
    session.delete(doomed)
    session.commit()

    return 'Comment deleted!'

Relationships and Joins

One-to-Many Relationships

class Author(db.Model):
    """Author with a one-to-many ``books`` relationship to ``Book``."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    
    # One author can have many books
    # backref='author' names the reverse attribute on Book.
    books = db.relationship('Book', backref='author', lazy=True)

class Book(db.Model):
    """Book that must reference an author via ``author_id``."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    # Required foreign key to author.id (the "many" side of the relationship).
    author_id = db.Column(db.Integer, db.ForeignKey('author.id'), nullable=False)

# Usage
@app.route('/author_books/<int:author_id>')
def author_books(author_id):
    """Render an author's books, showing two equivalent ways to fetch them."""
    writer = Author.query.get_or_404(author_id)

    # Option 1: follow the relationship attribute
    via_relationship = writer.books

    # Option 2: filter Book rows by the foreign key (this result is rendered)
    via_query = Book.query.filter_by(author_id=author_id).all()

    return render_template('author_books.html', author=writer, books=via_query)

Many-to-Many Relationships

# Association table for many-to-many relationship
# Both foreign-key columns are marked primary_key=True, so together they form
# the table's primary key.
student_course = db.Table('student_course',
    db.Column('student_id', db.Integer, db.ForeignKey('student.id'), primary_key=True),
    db.Column('course_id', db.Integer, db.ForeignKey('course.id'), primary_key=True)
)

class Student(db.Model):
    """Student linked to courses through the ``student_course`` table."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    
    # Many students can take many courses
    # backref='students' names the reverse attribute on Course.
    courses = db.relationship('Course', secondary=student_course, backref='students')

class Course(db.Model):
    """Course with a required name and a unique, required ``code``."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    code = db.Column(db.String(10), unique=True, nullable=False)

# Usage
@app.route('/enroll_student/<int:student_id>/<int:course_id>')
def enroll_student(student_id, course_id):
    """Enroll a student in a course, once.

    Both records are looked up with ``get_or_404``. Enrolling the same
    student in the same course again is reported instead of attempted: a
    second association row would collide with the association table's
    composite primary key.
    """
    student = Student.query.get_or_404(student_id)
    course = Course.query.get_or_404(course_id)

    if course in student.courses:
        return f'{student.name} is already enrolled in {course.name}!'

    # Add course to student's courses
    student.courses.append(course)
    db.session.commit()

    return f'{student.name} enrolled in {course.name}!'

@app.route('/student_courses/<int:student_id>')
def student_courses(student_id):
    """Render the courses page for the student looked up via ``get_or_404``."""
    enrolled_student = Student.query.get_or_404(student_id)
    template_name = 'student_courses.html'
    return render_template(template_name, student=enrolled_student)

Database Migrations

Use Flask-Migrate for schema changes:

pip install flask-migrate
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

# Application and database handle, as in the earlier setup.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'

db = SQLAlchemy(app)
# Flask-Migrate is given both the app and the db object.
migrate = Migrate(app, db)

# The commands below are shell commands, listed here as comments.

# Initialize migrations
# flask db init

# Create migration
# flask db migrate -m "Add new column"

# Apply migration
# flask db upgrade

# Rollback migration
# flask db downgrade

Database Configuration

Different database configurations:

# Alternative settings for the same key; use one of them. If run as written,
# each assignment replaces the previous one.

# SQLite (development)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'

# PostgreSQL
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://username:password@localhost/dbname'

# MySQL
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://username:password@localhost/dbname'

# Environment variable
# Falls back to the development SQLite file when DATABASE_URL is not set,
# instead of storing None as the database URI.
import os
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///app.db')

Query Optimization

Eager Loading

# Lazy loading (N+1 queries problem)
# One query loads the users; user.posts is then accessed once per user.
users = User.query.all()
for user in users:
    print(user.posts)  # Separate query for each user

# Eager loading (single query with join)
# joinedload(User.posts) is passed as a query option so posts come back with
# the users.
users = User.query.options(db.joinedload(User.posts)).all()
for user in users:
    print(user.posts)  # No additional queries

Pagination

@app.route('/posts')
def posts():
    """Render one page of posts, newest first, ten per page.

    The page number comes from the ``page`` query parameter (default 1).
    ``error_out=False`` is passed to ``paginate``.
    """
    page = request.args.get('page', 1, type=int)
    per_page = 10

    # An explicit ORDER BY keeps page contents stable between requests;
    # without one the database may return rows in any order. The id is a
    # tie-breaker for posts with identical timestamps.
    ordered = Post.query.order_by(Post.created_at.desc(), Post.id.desc())
    pagination = ordered.paginate(page=page, per_page=per_page, error_out=False)
    posts = pagination.items

    return render_template('posts.html',
                         posts=posts,
                         pagination=pagination)

Indexing

class Post(db.Model):
    """Excerpt of a Post model showing single-column and composite indexes.

    This is a fragment: the remaining columns are elided below.
    """
    # ... other columns ...
    
    # Add index for frequently queried columns
    title = db.Column(db.String(200), index=True)
    created_at = db.Column(db.DateTime, index=True)
    
    # Composite index
    # 'user_id' is not declared in this excerpt; it is expected to be among
    # the elided columns.
    __table_args__ = (
        db.Index('idx_post_user_date', 'user_id', 'created_at'),
    )

Practical Examples

Example 1: Blog Application

from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

# Blog application object and configuration.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
# NOTE(review): placeholder secret committed in source - presumably meant to
# be replaced with a real secret loaded from the environment; confirm before
# deploying. The views below call flash().
app.config['SECRET_KEY'] = 'your-secret-key'

db = SQLAlchemy(app)

class Post(db.Model):
    """Blog post with a required title and content and a creation timestamp."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    # Defaults to datetime.utcnow; index() sorts on this column.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        """Return a short debug representation containing the title."""
        return f'<Post {self.title}>'

@app.route('/')
def index():
    """Render the home page with every post, ordered by ``created_at`` descending."""
    newest_first = Post.created_at.desc()
    all_posts = Post.query.order_by(newest_first).all()
    return render_template('index.html', posts=all_posts)

@app.route('/post/<int:post_id>')
def post(post_id):
    """Render the detail page for the post looked up via ``get_or_404``."""
    entry = Post.query.get_or_404(post_id)
    return render_template('post.html', post=entry)

@app.route('/create', methods=['GET', 'POST'])
def create_post():
    """Show the creation form (GET) or store a new post (POST).

    A POST with both ``title`` and ``content`` saves the post and redirects
    to the index; otherwise an error is flashed and the form is re-rendered.
    """
    # GET: just show the form.
    if request.method != 'POST':
        return render_template('create.html')

    form = request.form
    title, content = form.get('title'), form.get('content')

    # Both fields must be present and non-empty.
    if not (title and content):
        flash('Title and content are required!', 'error')
        return render_template('create.html')

    created = Post(title=title, content=content)
    db.session.add(created)
    db.session.commit()

    flash('Post created successfully!', 'success')
    return redirect(url_for('index'))

@app.route('/edit/<int:post_id>', methods=['GET', 'POST'])
def edit_post(post_id):
    """Show the edit form (GET) or update an existing post (POST).

    A POST must carry both ``title`` and ``content``, matching the check in
    the create view. Without the check, a missing field would assign None to
    a column declared ``nullable=False`` and the commit would fail.
    """
    post = Post.query.get_or_404(post_id)

    if request.method == 'POST':
        title = request.form.get('title')
        content = request.form.get('content')

        if title and content:
            post.title = title
            post.content = content
            db.session.commit()

            flash('Post updated successfully!', 'success')
            return redirect(url_for('post', post_id=post.id))

        # Nothing has been assigned to the post yet, so the form below is
        # re-rendered with the stored values.
        flash('Title and content are required!', 'error')

    return render_template('edit.html', post=post)

@app.route('/delete/<int:post_id>', methods=['POST'])
def delete_post(post_id):
    """Delete the post looked up via ``get_or_404`` and return to the index."""
    doomed = Post.query.get_or_404(post_id)

    session = db.session
    session.delete(doomed)
    session.commit()

    flash('Post deleted successfully!', 'success')
    home_url = url_for('index')
    return redirect(home_url)

# Script entry point: create the tables inside an application context, then
# start the server with debug=True.
if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)

Example 2: E-commerce Product Catalog

class Category(db.Model):
    """Product category with a one-to-many ``products`` relationship."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    # backref='category' names the reverse attribute on Product.
    products = db.relationship('Product', backref='category', lazy=True)

class Product(db.Model):
    """Catalog product that must belong to a category via ``category_id``."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False)
    description = db.Column(db.Text)
    # NOTE(review): Float is a binary floating-point type; for currency a
    # fixed-precision column (e.g. Numeric) is presumably intended - confirm.
    price = db.Column(db.Float, nullable=False)
    stock = db.Column(db.Integer, default=0)
    category_id = db.Column(db.Integer, db.ForeignKey('category.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

@app.route('/products')
def products():
    """Render the product list, optionally restricted to one category.

    The ``category`` query parameter is parsed as an int; when it is truthy
    only that category's products are shown. All categories are always
    passed to the template.
    """
    selected = request.args.get('category', type=int)

    # Start from the unfiltered query and narrow it only when requested.
    listing = Product.query
    if selected:
        listing = listing.filter_by(category_id=selected)

    matching_products = listing.all()
    all_categories = Category.query.all()

    return render_template(
        'products.html',
        products=matching_products,
        categories=all_categories,
        selected_category=selected,
    )

@app.route('/product/<int:product_id>')
def product_detail(product_id):
    """Render the detail page for the product looked up via ``get_or_404``."""
    item = Product.query.get_or_404(product_id)
    return render_template('product_detail.html', product=item)

@app.route('/search')
def search():
    """Render products filtered by text and/or price bounds.

    Query parameters:
        q: text matched against product name and description.
        min_price, max_price: optional inclusive bounds, parsed as floats.

    Each filter is applied only when its parameter is supplied.
    """
    query = request.args.get('q', '')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)

    products_query = Product.query

    if query:
        # autoescape=True makes "%" and "_" typed by the user match literally
        # instead of acting as LIKE wildcards.
        products_query = products_query.filter(
            db.or_(
                Product.name.contains(query, autoescape=True),
                Product.description.contains(query, autoescape=True)
            )
        )

    # "is not None" so that a bound of 0 is still applied.
    if min_price is not None:
        products_query = products_query.filter(Product.price >= min_price)

    if max_price is not None:
        products_query = products_query.filter(Product.price <= max_price)

    products = products_query.all()

    return render_template('search.html', products=products, query=query)

Example 3: User Management System

from werkzeug.security import generate_password_hash, check_password_hash

class User(db.Model):
    """User account that stores a password hash, never the password itself."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # 256 rather than 128: Werkzeug's default (scrypt) password hashes are
    # longer than 128 characters and would not fit on databases that enforce
    # the declared length.
    password_hash = db.Column(db.String(256), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    A POST must supply ``username``, ``email`` and ``password``. Duplicate
    usernames or emails are reported with a flash message and a redirect
    back to the form; success redirects to the login page.
    """
    from sqlalchemy.exc import IntegrityError

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')

        # A missing field would otherwise reach set_password() as None or be
        # written to a NOT NULL column.
        if not (username and email and password):
            flash('Username, email and password are required!', 'error')
            return redirect(url_for('register'))

        if User.query.filter_by(username=username).first():
            flash('Username already exists!', 'error')
            return redirect(url_for('register'))

        if User.query.filter_by(email=email).first():
            flash('Email already registered!', 'error')
            return redirect(url_for('register'))

        user = User(username=username, email=email)
        user.set_password(password)

        db.session.add(user)
        try:
            db.session.commit()
        except IntegrityError:
            # The checks above can be passed by two concurrent requests; the
            # unique constraints are the final guard.
            db.session.rollback()
            flash('Username or email already exists!', 'error')
            return redirect(url_for('register'))

        flash('Registration successful!', 'success')
        return redirect(url_for('login'))

    return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or verify submitted credentials (POST).

    On success the user is redirected to the ``dashboard`` endpoint, which
    is not defined in this snippet. On failure an error is flashed and the
    form is rendered again.
    """
    if request.method == 'POST':
        # Default to empty strings so a missing field is treated as a failed
        # login rather than passing None into the password check.
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        user = User.query.filter_by(username=username).first()

        if user is not None and user.check_password(password):
            # In a real app, you'd use Flask-Login for session management
            flash('Login successful!', 'success')
            return redirect(url_for('dashboard'))
        else:
            flash('Invalid username or password!', 'error')

    return render_template('login.html')

Best Practices

1. Use Transactions

# Good - atomic operations
from sqlalchemy.exc import SQLAlchemyError

try:
    # No explicit db.session.begin(): the session starts a transaction on
    # first use, and calling begin() while one is already open raises an
    # error.
    user = User(username='john', email='john@example.com')
    db.session.add(user)

    # Flush sends the INSERT so that user.id is populated; before the flush
    # it is still None. Nothing is committed yet.
    db.session.flush()

    post = Post(title='Hello', content='World', user_id=user.id)
    db.session.add(post)

    # Both rows are committed together, or not at all.
    db.session.commit()
except SQLAlchemyError:
    db.session.rollback()
    flash('Error creating user and post!', 'error')

2. Handle Database Errors

from sqlalchemy.exc import IntegrityError

# `new_user` is not defined in this snippet; it stands for a User instance
# built earlier.
try:
    db.session.add(new_user)
    db.session.commit()
except IntegrityError:
    # Roll back the failed transaction, then tell the user.
    db.session.rollback()
    flash('Username or email already exists!', 'error')

3. Use Connection Pooling

# For production
# NOTE(review): pool_pre_ping presumably checks a pooled connection before
# use, and pool_recycle presumably is a maximum connection age in seconds -
# confirm against the SQLAlchemy pooling documentation.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_pre_ping': True,
    'pool_recycle': 300,
}

4. Database Backup

import shutil
import os
from datetime import datetime

def backup_database(db_path='app.db', backup_dir='backups'):
    """Write a timestamped backup copy of a SQLite database file.

    The copy is made with SQLite's online backup API rather than a plain
    file copy, so the backup is consistent even if the database is being
    written to while the backup runs.

    Args:
        db_path: Path of the SQLite database file to back up.
        backup_dir: Directory for the backup; created if it does not exist.

    Returns:
        The path of the backup file, ``<backup_dir>/backup_<timestamp>.db``.

    Raises:
        FileNotFoundError: If ``db_path`` does not exist.
    """
    import sqlite3
    from contextlib import closing

    os.makedirs(backup_dir, exist_ok=True)

    # sqlite3.connect() would silently create an empty database for a missing
    # path, so a missing source is rejected explicitly.
    if not os.path.isfile(db_path):
        raise FileNotFoundError(db_path)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = os.path.join(backup_dir, f'backup_{timestamp}.db')

    with closing(sqlite3.connect(db_path)) as source, \
            closing(sqlite3.connect(backup_path)) as target:
        source.backup(target)

    return backup_path

5. Database Seeding

def seed_database():
    """Insert the sample categories and products if they are missing.

    The function can be run repeatedly: categories and products that already
    exist (matched by name) are left alone. Products are attached to their
    category through the ``category`` relationship rather than hard-coded
    ids, so seeding does not depend on which ids the database assigned.
    """
    # category name -> [(product name, price), ...]
    catalog = {
        'Electronics': [('Laptop', 999.99)],
        'Books': [('Python Book', 29.99)],
        'Clothing': [('T-Shirt', 19.99)],
    }

    for category_name, items in catalog.items():
        category = Category.query.filter_by(name=category_name).first()
        if category is None:
            category = Category(name=category_name)
            db.session.add(category)

        for product_name, price in items:
            if Product.query.filter_by(name=product_name).first() is None:
                db.session.add(
                    Product(name=product_name, price=price, category=category)
                )

    db.session.commit()

Practice Exercises

Exercise 1: Task Management System

Create a task management application with:

  • Users can create, edit, delete tasks
  • Tasks have title, description, due date, priority
  • Mark tasks as complete/incomplete
  • Filter tasks by status and priority
  • User authentication

Exercise 2: Library Management System

Build a library system with:

  • Books with title, author, ISBN, publication year
  • Borrowers (library members)
  • Borrowing records with due dates
  • Overdue book tracking
  • Book search and reservation
  • Fine calculation for overdue books

Exercise 3: Social Media Platform

Create a simple social media app with:

  • User profiles with bio and profile picture
  • Posts with text and images
  • Like and comment functionality
  • Follow/unfollow users
  • News feed with posts from followed users
  • Search users and posts

Exercise 4: Inventory Management

Build an inventory system with:

  • Products with name, description, price, stock level
  • Categories and subcategories
  • Suppliers and purchase orders
  • Stock tracking and low stock alerts
  • Sales orders and inventory updates
  • Reports on stock levels and sales

Summary

Databases store and manage your application’s data:

SQLAlchemy Setup:

from flask_sqlalchemy import SQLAlchemy

# Recap sketch: `app` is assumed to exist already.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
db = SQLAlchemy(app)

class Model(db.Model):
    """Placeholder model: a primary key plus whatever columns you need."""
    id = db.Column(db.Integer, primary_key=True)
    # ... columns ...

# Create the tables inside an application context.
with app.app_context():
    db.create_all()

CRUD Operations:

# Recap sketch: Model, field, value, new_value and id are placeholders.

# Create
obj = Model(field=value)
db.session.add(obj)
db.session.commit()

# Read
objects = Model.query.all()
obj = Model.query.get(id)

# Update
# Assign the attribute, then commit.
obj.field = new_value
db.session.commit()

# Delete
db.session.delete(obj)
db.session.commit()

Relationships:

# Recap sketch: only the relationship-related attributes are shown; the
# classes below declare no primary key or other columns.

# One-to-Many
class Parent(db.Model):
    # backref='parent' names the reverse attribute on Child.
    children = db.relationship('Child', backref='parent')

class Child(db.Model):
    parent_id = db.Column(db.Integer, db.ForeignKey('parent.id'))

# Many-to-Many
# The association table holds one foreign key to each side.
association_table = db.Table('association', 
    db.Column('left_id', db.ForeignKey('left.id')),
    db.Column('right_id', db.ForeignKey('right.id'))
)

class Left(db.Model):
    rights = db.relationship('Right', secondary=association_table)

Key Concepts:

  • Database models and schemas
  • CRUD operations
  • Relationships and joins
  • Query optimization
  • Database migrations
  • Error handling and transactions

Next: Authentication - securing your application with user login and sessions! 🔐