Claude ec9ace9c54 docs: add native Japanese translation of ECC documentation (ja-JP)
Translate everything-claude-code repository to Japanese including:
- 17 root documentation files
- 60 agent documentation files
- 80 command documentation files
- 99 rule files across 18 language directories (common, angular, arkts, cpp, csharp, dart, fsharp, golang, java, kotlin, perl, php, python, ruby, rust, swift, typescript, web)
- 199 skill documentation files

Total: 455 files translated to Japanese with:
- Consistent terminology glossary applied throughout
- YAML field names preserved in English (name, description, etc.)
- Code blocks and examples untouched (comments translated)
- Markdown structure and relative links preserved
- Professional translation maintaining technical accuracy

This translation expands ECC accessibility to Japanese-speaking developers and teams.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-05-17 02:31:40 -04:00

12 KiB
Raw Blame History

name, description, origin
name description origin
redis-patterns Redisデータ構造パターン、キャッシング戦略、分散ロック、レート制限、Pub/Sub、本番アプリケーション用コネクション管理。 ECC

Redis Patterns

一般的なバックエンド使用例に対するRedisベストプラクティスの参考資料。

How It Works

Redisはメモリ内データ構造ストアで、文字列、ハッシュ、リスト、セット、ソート済みセット、ストリームなどをサポートします。単一インスタンスでは個々のRedisコマンドは原子的ですが、マルチステップワークフローはLuaスクリプト、MULTI/EXECトランザクション、または明示的な同期化が必要です。RDBスナップショットまたはAOFログを通じてデータをオプションで永続化します。クライアントはRESPプロトコルを使用してTCP経由で通信します。接続プール不可欠でリクエストごとのハンドシェイクオーバーヘッドを回避します。

When to Activate

  • アプリケーションにキャッシング追加
  • レート制限またはスロットリング実装
  • 分散ロックまたはコーディネーション構築
  • セッションまたはトークンストレージ設定
  • Pub/SubまたはRedis Streams for messaging使用
  • 本番環境でRedis設定プール、削除、クラスタリング

Data Structure Cheat Sheet

Use Case Structure Example Key
Simple cache String product:123
User session Hash session:abc
Leaderboard Sorted Set scores:weekly
Unique visitors Set visitors:2024-01-01
Activity feed List feed:user:456
Event stream Stream events:orders
Counters / rate limits String (INCR) ratelimit:user:123
Bloom filter / HLL HyperLogLog hll:pageviews

Core Patterns

Cache-Aside (Lazy Loading)

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id: int):
    cache_key = f"product:{product_id}"
    cached = r.get(cache_key)

    if cached:
        return json.loads(cached)

    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.setex(cache_key, 3600, json.dumps(product))  # TTL: 1 hour
    return product

Write-Through Cache

def update_product(product_id: int, data: dict):
    # DB書き込み先
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)

    # キャッシュを即座に更新
    cache_key = f"product:{product_id}"
    r.setex(cache_key, 3600, json.dumps(data))

Cache Invalidation

# タグベース削除 — セット内で関連キーをグループ化
def cache_product(product_id: int, category_id: int, data: dict):
    key = f"product:{product_id}"
    tag = f"tag:category:{category_id}"
    pipe = r.pipeline(transaction=True)
    pipe.setex(key, 3600, json.dumps(data))
    pipe.sadd(tag, key)
    pipe.expire(tag, 3600)
    pipe.execute()

def invalidate_category(category_id: int):
    tag = f"tag:category:{category_id}"
    keys = r.smembers(tag)
    if keys:
        r.delete(*keys)
    r.delete(tag)

Session Storage

import time
import uuid

def create_session(user_id: int, ttl: int = 86400) -> str:
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    pipe = r.pipeline(transaction=True)
    pipe.hset(key, mapping={
        "user_id": user_id,
        "created_at": int(time.time()),
    })
    pipe.expire(key, ttl)
    pipe.execute()
    return session_id

def get_session(session_id: str) -> dict | None:
    data = r.hgetall(f"session:{session_id}")
    return data if data else None

def delete_session(session_id: str):
    r.delete(f"session:{session_id}")

Rate Limiting

Fixed Window (Simple)

def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
    key = f"ratelimit:{user_id}:{int(time.time()) // window}"
    pipe = r.pipeline(transaction=True)
    pipe.incr(key)
    pipe.expire(key, window)
    count, _ = pipe.execute()
    return count > limit

Sliding Window (Lua — Atomic)

-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)

if count < limit then
    -- Use unique member (now + sequence) to avoid collisions within the same millisecond
    local seq_key = key .. ':seq'
    local seq = redis.call('INCR', seq_key)
    redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
    redis.call('ZADD', key, now, now .. '-' .. seq)
    redis.call('EXPIRE', key, math.ceil(window / 1000))
    return 1
end
return 0
sliding_window = r.register_script(open('sliding_window.lua').read())

def allow_request(user_id: int) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = int(time.time() * 1000)
    return bool(sliding_window(keys=[key], args=[now, 60000, 100]))

Distributed Locks

Distributed Lock (Single Node — SET NX PX)

import uuid

def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
    return token if acquired else None

def release_lock(resource: str, token: str) -> bool:
    release_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(release_script, 1, f"lock:{resource}", token)
    return bool(result)

