Coordinate work across multiple servers with Redis-based distributed locks: prevent race conditions, avoid duplicate processing, and manage access to shared resources.
The simplest reliable distributed lock:
import Redis from 'ioredis';
import crypto from 'crypto';

// NOTE(review): with strict null checks `process.env.ARCTICKEY_URL` is
// `string | undefined`; confirm this call type-checks with your ioredis version.
const redis = new Redis(process.env.ARCTICKEY_URL);

/**
 * Minimal Redis lock built on `SET key value PX ttl NX`.
 *
 * Each instance carries one random token. The token is stored as the lock's
 * value so that only the instance that acquired a lock can release it.
 */
class DistributedLock {
  /**
   * Random token identifying this lock holder.
   * `protected` (not `private`) so subclasses can pass it to their own
   * ownership-checking scripts.
   */
  protected readonly lockValue: string;

  constructor() {
    this.lockValue = crypto.randomUUID();
  }

  /**
   * Try to acquire a lock (single attempt, no waiting).
   * @param key Lock identifier; stored in Redis as `lock:<key>`
   * @param ttlMs Lock timeout in milliseconds
   * @returns true if the lock was acquired, false otherwise
   */
  async acquire(key: string, ttlMs: number = 30000): Promise<boolean> {
    const result = await redis.set(
      `lock:${key}`,
      this.lockValue,
      'PX', ttlMs, // Expiry in milliseconds
      'NX'         // Only set if the key does not exist
    );
    return result === 'OK';
  }

  /**
   * Release a lock, but only if this instance's token is still stored in it.
   * @param key Lock identifier passed to `acquire`
   * @returns true if the key was deleted, false if it held another value or was gone
   */
  async release(key: string): Promise<boolean> {
    // A Lua script makes the compare-and-delete a single atomic step.
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
      else
        return 0
      end
    `;

    const result = await redis.eval(script, 1, `lock:${key}`, this.lockValue);
    return result === 1;
  }
}

// Usage
const orderId = 'order-123';
const paymentLockKey = `process-payment:${orderId}`;
const lock = new DistributedLock();

if (await lock.acquire(paymentLockKey)) {
  try {
    // Critical section - only one server executes this
    await processPayment(orderId);
  } finally {
    await lock.release(paymentLockKey);
  }
} else {
  console.log('Another server is processing this payment');
}
Wait for the lock to become available:
/**
 * DistributedLock that polls until the lock is free or the attempts run out.
 */
class RetryableLock extends DistributedLock {
  /**
   * Acquire the lock, retrying at a fixed interval.
   * @param key Lock identifier
   * @param ttlMs Lock timeout in milliseconds
   * @param retryDelayMs Delay between attempts in milliseconds
   * @param maxRetries Maximum number of acquire attempts (-1 for infinite)
   * @returns true once the lock is acquired, false when all attempts failed
   */
  async acquireWithRetry(
    key: string,
    ttlMs: number = 30000,
    retryDelayMs: number = 100,
    maxRetries: number = 50
  ): Promise<boolean> {
    const unlimited = maxRetries === -1;

    for (let attempt = 1; unlimited || attempt <= maxRetries; attempt++) {
      if (await this.acquire(key, ttlMs)) {
        return true;
      }

      // Only wait when another attempt will follow; sleeping after the
      // final failure would just delay the `false` result.
      if (unlimited || attempt < maxRetries) {
        await this.sleep(retryDelayMs);
      }
    }

    return false;
  }

  /** Resolve after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage
const lock = new RetryableLock();

// Up to 50 attempts, 100 ms apart (about 5 seconds of waiting)
if (await lock.acquireWithRetry('resource', 30000, 100, 50)) {
  try {
    await doExclusiveWork();
  } finally {
    await lock.release('resource');
  }
}
Automatically extend the lock while work is in progress:
/**
 * DistributedLock that keeps renewing its TTL on a timer while work runs.
 *
 * NOTE(review): `extend` reads `this.lockValue`, so that field must be
 * declared `protected` (not `private`) in DistributedLock for this subclass
 * to type-check.
 */
class ExtendableLock extends DistributedLock {
  private extensionInterval?: NodeJS.Timeout;

  /**
   * Acquire the lock and, on success, start renewing it periodically.
   * @param key Lock identifier
   * @param ttlMs Lock timeout in milliseconds; also the TTL applied on each renewal
   * @param extensionIntervalMs How often to renew, in milliseconds
   * @returns true if the lock was acquired, false otherwise
   */
  async acquireWithExtension(
    key: string,
    ttlMs: number = 30000,
    extensionIntervalMs: number = 10000
  ): Promise<boolean> {
    const acquired = await this.acquire(key, ttlMs);
    if (!acquired) {
      return false;
    }

    // Do not leak a timer left over from an earlier acquisition.
    this.stopExtension();

    const timer: NodeJS.Timeout = setInterval(() => {
      void this.extendOrStop(key, ttlMs, timer);
    }, extensionIntervalMs);
    this.extensionInterval = timer;

    return true;
  }

  /**
   * Reset the lock's TTL, but only if this instance's token is still stored in it.
   * @returns true if the TTL was reset, false if the lock is no longer ours
   */
  async extend(key: string, ttlMs: number): Promise<boolean> {
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
      else
        return 0
      end
    `;

    const result = await redis.eval(
      script,
      1,
      `lock:${key}`,
      this.lockValue,
      ttlMs
    );

    return result === 1;
  }

