---
name: redis-patterns
description: Redis data structure patterns, caching strategies, distributed locks, rate limiting, pub/sub, and connection management for production applications.
origin: ECC
---
# Redis Patterns
Quick reference for Redis best practices across common backend use cases.
## How It Works
Redis is an in-memory data structure store that supports strings, hashes, lists, sets, sorted sets, streams, and more. Individual Redis commands are atomic on a single instance; multi-step workflows require Lua scripts, MULTI/EXEC transactions, or explicit synchronization to stay atomic. Data is optionally persisted via RDB snapshots or AOF logs. Clients communicate over TCP using the RESP protocol; connection pools are essential to avoid per-request handshake overhead.
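The multi-step caveat is where transactional pipelines come in. A minimal sketch, assuming a local instance and the `redis-py` client used throughout this skill (the key names are illustrative), showing two commands applied atomically via MULTI/EXEC:
```python
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# A transactional pipeline queues both commands client-side and sends them
# wrapped in MULTI/EXEC, so they apply atomically on the server.
pipe = r.pipeline(transaction=True)
pipe.hincrby("order:456:status", "retries", 1)  # illustrative key
pipe.expire("order:456:status", 3600)
results = pipe.execute()
```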
## When to Activate
- Adding caching to an application
- Implementing rate limiting or throttling
- Building distributed locks or coordination
- Setting up session or token storage
- Using Pub/Sub or Redis Streams for messaging
- Configuring Redis in production (pooling, eviction, clustering)
## Data Structure Cheat Sheet
| Use Case | Structure | Example Key |
|----------|-----------|-------------|
| Simple cache | String | `product:123` |
| User session | Hash | `session:abc` |
| Leaderboard | Sorted Set | `scores:weekly` |
| Unique visitors | Set | `visitors:2024-01-01` |
| Activity feed | List | `feed:user:456` |
| Event stream | Stream | `events:orders` |
| Counters / rate limits | String (INCR) | `ratelimit:user:123` |
| Approximate unique counts | HyperLogLog | `hll:pageviews` |
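The leaderboard and unique-count rows map to a handful of commands. A minimal sketch using the `r` client defined in the examples below (key and member names are illustrative):
```python
# Leaderboard: a Sorted Set keeps members ordered by score
r.zadd("scores:weekly", {"player:1": 1200, "player:2": 950})
r.zincrby("scores:weekly", 50, "player:2")              # add 50 points
top10 = r.zrevrange("scores:weekly", 0, 9, withscores=True)

# Unique counts: a Set gives exact membership; HyperLogLog gives an
# approximate count in a fixed ~12 KB of memory regardless of cardinality
r.sadd("visitors:2024-01-01", "user:123")
r.pfadd("hll:pageviews", "user:123", "user:456")
approx_unique = r.pfcount("hll:pageviews")
```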
## Core Patterns
### Cache-Aside (Lazy Loading)
```python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id: int):
    cache_key = f"product:{product_id}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss: load from the primary database ('db' is the app's DB layer, shown as a placeholder)
    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.setex(cache_key, 3600, json.dumps(product))  # TTL: 1 hour
    return product
```
### Write-Through Cache
```python
def update_product(product_id: int, data: dict):
    # Write to DB first
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)
    # Immediately update cache
    cache_key = f"product:{product_id}"
    r.setex(cache_key, 3600, json.dumps(data))
```
### Cache Invalidation
```python
# Tag-based invalidation — group related keys under a set
def cache_product(product_id: int, category_id: int, data: dict):
    key = f"product:{product_id}"
    tag = f"tag:category:{category_id}"
    pipe = r.pipeline(transaction=True)
    pipe.setex(key, 3600, json.dumps(data))
    pipe.sadd(tag, key)
    pipe.expire(tag, 3600)
    pipe.execute()

def invalidate_category(category_id: int):
    tag = f"tag:category:{category_id}"
    keys = r.smembers(tag)
    if keys:
        r.delete(*keys)
    r.delete(tag)
```
### Session Storage
```python
import time
import uuid

def create_session(user_id: int, ttl: int = 86400) -> str:
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    pipe = r.pipeline(transaction=True)
    pipe.hset(key, mapping={
        "user_id": user_id,
        "created_at": int(time.time()),
    })
    pipe.expire(key, ttl)
    pipe.execute()
    return session_id

def get_session(session_id: str) -> dict | None:
    data = r.hgetall(f"session:{session_id}")
    return data if data else None

def delete_session(session_id: str):
    r.delete(f"session:{session_id}")
```
## Rate Limiting
### Fixed Window (Simple)
```python
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
    key = f"ratelimit:{user_id}:{int(time.time()) // window}"
    pipe = r.pipeline(transaction=True)
    pipe.incr(key)
    pipe.expire(key, window)
    count, _ = pipe.execute()
    return count > limit
```
### Sliding Window (Lua — Atomic)
```lua
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
    -- Use a unique member (now + sequence) to avoid collisions within the same millisecond
    local seq_key = key .. ':seq'
    local seq = redis.call('INCR', seq_key)
    redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
    redis.call('ZADD', key, now, now .. '-' .. seq)
    redis.call('EXPIRE', key, math.ceil(window / 1000))
    return 1
end
return 0
```
```python
sliding_window = r.register_script(open('sliding_window.lua').read())
def allow_request(user_id: int) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = int(time.time() * 1000)
    return bool(sliding_window(keys=[key], args=[now, 60000, 100]))
```
## Distributed Locks
### Distributed Lock (Single Node — SET NX PX)
```python
import uuid

def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
    return token if acquired else None

def release_lock(resource: str, token: str) -> bool:
    # Release only if we still hold the lock; the compare-and-delete must be atomic
    release_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(release_script, 1, f"lock:{resource}", token)
    return bool(result)

# Usage
token = acquire_lock("order:payment:123")
if token:
    try:
        process_payment()
    finally:
        release_lock("order:payment:123", token)
```
> For multi-node setups, use the `redlock-py` library, which implements the full Redlock algorithm.
## Pub/Sub & Streams
### Pub/Sub (Fire-and-Forget)
```python
# Publisher
def publish_event(channel: str, payload: dict):
r.publish(channel, json.dumps(payload))
# Subscriber (blocking — run in separate thread/process)
def subscribe_events(channel: str):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
handle(json.loads(message['data']))
```
### Redis Streams (Durable Queue)
```python
# Producer
def emit(stream: str, event: dict):
    r.xadd(stream, event, maxlen=10000)  # Cap stream length

# Consumer group — guarantees at-least-once delivery
try:
    r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except Exception:
    pass  # Group already exists

def consume(stream: str, group: str, consumer: str):
    while True:
        messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
        for _, entries in (messages or []):
            for msg_id, data in entries:
                process(data)
                r.xack(stream, group, msg_id)
```
> Prefer **Streams** over Pub/Sub when you need delivery guarantees, consumer groups, or replay.
## Key Design
### Naming Conventions
```
# Pattern: resource:id:field
user:123:profile
order:456:status
cache:product:789
# Pattern: namespace:resource:id
myapp:session:abc123
myapp:ratelimit:user:123
# Pattern: resource:date (time-bound keys)
stats:pageviews:2024-01-01
```
### TTL Strategy
| Data Type | Suggested TTL |
|-----------|--------------|
| User session | 24h (`86400`) |
| API response cache | 5–15 min |
| Rate limit window | Match window size |
| Short-lived tokens | 5–10 min |
| Leaderboard | 1h–24h |
| Static/reference data | 1h–1 week |
Always set a TTL. Keys without TTL accumulate indefinitely and cause memory pressure.
## Connection Management
### Connection Pooling
```python
from redis import ConnectionPool, Redis
pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
)
r = Redis(connection_pool=pool)
```
### Cluster Mode
```python
from redis.cluster import RedisCluster, ClusterNode

r = RedisCluster(
    startup_nodes=[ClusterNode("redis-1", 6379)],
    decode_responses=True,
)
```
### Sentinel (High Availability)
```python
from redis.sentinel import Sentinel
sentinel = Sentinel(
    [('sentinel-1', 26379), ('sentinel-2', 26379)],
    socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)
```
## Eviction Policies
| Policy | Behavior | Best For |
|--------|----------|----------|
| `noeviction` | Error on write when full | Queues / critical data |
| `allkeys-lru` | Evict least recently used | General cache |
| `volatile-lru` | LRU only among keys with TTL | Mixed data store |
| `allkeys-lfu` | Evict least frequently used | Skewed access patterns |
| `volatile-ttl` | Evict soonest-to-expire | Prioritize long-lived data |
Set via `redis.conf`: `maxmemory-policy allkeys-lru`
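The same policy can usually be applied at runtime through redis-py's `CONFIG SET` wrapper. A minimal sketch; note that `CONFIG` is often disabled on managed Redis offerings, and the memory limit shown is illustrative:
```python
# Cap memory and evict least-recently-used keys across the whole keyspace.
# CONFIG SET changes are not written back to redis.conf automatically.
r.config_set("maxmemory", "256mb")
r.config_set("maxmemory-policy", "allkeys-lru")
print(r.config_get("maxmemory-policy"))  # {'maxmemory-policy': 'allkeys-lru'}
```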
## Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Keys with no TTL | Memory grows unbounded | Always set TTL |
| `KEYS *` in production | Blocks the server (O(N)) | Use `SCAN` cursor |
| Storing large blobs (>100KB) | Slow serialization, memory pressure | Store reference + fetch from object store |
| Single Redis for everything | No isolation between cache & queue | Use separate DBs or instances |
| Ignoring connection pool limits | Connection exhaustion under load | Size pool to workload |
| Not handling cache miss stampede | Thundering herd on cold start | Use locks or probabilistic early expiry |
| `FLUSHALL` without thought | Wipes entire instance | Scope deletes by key pattern |
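Two of these rows (`KEYS *` and missing TTLs) share one fix. A minimal sketch of a `SCAN`-based audit that finds keys without an expiry, using the `r` client from above; the `match` pattern is illustrative:
```python
def find_keys_without_ttl(pattern: str = "cache:*", batch: int = 500):
    # scan_iter walks the keyspace incrementally instead of blocking like KEYS *
    missing = []
    for key in r.scan_iter(match=pattern, count=batch):
        if r.ttl(key) == -1:  # -1 means the key exists but has no expiry
            missing.append(key)
    return missing
```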
### Cache Miss Stampede Prevention
```python
import threading

_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()

def get_with_lock(key: str, fetch_fn, ttl: int = 300):
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    with _locks_mutex:
        if key not in _locks:
            _locks[key] = threading.Lock()
        lock = _locks[key]
    with lock:
        cached = r.get(key)  # Re-check after acquiring lock
        if cached:
            return json.loads(cached)
        value = fetch_fn()
        r.setex(key, ttl, json.dumps(value))
        return value
```
> Note: for multi-process deployments, replace the in-process lock with `acquire_lock`/`release_lock` from the Distributed Locks section above.
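A minimal sketch of that swap, reusing `acquire_lock`/`release_lock` and the `r` client from above; the `rebuild:{key}` lock name, the 10-second lock TTL, and the fallback to calling `fetch_fn` directly on contention are illustrative choices:
```python
def get_with_redis_lock(key: str, fetch_fn, ttl: int = 300, wait_ms: int = 50):
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    token = acquire_lock(f"rebuild:{key}", ttl_ms=10_000)
    if token is None:
        # Another process is rebuilding; wait briefly and re-read the cache
        time.sleep(wait_ms / 1000)
        cached = r.get(key)
        return json.loads(cached) if cached else fetch_fn()
    try:
        cached = r.get(key)  # Re-check after acquiring the lock
        if cached:
            return json.loads(cached)
        value = fetch_fn()
        r.setex(key, ttl, json.dumps(value))
        return value
    finally:
        release_lock(f"rebuild:{key}", token)
```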
## Examples
**Add caching to a Django/Flask API endpoint:**
Use cache-aside with `setex` and a 5-minute TTL on the response. Key on the request parameters.
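A minimal Flask sketch of that advice; the route, the `load_products` helper, and the key format are illustrative:
```python
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.get("/products")
def list_products():
    # Key on the request parameters so each filter combination caches separately
    cache_key = f"cache:products:{request.args.get('category', 'all')}"
    cached = r.get(cache_key)
    if cached:
        return jsonify(json.loads(cached))
    data = load_products(request.args.get("category"))  # hypothetical DB query
    r.setex(cache_key, 300, json.dumps(data))  # 5-minute TTL
    return jsonify(data)
```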
**Rate-limit an API by user:**
Use fixed-window with `pipeline(transaction=True)` for low-traffic endpoints; use sliding-window Lua for accurate per-user throttling.
**Coordinate a background job across workers:**
Use `acquire_lock` with a TTL that exceeds the expected job duration. Always release in a `finally` block.
**Fan-out notifications to multiple subscribers:**
Use Pub/Sub for fire-and-forget. Switch to Streams if you need guaranteed delivery or replay for late consumers.
## Quick Reference
| Pattern | When to Use |
|---------|-------------|
| Cache-aside | Read-heavy, tolerate slight staleness |
| Write-through | Strong consistency required |
| Distributed lock | Prevent concurrent access to a resource |
| Sliding window rate limit | Accurate per-user throttling |
| Redis Streams | Durable event queue with consumer groups |
| Pub/Sub | Broadcast with no delivery guarantees needed |
| Sorted Set leaderboard | Ranked scoring, pagination |
| HyperLogLog | Approximate unique count at low memory |
## Related
- Skill: `postgres-patterns` — relational data patterns
- Skill: `backend-patterns` — API and service layer patterns
- Skill: `database-migrations` — schema versioning
- Skill: `django-patterns` — Django cache framework integration
- Agent: `database-reviewer` — full database review workflow