# Usage
token = acquire_lock("order:payment:123")
if token:
    try:
        process_payment()
    finally:
        release_lock("order:payment:123", token)

マルチード設定の場合、フルRedlockアルゴリズムを実装する redlock-py ライブラリを使用してください。

Pub/Sub & Streams

Pub/Sub (Fire-and-Forget)

# Publisher
def publish_event(channel: str, payload: dict):
    r.publish(channel, json.dumps(payload))

# Subscriber (blocking — run in separate thread/process)
def subscribe_events(channel: str):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            handle(json.loads(message['data']))

Redis Streams (Durable Queue)

# Producer
def emit(stream: str, event: dict):
    r.xadd(stream, event, maxlen=10000)  # Cap stream length

# Consumer group — guarantees at-least-once delivery
try:
    r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except Exception:
    pass  # Group already exists

def consume(stream: str, group: str, consumer: str):
    while True:
        messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
        for _, entries in (messages or []):
            for msg_id, data in entries:
                process(data)
                r.xack(stream, group, msg_id)

配信保証、コンシューマーグループ、または再生が必要な場合、Pub/Sub代わりにStreamsを優先してください。

Key Design

Naming Conventions

# Pattern: resource:id:field
user:123:profile
order:456:status
cache:product:789

# Pattern: namespace:resource:id
myapp:session:abc123
myapp:ratelimit:user:123

# Pattern: resource:date (time-bound keys)
stats:pageviews:2024-01-01

TTL Strategy

Data Type Suggested TTL
User session 24h (86400)
API response cache 515 min
Rate limit window Match window size
Short-lived tokens 510 min
Leaderboard 1h24h
Static/reference data 1h1 week

常にTTLを設定してください。TTLなしのキーは無限に蓄積してメモリ圧力を引き起こします。

Connection Management

Connection Pooling

from redis import ConnectionPool, Redis

pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
)

r = Redis(connection_pool=pool)

Cluster Mode

from redis.cluster import RedisCluster

r = RedisCluster(
    startup_nodes=[{"host": "redis-1", "port": 6379}],
    decode_responses=True,
    skip_full_coverage_check=True,
)

Sentinel (High Availability)

from redis.sentinel import Sentinel

sentinel = Sentinel(
    [('sentinel-1', 26379), ('sentinel-2', 26379)],
    socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)

Eviction Policies

Policy Behavior Best For
noeviction Error on write when full Queues / critical data
allkeys-lru Evict least recently used General cache
volatile-lru LRU only among keys with TTL Mixed data store
allkeys-lfu Evict least frequently used Skewed access patterns
volatile-ttl Evict soonest-to-expire Prioritize long-lived data

redis.confを通じて設定:maxmemory-policy allkeys-lru

Anti-Patterns

Anti-Pattern Problem Fix
Keys with no TTL Memory grows unbounded Always set TTL
KEYS * in production Blocks the server (O(N)) Use SCAN cursor
Storing large blobs (>100KB) Slow serialization, memory pressure Store reference + fetch from object store
Single Redis for everything No isolation between cache & queue Use separate DBs or instances
Ignoring connection pool limits Connection exhaustion under load Size pool to workload
Not handling cache miss stampede Thundering herd on cold start Use locks or probabilistic early expiry
FLUSHALL without thought Wipes entire instance Scope deletes by key pattern

Cache Miss Stampede Prevention

import threading

_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()

def get_with_lock(key: str, fetch_fn, ttl: int = 300):
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    with _locks_mutex:
        if key not in _locks:
            _locks[key] = threading.Lock()
        lock = _locks[key]
    with lock:
        cached = r.get(key)  # Re-check after acquiring lock
        if cached:
            return json.loads(cached)
        value = fetch_fn()
        r.setex(key, ttl, json.dumps(value))
        return value

マルチプロセスデプロイメント:インプロセスロックを上記の分散ロックセクション から acquire_lock/release_lock に置き換えてください。

Examples

Django/Flask APIエンドポイントにキャッシング追加 レスポンスに5分TTLでCache-asideを使用。リクエストパラメータでキーを指定。

ユーザーごとにAPIレート制限 低トラフィックエンドポイントに固定ウィンドウを pipeline(transaction=True) で使用正確なユーザーごと制限にはsliding-windowの Lua使用。

ワーカー間のバックグラウンドジョブ調整: 予想ジョブ期間を超えるTTLで acquire_lock を使用。常に finally ブロックでリリース。

複数購読者への通知のファンアウト: ファイアアンドフォーゲットにPub/Subを使用。保証配信または再生が必要な場合、Streamsに切り替え。

Quick Reference

Pattern When to Use
Cache-aside Read-heavy, tolerate slight staleness
Write-through Strong consistency required
Distributed lock Prevent concurrent access to a resource
Sliding window rate limit Accurate per-user throttling
Redis Streams Durable event queue with consumer groups
Pub/Sub Broadcast with no delivery guarantees needed
Sorted Set leaderboard Ranked scoring, pagination
HyperLogLog Approximate unique count at low memory
  • Skill: postgres-patterns — リレーショナルデータパターン
  • Skill: backend-patterns — APIおよびサービスレイヤーパターン
  • Skill: database-migrations — スキーマバージョニング
  • Skill: django-patterns — Djangoキャッシュフレームワーク統合
  • Agent: database-reviewer — 全データベースレビューワークフロー