  /**
   * Stop the renewal timer, then release the lock.
   * @returns true if the lock was released, false if it was no longer ours
   */
  async releaseWithExtension(key: string): Promise<boolean> {
    this.stopExtension();
    return this.release(key);
  }

  /** One renewal tick: renew, stop renewing once the lock is lost, never reject. */
  private async extendOrStop(
    key: string,
    ttlMs: number,
    timer: NodeJS.Timeout
  ): Promise<void> {
    try {
      const stillOwned = await this.extend(key, ttlMs);
      // Only cancel the timer this tick belongs to, not a newer one.
      if (!stillOwned && this.extensionInterval === timer) {
        this.stopExtension();
      }
    } catch (err: unknown) {
      // A rejected promise inside a timer callback would otherwise be unhandled.
      console.error(`Failed to extend lock "${key}"`, err);
    }
  }

  /** Cancel the renewal timer if one is running. */
  private stopExtension(): void {
    if (this.extensionInterval) {
      clearInterval(this.extensionInterval);
      this.extensionInterval = undefined;
    }
  }
}

// Usage for long-running tasks
const lock = new ExtendableLock();

if (await lock.acquireWithExtension('long-task', 30000, 10000)) {
  try {
    // This can take minutes - lock auto-extends every 10s
    await processLargeDataset();
  } finally {
    await lock.releaseWithExtension('long-task');
  }
}
A complete mutex implementation with all features:
/** Tuning knobs for `Mutex.lock`; every field is optional. */
interface MutexOptions {
  /** Lock timeout in milliseconds (default 30000). */
  ttlMs?: number;
  /** Delay between acquire attempts in milliseconds (default 100). */
  retryDelayMs?: number;
  /** Maximum number of acquire attempts (default 50). */
  maxRetries?: number;
  /** Renew the lock on a timer while it is held (default false). */
  autoExtend?: boolean;
  /** Renewal period in milliseconds (default `ttlMs / 3`). */
  extensionIntervalMs?: number;
}

/**
 * Redis mutex with retry and optional auto-extension.
 *
 * Every successful `lock()` call gets its own random token and its own
 * renewal timer, so one Mutex instance can hold several keys at once and a
 * stale unlock function can never delete a lock acquired by a later call.
 */
class Mutex {
  private readonly redis: Redis;

  constructor(redis: Redis) {
    this.redis = redis;
  }

  /**
   * Acquire `mutex:<key>`, retrying up to `maxRetries` times.
   * @param key Lock identifier
   * @param options See MutexOptions
   * @returns An unlock function that stops auto-extension and releases the lock
   * @throws Error when the lock could not be acquired within `maxRetries` attempts
   */
  async lock(key: string, options: MutexOptions = {}): Promise<() => Promise<void>> {
    const {
      ttlMs = 30000,
      retryDelayMs = 100,
      maxRetries = 50,
      autoExtend = false,
      extensionIntervalMs = ttlMs / 3,
    } = options;

    const lockKey = `mutex:${key}`;
    // Per-acquisition token: identifies this particular hold of the lock.
    const token = crypto.randomUUID();

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      const reply = await this.redis.set(
        lockKey,
        token,
        'PX', ttlMs,
        'NX'
      );

      if (reply === 'OK') {
        const timer = autoExtend
          ? this.startExtension(lockKey, token, ttlMs, extensionIntervalMs)
          : undefined;

        // Unlock function; safe to call more than once because
        // clearInterval and the compare-and-delete are both idempotent.
        return async () => {
          if (timer) {
            clearInterval(timer);
          }
          await this.releaseLock(lockKey, token);
        };
      }

      // No point sleeping after the last attempt; fail right away.
      if (attempt < maxRetries) {
        await new Promise<void>(resolve => setTimeout(resolve, retryDelayMs));
      }
    }

