Database Scaling Strategies: From Read Replicas to Sharding
Your database was fine at 1,000 users. At 100,000 it slows down. At 1,000,000 it falls over. Every growing system hits this wall — and the path you choose to scale your database defines your architecture for years.
This guide covers the full spectrum of database scaling strategies, from the simplest wins to distributed sharding.
Vertical vs Horizontal Scaling#
Vertical scaling (scaling up) means adding more CPU, RAM, or faster disks to a single server. It is simple but has a hard ceiling — you cannot buy an infinitely large machine.
Horizontal scaling (scaling out) means distributing data across multiple servers. It is harder to implement but offers near-linear capacity growth.
Vertical: [ Small DB ] → [ Bigger DB ] → [ Biggest DB ] → 💀 ceiling
Horizontal: [ DB ] → [ DB ][ DB ] → [ DB ][ DB ][ DB ] → ∞
Rule of thumb: exhaust vertical scaling and read replicas before introducing horizontal complexity.
Read Replicas#
The lowest-friction horizontal scaling technique. A read replica receives a continuous stream of changes from the primary and serves read-only queries.
            ┌─────────────┐
            │   Primary   │  ← all writes
            └──────┬──────┘
                   │  replication
          ┌────────┼────────┐
          ▼        ▼        ▼
    [ Replica ] [ Replica ] [ Replica ]  ← reads
PostgreSQL Streaming Replication#
-- On primary: create replication user
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'secret';
# On replica: take a base backup from the primary, then start the server
pg_basebackup -h primary-host -D /var/lib/postgresql/data -U replicator -Fp -Xs -P
# replica postgresql.conf (on PostgreSQL 12+, also create an empty standby.signal file in the data directory)
primary_conninfo = 'host=primary-host user=replicator password=secret'
hot_standby = on
Replication lag is the tradeoff. Reads may be milliseconds behind writes. Route writes and consistency-critical reads to the primary; everything else to replicas.
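One way to put that routing rule into practice is a small DSN router in the application layer. This is a minimal pure-Python sketch; the host names, the `needs_fresh_read` flag, and the round-robin policy are illustrative assumptions, not any specific library's API:

```python
import itertools

# Hypothetical hosts for illustration
PRIMARY_DSN = "postgresql://primary.db.internal/mydb"
REPLICA_DSNS = [
    "postgresql://replica-1.db.internal/mydb",
    "postgresql://replica-2.db.internal/mydb",
]
_replica_cycle = itertools.cycle(REPLICA_DSNS)

def route(readonly: bool, needs_fresh_read: bool = False) -> str:
    """Pick a DSN: writes and read-your-writes queries go to the primary,
    everything else round-robins across the replicas."""
    if not readonly or needs_fresh_read:
        return PRIMARY_DSN
    return next(_replica_cycle)
```

The `needs_fresh_read` flag covers read-your-writes cases, such as showing a user the profile they just edited, where replication lag would be visible.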
Connection Pooling with PgBouncer#
Before adding servers, check if you are exhausting connections. PostgreSQL forks a process per connection — thousands of connections kill performance.
App (5000 conns) → PgBouncer (50 conns) → PostgreSQL
# pgbouncer.ini
[databases]
mydb = host=127.0.0.1 port=5432 dbname=mydb
[pgbouncer]
pool_mode = transaction
max_client_conn = 5000
default_pool_size = 50
Transaction-mode pooling lets thousands of application threads share a small pool. The tradeoff: session state (SET, LISTEN/NOTIFY, advisory locks) does not survive across transactions in this mode, so queries must not depend on it. This alone can delay sharding by months.
Caching Layer with Redis#
A cache in front of your database absorbs the hottest reads.
App → Redis (cache hit?) → yes → return
→ no → PostgreSQL → store in Redis → return
import json

import psycopg2
import redis

r = redis.Redis()
DSN = "dbname=mydb user=app"  # adjust for your environment

def get_user(user_id: int):
    # Cache hit: serve straight from Redis
    cached = r.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    # Cache miss: read from PostgreSQL, then populate the cache
    with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
        cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
        row = cur.fetchone()
    if row is None:
        return None
    user = {"id": row[0], "name": row[1], "email": row[2]}
    r.setex(f"user:{user_id}", 300, json.dumps(user))  # TTL 5 minutes
    return user
Cache invalidation is the hard part. Use short TTLs or explicit invalidation on writes.
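For explicit invalidation, a common pattern is to update the database first and then delete (not rewrite) the cache key, so the next read repopulates it. A toy sketch of that ordering, using plain dicts as stand-ins for PostgreSQL and Redis:

```python
def update_user_email(db: dict, cache: dict, user_id: int, email: str) -> None:
    # 1. Update the source of truth first (an UPDATE in PostgreSQL).
    user = db.setdefault(user_id, {"id": user_id})
    user["email"] = email
    # 2. Then delete the cache entry (DEL in Redis). Deleting rather than
    #    rewriting the value narrows the window for serving stale data
    #    when writers race with concurrent readers.
    cache.pop(f"user:{user_id}", None)
```

With a real Redis client, step 2 would be `r.delete(f"user:{user_id}")`; the structure is the same.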
Database Partitioning#
Partitioning splits a single table into smaller physical pieces on the same server. It improves query performance and maintenance without the complexity of distributed systems.
-- Range partitioning in PostgreSQL
CREATE TABLE orders (
id BIGSERIAL,
created_at TIMESTAMPTZ NOT NULL,
total NUMERIC
) PARTITION BY RANGE (created_at);
CREATE TABLE orders_2025 PARTITION OF orders
FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
CREATE TABLE orders_2026 PARTITION OF orders
FOR VALUES FROM ('2026-01-01') TO ('2027-01-01');
PostgreSQL prunes partitions at query time — a query for WHERE created_at > '2026-06-01' only touches orders_2026.
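Because an insert fails if no partition covers the row's key, teams typically pre-create partitions from a scheduled job. A small sketch that generates the yearly DDL shown above (the `<parent>_<year>` naming scheme is an assumption, not a PostgreSQL requirement):

```python
from datetime import date

def yearly_partition_ddl(parent: str, year: int) -> str:
    """Generate DDL for one yearly range partition of `parent`.
    Running this ahead of time ensures inserts never hit a missing
    partition; ISO dates align with PostgreSQL's literal syntax."""
    start = date(year, 1, 1)
    end = date(year + 1, 1, 1)  # range upper bound is exclusive
    return (
        f"CREATE TABLE {parent}_{year} PARTITION OF {parent}\n"
        f"    FOR VALUES FROM ('{start}') TO ('{end}');"
    )
```

A cron job or extension such as pg_partman can take this role in production.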
Database Sharding#
When a single server cannot hold all your data or handle all your writes, you shard — split data across multiple independent database instances.
Sharding Strategies#
┌──────────────────────────────────────────────┐
│ Sharding Strategies │
├──────────────┬───────────────┬───────────────┤
│ Hash-based │ Range-based │ Geo-based │
│ user_id % N │ A-M → shard1 │ US → shard1 │
│ │ N-Z → shard2 │ EU → shard2 │
└──────────────┴───────────────┴───────────────┘
Hash sharding distributes evenly but makes range queries expensive. Range sharding keeps related data together but can create hot spots. Geo sharding keeps data close to users and satisfies data residency requirements.
Application-Level Shard Routing#
import psycopg2

SHARD_COUNT = 4

def get_shard(user_id: int) -> str:
    # Hash routing: the same user_id always maps to the same shard
    shard_index = user_id % SHARD_COUNT
    return f"postgresql://shard-{shard_index}.db.internal/mydb"

def get_user_orders(user_id: int):
    conn = psycopg2.connect(get_shard(user_id))
    with conn, conn.cursor() as cur:
        cur.execute("SELECT * FROM orders WHERE user_id = %s", (user_id,))
        return cur.fetchall()
Simple modulo routing has a hidden cost: changing SHARD_COUNT remaps most keys, forcing a mass data migration. Consistent hashing or a shard lookup table keeps rebalancing incremental.
When NOT to Shard#
Sharding adds enormous complexity: cross-shard joins, distributed transactions, rebalancing. Avoid it if you can solve the problem with:
- Better indexes and query optimization
- Connection pooling (PgBouncer)
- Caching (Redis)
- Read replicas
- Table partitioning
- Vertical scaling (bigger machine)
If none of these work and write throughput or storage is the bottleneck, sharding is justified.
Tools for Distributed Databases#
| Tool | What It Does | Best For |
|---|---|---|
| Citus | Sharding extension for PostgreSQL | Staying in the PostgreSQL ecosystem |
| Vitess | Sharding middleware (MySQL) | YouTube-scale MySQL workloads |
| CockroachDB | Distributed SQL, automatic sharding | Global, strongly-consistent workloads |
| TiDB | MySQL-compatible distributed DB | Hybrid OLTP/OLAP |
Citus turns standard PostgreSQL into a distributed database with minimal application changes:
-- Citus: distribute a table by tenant_id
SELECT create_distributed_table('orders', 'tenant_id');
-- Queries that include tenant_id route to a single shard
SELECT * FROM orders WHERE tenant_id = 42 AND created_at > '2026-01-01';
Migration Strategy: Zero-Downtime Sharding#
Moving from a single database to a sharded architecture without downtime:
Phase 1: Dual-write — write to old DB + new sharded DB
Phase 2: Backfill — copy historical data to shards
Phase 3: Verify — compare reads from both systems
Phase 4: Cut-over — read from sharded DB, stop writing to old DB
Phase 5: Cleanup — decommission old database
Each phase is independently reversible. Monitor error rates and latency at every step before advancing.
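The Phase 1 dual-write can be sketched as follows. The key design decision: the old database remains the source of truth, so its write must succeed, while the shadow write to the shards is best-effort and merely logged on failure. The duck-typed `old_db` and `shards` clients are illustrative stand-ins, not a specific library:

```python
import logging

log = logging.getLogger("shard_migration")

def dual_write(old_db, shards, user_id: int, order: dict) -> None:
    # Authoritative write: exceptions propagate and fail the request.
    old_db.insert("orders", order)
    # Shadow write to the new sharded cluster: failures are logged so the
    # backfill/verify phases can catch up, rather than breaking the request.
    try:
        shards[user_id % len(shards)].insert("orders", order)
    except Exception:
        log.exception("shadow write failed for user_id=%s", user_id)
```

Discrepancies logged here feed directly into Phase 3 verification before the cut-over.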
Scaling Decision Flowchart#
Is the DB slow?
├─ Read-heavy? → Add read replicas + Redis cache
├─ Connection-heavy? → Add PgBouncer
├─ Large table scans? → Partition tables
├─ Write-heavy on single table? → Shard by key
└─ Data too large for one node? → Shard or use distributed DB
Summary#
Start simple. Optimize queries, add connection pooling, introduce caching, scale reads with replicas, partition large tables. Only when single-node write capacity or storage is the bottleneck should you reach for sharding — and when you do, pick the strategy (hash, range, geo) that matches your access patterns.