    throw new Error(`Failed to acquire lock "${key}" after ${maxRetries} attempts`);
  }

  /**
   * Start a timer that resets the key's TTL while `token` still owns it.
   * The timer cancels itself once the lock is found to be lost.
   */
  private startExtension(
    key: string,
    token: string,
    ttlMs: number,
    intervalMs: number
  ): NodeJS.Timeout {
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
      end
      return 0
    `;

    const timer: NodeJS.Timeout = setInterval(() => {
      this.redis.eval(script, 1, key, token, ttlMs).then(
        (reply) => {
          if (reply !== 1) {
            clearInterval(timer);
          }
        },
        (err: unknown) => {
          // Keep the rejection from becoming an unhandled promise rejection.
          console.error(`Failed to extend lock "${key}"`, err);
        }
      );
    }, intervalMs);

    return timer;
  }

  /** Delete the key only if it still holds `token` (atomic via Lua). */
  private async releaseLock(key: string, token: string): Promise<void> {
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
      end
      return 0
    `;
    await this.redis.eval(script, 1, key, token);
  }
}

// Usage
const mutex = new Mutex(redis);

const unlock = await mutex.lock('critical-section', {
  ttlMs: 30000,
  autoExtend: true,
});

try {
  await doCriticalWork();
} finally {
  await unlock();
}
/**
 * Handle a webhook at most once at a time across servers.
 *
 * Takes a 60-second lock on the webhook id, skips the work when a
 * `processed:<id>` marker already exists, and otherwise handles the payload
 * and writes the marker with a 24-hour expiry.
 *
 * @param webhookId Identifier used for both the lock and the dedup marker
 * @param payload Webhook body, passed through to `handleWebhookPayload`
 * @returns `{ status }` of 'duplicate', 'already_processed' or 'processed'
 */
async function processWebhook(webhookId: string, payload: any) {
  const lockKey = `webhook:${webhookId}`;
  const processedKey = `processed:${webhookId}`;
  const lock = new DistributedLock();

  // Someone else holds the lock: another server is already on it.
  const gotLock = await lock.acquire(lockKey, 60000);
  if (!gotLock) {
    console.log(`Webhook ${webhookId} already being processed`);
    return { status: 'duplicate' };
  }

  try {
    // Idempotency check: a marker means an earlier run finished.
    const alreadyDone = await redis.get(processedKey);
    if (!alreadyDone) {
      await handleWebhookPayload(payload);

      // Remember the webhook for 24h so redeliveries are skipped.
      await redis.setex(processedKey, 86400, '1');

      return { status: 'processed' };
    }

    return { status: 'already_processed' };
  } finally {
    await lock.release(lockKey);
  }
}
/**
 * Leader election on a single Redis key.
 *
 * The node that manages to `SET leader:<key> <nodeId> PX ttl NX` is leader
 * and keeps the key alive with a heartbeat every `ttlMs / 3`. Subclasses
 * react through `onBecameLeader` / `onLostLeadership`.
 */
class LeaderElection {
  /** Reset the TTL only while the key still holds our node id (atomic). */
  private static readonly RENEW_SCRIPT = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
      return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    end
    return 0
  `;

  /** Delete the key only while it still holds our node id (atomic). */
  private static readonly RELEASE_SCRIPT = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
      return redis.call("DEL", KEYS[1])
    end
    return 0
  `;

  private readonly redis: Redis;
  private readonly nodeId: string;
  private isLeader: boolean = false;
  private heartbeatInterval?: NodeJS.Timeout;
  /** Redis key of the election currently running, if any. */
  private leaderKey?: string;
  /** `Date.now()` of the last heartbeat that confirmed we hold the key. */
  private lastConfirmedAt = 0;

  constructor(redis: Redis) {
    this.redis = redis;
    this.nodeId = `node:${crypto.randomUUID()}`;
  }

  /**
   * Join the election for `key` and keep campaigning on a timer.
   * The first attempt is awaited, so a Redis error there rejects `start`.
   * @param key Election name; stored in Redis as `leader:<key>`
   * @param ttlMs Leadership lease in milliseconds
   */
  async start(key: string, ttlMs: number = 10000) {
    // Restarting must not leave an earlier heartbeat timer running.
    this.stop();

    const leaderKey = `leader:${key}`;
    this.leaderKey = leaderKey;

    // Initial attempt
    await this.campaign(leaderKey, ttlMs);

    // Periodic heartbeat/election
    this.heartbeatInterval = setInterval(() => {
      this.campaign(leaderKey, ttlMs).catch((err: unknown) => {
        console.error(`Leader heartbeat failed for node ${this.nodeId}`, err);
        // Without a confirmed renewal for a full lease the key has expired,
        // so another node may already be leader.
        if (this.isLeader && Date.now() - this.lastConfirmedAt >= ttlMs) {
          this.stepDown();
        }
      });
    }, ttlMs / 3);
  }

  /**
   * Leave the election: stop the heartbeat and, if currently leader, step
   * down and release the key (best effort) so another node can take over
   * without waiting for the lease to expire.
   */
  stop() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = undefined;
    }

    const leaderKey = this.leaderKey;
    this.leaderKey = undefined;

    if (this.isLeader) {
      this.stepDown();
      if (leaderKey) {
        this.redis
          .eval(LeaderElection.RELEASE_SCRIPT, 1, leaderKey, this.nodeId)
          .catch((err: unknown) => {
            console.error(`Failed to release leadership for node ${this.nodeId}`, err);
          });
      }
    }
  }

  /** Whether this node believes it is currently the leader. */
  isCurrentLeader(): boolean {
    return this.isLeader;
  }

  // Override these
  protected onBecameLeader() {}
  protected onLostLeadership() {}

  /** One election round: claim the key if free, otherwise renew it if it is ours. */
  private async campaign(leaderKey: string, ttlMs: number): Promise<void> {
    const claimed = await this.redis.set(
      leaderKey,
      this.nodeId,
      'PX', ttlMs,
      'NX'
    );

    // SET ... PX already applied the lease, so a fresh claim needs no extra
    // PEXPIRE. Otherwise renew atomically so that a key taken over by
    // another node is never extended by us.
    const holdsKey =
      claimed === 'OK' ||
      (await this.redis.eval(
        LeaderElection.RENEW_SCRIPT,
        1,
        leaderKey,
        this.nodeId,
        ttlMs
      )) === 1;

    // Ignore results that arrive after stop() or a restart on another key.
    if (this.leaderKey !== leaderKey) {
      return;
    }

    if (holdsKey) {
      this.lastConfirmedAt = Date.now();
      if (!this.isLeader) {
        this.isLeader = true;
        console.log(`Node ${this.nodeId} became leader`);
        this.onBecameLeader();
      }
    } else if (this.isLeader) {
      this.stepDown();
    }
  }

  /** Flip to follower and notify the subclass. */
  private stepDown(): void {
    this.isLeader = false;
    console.log(`Node ${this.nodeId} lost leadership`);
    this.onLostLeadership();
  }
}

// Usage
class CronScheduler extends LeaderElection {
  protected override onBecameLeader() {
    // Start running cron jobs
    this.startCronJobs();
  }

  protected override onLostLeadership() {
    // Stop cron jobs
    this.stopCronJobs();
  }

  private startCronJobs() {
    // Register and start the scheduled jobs here.
  }

  private stopCronJobs() {
    // Cancel the scheduled jobs here.
  }
}

const scheduler = new CronScheduler(redis);
await scheduler.start('cron-scheduler');
/**
 * Counting semaphore on a Redis sorted set: at most `maxConcurrent` slots
 * may be held per resource. Each slot is a member whose score is its expiry
 * timestamp in milliseconds.
 */
class RateLimitedResource {
  /**
   * Atomic acquire: drop expired slots, then add ours if there is room.
   * ARGV: [1] now, [2] max slots, [3] our expiry, [4] our slot id.
   */
  private static readonly ACQUIRE_SLOT_SCRIPT = `
      -- Clean up expired slots
      redis.call("ZREMRANGEBYSCORE", KEYS[1], 0, ARGV[1])

      -- Check current count
      local count = redis.call("ZCARD", KEYS[1])

      if count < tonumber(ARGV[2]) then
        -- Add our slot
        redis.call("ZADD", KEYS[1], ARGV[3], ARGV[4])
        return 1
      end

      return 0
    `;

  constructor(private redis: Redis, private maxConcurrent: number) {}

  /**
   * Try to take one slot of `semaphore:<resourceKey>`.
   * @param resourceKey Name of the limited resource
   * @param ttlMs How long the slot stays valid if it is never released
   * @returns The slot id to pass to `release`, or null when all slots are taken
   */
  async acquire(resourceKey: string, ttlMs: number = 30000): Promise<string | null> {
    // Timestamps come from this process's clock (Date.now()).
    const issuedAt = Date.now();
    const candidateSlot = crypto.randomUUID();

    const granted = await this.redis.eval(
      RateLimitedResource.ACQUIRE_SLOT_SCRIPT,
      1,
      `semaphore:${resourceKey}`,
      issuedAt,            // expired threshold
      this.maxConcurrent,  // max slots
      issuedAt + ttlMs,    // our expiry
      candidateSlot        // our slot id
    );

    if (granted !== 1) {
      return null;
    }
    return candidateSlot;
  }

  /** Give back a slot previously returned by `acquire`. */
  async release(resourceKey: string, slotId: string) {
    const semaphoreKey = `semaphore:${resourceKey}`;
    await this.redis.zrem(semaphoreKey, slotId);
  }
}

// Usage: Max 5 concurrent API calls
const apiLimiter = new RateLimitedResource(redis, 5);

const slot = await apiLimiter.acquire('external-api');
if (!slot) {
  // Too many concurrent requests
  throw new Error('Rate limit exceeded');
}

try {
  await callExternalApi();
} finally {
  await apiLimiter.release('external-api', slot);
}
Distributed Locks
Coordinate work across multiple servers with Redis-based distributed locks: prevent race conditions, avoid duplicate processing, and manage access to shared resources.
Why Redis for Distributed Locks?
Basic Lock with SET NX
The simplest reliable distributed lock:
import Redis from 'ioredis';
import crypto from 'crypto';

// NOTE(review): with strict null checks `process.env.ARCTICKEY_URL` is
// `string | undefined`; confirm this call type-checks with your ioredis version.
const redis = new Redis(process.env.ARCTICKEY_URL);

/**
 * Minimal Redis lock built on `SET key value PX ttl NX`.
 *
 * Each instance carries one random token. The token is stored as the lock's
 * value so that only the instance that acquired a lock can release it.
 */
class DistributedLock {
  /**
   * Random token identifying this lock holder.
   * `protected` (not `private`) so subclasses can pass it to their own
   * ownership-checking scripts.
   */
  protected readonly lockValue: string;

  constructor() {
    this.lockValue = crypto.randomUUID();
  }

  /**
   * Try to acquire a lock (single attempt, no waiting).
   * @param key Lock identifier; stored in Redis as `lock:<key>`
   * @param ttlMs Lock timeout in milliseconds
   * @returns true if the lock was acquired, false otherwise
   */
  async acquire(key: string, ttlMs: number = 30000): Promise<boolean> {
    const result = await redis.set(
      `lock:${key}`,
      this.lockValue,
      'PX', ttlMs, // Expiry in milliseconds
      'NX'         // Only set if the key does not exist
    );
    return result === 'OK';
  }

  /**
   * Release a lock, but only if this instance's token is still stored in it.
   * @param key Lock identifier passed to `acquire`
   * @returns true if the key was deleted, false if it held another value or was gone
   */
  async release(key: string): Promise<boolean> {
    // A Lua script makes the compare-and-delete a single atomic step.
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
      else
        return 0
      end
    `;

    const result = await redis.eval(script, 1, `lock:${key}`, this.lockValue);
    return result === 1;
  }
}

// Usage
const orderId = 'order-123';
const paymentLockKey = `process-payment:${orderId}`;
const lock = new DistributedLock();

if (await lock.acquire(paymentLockKey)) {
  try {
    // Critical section - only one server executes this
    await processPayment(orderId);
  } finally {
    await lock.release(paymentLockKey);
  }
} else {
  console.log('Another server is processing this payment');
}
Lock with Automatic Retry#
Wait for the lock to become available:
/**
 * DistributedLock that polls until the lock is free or the attempts run out.
 */
class RetryableLock extends DistributedLock {
  /**
   * Acquire the lock, retrying at a fixed interval.
   * @param key Lock identifier
   * @param ttlMs Lock timeout in milliseconds
   * @param retryDelayMs Delay between attempts in milliseconds
   * @param maxRetries Maximum number of acquire attempts (-1 for infinite)
   * @returns true once the lock is acquired, false when all attempts failed
   */
  async acquireWithRetry(
    key: string,
    ttlMs: number = 30000,
    retryDelayMs: number = 100,
    maxRetries: number = 50
  ): Promise<boolean> {
    const unlimited = maxRetries === -1;

    for (let attempt = 1; unlimited || attempt <= maxRetries; attempt++) {
      if (await this.acquire(key, ttlMs)) {
        return true;
      }

      // Only wait when another attempt will follow; sleeping after the
      // final failure would just delay the `false` result.
      if (unlimited || attempt < maxRetries) {
        await this.sleep(retryDelayMs);
      }
    }

    return false;
  }

  /** Resolve after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage
const lock = new RetryableLock();

// Up to 50 attempts, 100 ms apart (about 5 seconds of waiting)
if (await lock.acquireWithRetry('resource', 30000, 100, 50)) {
  try {
    await doExclusiveWork();
  } finally {
    await lock.release('resource');
  }
}
Lock with Auto-Extension#
Automatically extend the lock while work is in progress:
/**
 * DistributedLock that keeps renewing its TTL on a timer while work runs.
 *
 * NOTE(review): `extend` reads `this.lockValue`, so that field must be
 * declared `protected` (not `private`) in DistributedLock for this subclass
 * to type-check.
 */
class ExtendableLock extends DistributedLock {
  private extensionInterval?: NodeJS.Timeout;

  /**
   * Acquire the lock and, on success, start renewing it periodically.
   * @param key Lock identifier
   * @param ttlMs Lock timeout in milliseconds; also the TTL applied on each renewal
   * @param extensionIntervalMs How often to renew, in milliseconds
   * @returns true if the lock was acquired, false otherwise
   */
  async acquireWithExtension(
    key: string,
    ttlMs: number = 30000,
    extensionIntervalMs: number = 10000
  ): Promise<boolean> {
    const acquired = await this.acquire(key, ttlMs);
    if (!acquired) {
      return false;
    }

    // Do not leak a timer left over from an earlier acquisition.
    this.stopExtension();

    const timer: NodeJS.Timeout = setInterval(() => {
      void this.extendOrStop(key, ttlMs, timer);
    }, extensionIntervalMs);
    this.extensionInterval = timer;

    return true;
  }

  /**
   * Reset the lock's TTL, but only if this instance's token is still stored in it.
   * @returns true if the TTL was reset, false if the lock is no longer ours
   */
  async extend(key: string, ttlMs: number): Promise<boolean> {
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
      else
        return 0
      end
    `;

    const result = await redis.eval(
      script,
      1,
      `lock:${key}`,
      this.lockValue,
      ttlMs
    );

    return result === 1;
  }

  /**
   * Stop the renewal timer, then release the lock.
   * @returns true if the lock was released, false if it was no longer ours
   */
  async releaseWithExtension(key: string): Promise<boolean> {
    this.stopExtension();
    return this.release(key);
  }

  /** One renewal tick: renew, stop renewing once the lock is lost, never reject. */
  private async extendOrStop(
    key: string,
    ttlMs: number,
    timer: NodeJS.Timeout
  ): Promise<void> {
    try {
      const stillOwned = await this.extend(key, ttlMs);
      // Only cancel the timer this tick belongs to, not a newer one.
      if (!stillOwned && this.extensionInterval === timer) {
        this.stopExtension();
      }
    } catch (err: unknown) {
      // A rejected promise inside a timer callback would otherwise be unhandled.
      console.error(`Failed to extend lock "${key}"`, err);
    }
  }

  /** Cancel the renewal timer if one is running. */
  private stopExtension(): void {
    if (this.extensionInterval) {
      clearInterval(this.extensionInterval);
      this.extensionInterval = undefined;
    }
  }
}

// Usage for long-running tasks
const lock = new ExtendableLock();

if (await lock.acquireWithExtension('long-task', 30000, 10000)) {
  try {
    // This can take minutes - lock auto-extends every 10s
    await processLargeDataset();
  } finally {
    await lock.releaseWithExtension('long-task');
  }
}
Mutex Class (Full Implementation)#
A complete mutex implementation with all features:
/** Tuning knobs for `Mutex.lock`; every field is optional. */
interface MutexOptions {
  /** Lock timeout in milliseconds (default 30000). */
  ttlMs?: number;
  /** Delay between acquire attempts in milliseconds (default 100). */
  retryDelayMs?: number;
  /** Maximum number of acquire attempts (default 50). */
  maxRetries?: number;
  /** Renew the lock on a timer while it is held (default false). */
  autoExtend?: boolean;
  /** Renewal period in milliseconds (default `ttlMs / 3`). */
  extensionIntervalMs?: number;
}

/**
 * Redis mutex with retry and optional auto-extension.
 *
 * Every successful `lock()` call gets its own random token and its own
 * renewal timer, so one Mutex instance can hold several keys at once and a
 * stale unlock function can never delete a lock acquired by a later call.
 */
class Mutex {
  private readonly redis: Redis;

  constructor(redis: Redis) {
    this.redis = redis;
  }

  /**
   * Acquire `mutex:<key>`, retrying up to `maxRetries` times.
   * @param key Lock identifier
   * @param options See MutexOptions
   * @returns An unlock function that stops auto-extension and releases the lock
   * @throws Error when the lock could not be acquired within `maxRetries` attempts
   */
  async lock(key: string, options: MutexOptions = {}): Promise<() => Promise<void>> {
    const {
      ttlMs = 30000,
      retryDelayMs = 100,
      maxRetries = 50,
      autoExtend = false,
      extensionIntervalMs = ttlMs / 3,
    } = options;

    const lockKey = `mutex:${key}`;
    // Per-acquisition token: identifies this particular hold of the lock.
    const token = crypto.randomUUID();

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      const reply = await this.redis.set(
        lockKey,
        token,
        'PX', ttlMs,
        'NX'
      );

      if (reply === 'OK') {
        const timer = autoExtend
          ? this.startExtension(lockKey, token, ttlMs, extensionIntervalMs)
          : undefined;

        // Unlock function; safe to call more than once because
        // clearInterval and the compare-and-delete are both idempotent.
        return async () => {
          if (timer) {
            clearInterval(timer);
          }
          await this.releaseLock(lockKey, token);
        };
      }

      // No point sleeping after the last attempt; fail right away.
      if (attempt < maxRetries) {
        await new Promise<void>(resolve => setTimeout(resolve, retryDelayMs));
      }
    }

    throw new Error(`Failed to acquire lock "${key}" after ${maxRetries} attempts`);
  }

  /**
   * Start a timer that resets the key's TTL while `token` still owns it.
   * The timer cancels itself once the lock is found to be lost.
   */
  private startExtension(
    key: string,
    token: string,
    ttlMs: number,
    intervalMs: number
  ): NodeJS.Timeout {
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
      end
      return 0
    `;

    const timer: NodeJS.Timeout = setInterval(() => {
      this.redis.eval(script, 1, key, token, ttlMs).then(
        (reply) => {
          if (reply !== 1) {
            clearInterval(timer);
          }
        },
        (err: unknown) => {
          // Keep the rejection from becoming an unhandled promise rejection.
          console.error(`Failed to extend lock "${key}"`, err);
        }
      );
    }, intervalMs);

    return timer;
  }

  /** Delete the key only if it still holds `token` (atomic via Lua). */
  private async releaseLock(key: string, token: string): Promise<void> {
    const script = `
      if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
      end
      return 0
    `;
    await this.redis.eval(script, 1, key, token);
  }
}

// Usage
const mutex = new Mutex(redis);

const unlock = await mutex.lock('critical-section', {
  ttlMs: 30000,
  autoExtend: true,
});

try {
  await doCriticalWork();
} finally {
  await unlock();
}
Practical Examples#
Prevent Duplicate Processing
/**
 * Handle a webhook at most once at a time across servers.
 *
 * Takes a 60-second lock on the webhook id, skips the work when a
 * `processed:<id>` marker already exists, and otherwise handles the payload
 * and writes the marker with a 24-hour expiry.
 *
 * @param webhookId Identifier used for both the lock and the dedup marker
 * @param payload Webhook body, passed through to `handleWebhookPayload`
 * @returns `{ status }` of 'duplicate', 'already_processed' or 'processed'
 */
async function processWebhook(webhookId: string, payload: any) {
  const lockKey = `webhook:${webhookId}`;
  const processedKey = `processed:${webhookId}`;
  const lock = new DistributedLock();

  // Someone else holds the lock: another server is already on it.
  const gotLock = await lock.acquire(lockKey, 60000);
  if (!gotLock) {
    console.log(`Webhook ${webhookId} already being processed`);
    return { status: 'duplicate' };
  }

  try {
    // Idempotency check: a marker means an earlier run finished.
    const alreadyDone = await redis.get(processedKey);
    if (!alreadyDone) {
      await handleWebhookPayload(payload);

      // Remember the webhook for 24h so redeliveries are skipped.
      await redis.setex(processedKey, 86400, '1');

      return { status: 'processed' };
    }

    return { status: 'already_processed' };
  } finally {
    await lock.release(lockKey);
  }
}
Leader Election#
/**
 * Leader election on a single Redis key.
 *
 * The node that manages to `SET leader:<key> <nodeId> PX ttl NX` is leader
 * and keeps the key alive with a heartbeat every `ttlMs / 3`. Subclasses
 * react through `onBecameLeader` / `onLostLeadership`.
 */
class LeaderElection {
  /** Reset the TTL only while the key still holds our node id (atomic). */
  private static readonly RENEW_SCRIPT = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
      return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    end
    return 0
  `;

  /** Delete the key only while it still holds our node id (atomic). */
  private static readonly RELEASE_SCRIPT = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
      return redis.call("DEL", KEYS[1])
    end
    return 0
  `;

  private readonly redis: Redis;
  private readonly nodeId: string;
  private isLeader: boolean = false;
  private heartbeatInterval?: NodeJS.Timeout;
  /** Redis key of the election currently running, if any. */
  private leaderKey?: string;
  /** `Date.now()` of the last heartbeat that confirmed we hold the key. */
  private lastConfirmedAt = 0;

  constructor(redis: Redis) {
    this.redis = redis;
    this.nodeId = `node:${crypto.randomUUID()}`;
  }

  /**
   * Join the election for `key` and keep campaigning on a timer.
   * The first attempt is awaited, so a Redis error there rejects `start`.
   * @param key Election name; stored in Redis as `leader:<key>`
   * @param ttlMs Leadership lease in milliseconds
   */
  async start(key: string, ttlMs: number = 10000) {
    // Restarting must not leave an earlier heartbeat timer running.
    this.stop();

    const leaderKey = `leader:${key}`;
    this.leaderKey = leaderKey;

    // Initial attempt
    await this.campaign(leaderKey, ttlMs);

    // Periodic heartbeat/election
    this.heartbeatInterval = setInterval(() => {
      this.campaign(leaderKey, ttlMs).catch((err: unknown) => {
        console.error(`Leader heartbeat failed for node ${this.nodeId}`, err);
        // Without a confirmed renewal for a full lease the key has expired,
        // so another node may already be leader.
        if (this.isLeader && Date.now() - this.lastConfirmedAt >= ttlMs) {
          this.stepDown();
        }
      });
    }, ttlMs / 3);
  }

  /**
   * Leave the election: stop the heartbeat and, if currently leader, step
   * down and release the key (best effort) so another node can take over
   * without waiting for the lease to expire.
   */
  stop() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = undefined;
    }

    const leaderKey = this.leaderKey;
    this.leaderKey = undefined;

    if (this.isLeader) {
      this.stepDown();
      if (leaderKey) {
        this.redis
          .eval(LeaderElection.RELEASE_SCRIPT, 1, leaderKey, this.nodeId)
          .catch((err: unknown) => {
            console.error(`Failed to release leadership for node ${this.nodeId}`, err);
          });
      }
    }
  }

  /** Whether this node believes it is currently the leader. */
  isCurrentLeader(): boolean {
    return this.isLeader;
  }

  // Override these
  protected onBecameLeader() {}
  protected onLostLeadership() {}

  /** One election round: claim the key if free, otherwise renew it if it is ours. */
  private async campaign(leaderKey: string, ttlMs: number): Promise<void> {
    const claimed = await this.redis.set(
      leaderKey,
      this.nodeId,
      'PX', ttlMs,
      'NX'
    );

    // SET ... PX already applied the lease, so a fresh claim needs no extra
    // PEXPIRE. Otherwise renew atomically so that a key taken over by
    // another node is never extended by us.
    const holdsKey =
      claimed === 'OK' ||
      (await this.redis.eval(
        LeaderElection.RENEW_SCRIPT,
        1,
        leaderKey,
        this.nodeId,
        ttlMs
      )) === 1;

    // Ignore results that arrive after stop() or a restart on another key.
    if (this.leaderKey !== leaderKey) {
      return;
    }

    if (holdsKey) {
      this.lastConfirmedAt = Date.now();
      if (!this.isLeader) {
        this.isLeader = true;
        console.log(`Node ${this.nodeId} became leader`);
        this.onBecameLeader();
      }
    } else if (this.isLeader) {
      this.stepDown();
    }
  }

  /** Flip to follower and notify the subclass. */
  private stepDown(): void {
    this.isLeader = false;
    console.log(`Node ${this.nodeId} lost leadership`);
    this.onLostLeadership();
  }
}

// Usage
class CronScheduler extends LeaderElection {
  protected override onBecameLeader() {
    // Start running cron jobs
    this.startCronJobs();
  }

  protected override onLostLeadership() {
    // Stop cron jobs
    this.stopCronJobs();
  }

  private startCronJobs() {
    // Register and start the scheduled jobs here.
  }

  private stopCronJobs() {
    // Cancel the scheduled jobs here.
  }
}

const scheduler = new CronScheduler(redis);
await scheduler.start('cron-scheduler');
Rate-Limited Resource Access#
/**
 * Counting semaphore on a Redis sorted set: at most `maxConcurrent` slots
 * may be held per resource. Each slot is a member whose score is its expiry
 * timestamp in milliseconds.
 */
class RateLimitedResource {
  /**
   * Atomic acquire: drop expired slots, then add ours if there is room.
   * ARGV: [1] now, [2] max slots, [3] our expiry, [4] our slot id.
   */
  private static readonly ACQUIRE_SLOT_SCRIPT = `
      -- Clean up expired slots
      redis.call("ZREMRANGEBYSCORE", KEYS[1], 0, ARGV[1])

      -- Check current count
      local count = redis.call("ZCARD", KEYS[1])

      if count < tonumber(ARGV[2]) then
        -- Add our slot
        redis.call("ZADD", KEYS[1], ARGV[3], ARGV[4])
        return 1
      end

      return 0
    `;

  constructor(private redis: Redis, private maxConcurrent: number) {}

  /**
   * Try to take one slot of `semaphore:<resourceKey>`.
   * @param resourceKey Name of the limited resource
   * @param ttlMs How long the slot stays valid if it is never released
   * @returns The slot id to pass to `release`, or null when all slots are taken
   */
  async acquire(resourceKey: string, ttlMs: number = 30000): Promise<string | null> {
    // Timestamps come from this process's clock (Date.now()).
    const issuedAt = Date.now();
    const candidateSlot = crypto.randomUUID();

    const granted = await this.redis.eval(
      RateLimitedResource.ACQUIRE_SLOT_SCRIPT,
      1,
      `semaphore:${resourceKey}`,
      issuedAt,            // expired threshold
      this.maxConcurrent,  // max slots
      issuedAt + ttlMs,    // our expiry
      candidateSlot        // our slot id
    );

    if (granted !== 1) {
      return null;
    }
    return candidateSlot;
  }

  /** Give back a slot previously returned by `acquire`. */
  async release(resourceKey: string, slotId: string) {
    const semaphoreKey = `semaphore:${resourceKey}`;
    await this.redis.zrem(semaphoreKey, slotId);
  }
}

// Usage: Max 5 concurrent API calls
const apiLimiter = new RateLimitedResource(redis, 5);

const slot = await apiLimiter.acquire('external-api');
if (!slot) {
  // Too many concurrent requests
  throw new Error('Rate limit exceeded');
}

try {
  await callExternalApi();
} finally {
  await apiLimiter.release('external-api', slot);
}
Best Practices#