Pub/Sub & Real-time Messaging

Redis Pub/Sub lets services exchange messages in real time, which makes it a good fit for chat, notifications, and live updates.

How Pub/Sub Works

  • Publishers send messages to channels
  • Subscribers receive messages from channels they're subscribed to
  • Fire and forget: messages aren't stored, only delivered to subscribers connected at publish time, so a subscriber that joins later never sees earlier messages
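
The delivery rules above can be illustrated without a Redis server. The following is a toy in-memory model, not a Redis client: `ToyBroker` and its methods are hypothetical names made up for this sketch, and the return value of `publish` only mimics the idea of a receiver count.

```typescript
// Toy in-memory model of Pub/Sub delivery semantics (illustration only).
type ToyHandler = (channel: string, message: string) => void;

class ToyBroker {
  private readonly subscribers = new Map<string, Set<ToyHandler>>();

  // Register a handler for a channel; returns a function that unsubscribes it.
  subscribe(channel: string, handler: ToyHandler): () => void {
    const handlers = this.subscribers.get(channel) ?? new Set<ToyHandler>();
    this.subscribers.set(channel, handlers);
    handlers.add(handler);
    return () => {
      handlers.delete(handler);
    };
  }

  // Deliver to the handlers subscribed right now. Nothing is stored,
  // so a handler added later never sees this message.
  publish(channel: string, message: string): number {
    const handlers = this.subscribers.get(channel);
    if (!handlers) return 0;
    handlers.forEach((handler) => handler(channel, message));
    return handlers.size;
  }
}

const demoBroker = new ToyBroker();
const demoReceived: string[] = [];

// Published before anyone subscribed: dropped.
demoBroker.publish('notifications', 'early');

demoBroker.subscribe('notifications', (_channel, message) => {
  demoReceived.push(message);
});

// Published after the subscription: delivered.
demoBroker.publish('notifications', 'late');
console.log(demoReceived);
```

Only the second message reaches the handler, which is the behaviour to keep in mind when deciding whether Pub/Sub is a fit for a given kind of data.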

Basic Pub/Sub (Node.js)

TypeScript
    import Redis from 'ioredis';

    // Subscriber connection (dedicated)
    const subscriber = new Redis('rediss://:YOUR_PASSWORD@YOUR_INSTANCE.eu.arctickey.com:6379');

    // Publisher connection (can be shared)
    const publisher = new Redis('rediss://:YOUR_PASSWORD@YOUR_INSTANCE.eu.arctickey.com:6379');

    // Subscribe to channels
    subscriber.subscribe('notifications', 'chat:general', (err, count) => {
      if (err) {
        console.error('Failed to subscribe:', err.message);
        return;
      }
      console.log(`Subscribed to ${count} channels`);
    });

    // Handle messages
    subscriber.on('message', (channel, message) => {
      console.log(`[${channel}] ${message}`);
      const data = JSON.parse(message);
      // Handle the message...
    });

    // Publish messages
    await publisher.publish('notifications', JSON.stringify({
      type: 'alert',
      message: 'New order received!',
      timestamp: Date.now(),
    }));

Important: A Redis connection in subscribe mode can't run regular commands such as GET or PUBLISH; it can only manage its subscriptions. Use separate connections for publishing and subscribing.

Pattern Subscriptions

Subscribe to multiple channels with patterns:

TypeScript
    // Subscribe to all chat channels
    subscriber.psubscribe('chat:*');

    subscriber.on('pmessage', (pattern, channel, message) => {
      console.log(`[${pattern}] ${channel}: ${message}`);
      // pattern: 'chat:*'
      // channel: 'chat:room-123'
      // message: '{"user": "alice", "text": "Hello!"}'
    });

    // Publish to specific room
    await publisher.publish('chat:room-123', JSON.stringify({
      user: 'alice',
      text: 'Hello!',
    }));
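
To get a feel for which channels a pattern such as `chat:*` would match, the matching can be approximated locally. This is a simplified sketch: Redis patterns are glob-style and also support character classes like `[abc]` and backslash escapes, which this version does not handle. The function names `globToRegExp` and `matchesPattern` are made up for illustration.

```typescript
// Convert a simplified glob pattern (only * and ? are supported here)
// into an anchored regular expression.
function globToRegExp(pattern: string): RegExp {
  const source = pattern
    // Escape regex metacharacters, leaving * and ? for the next steps.
    .replace(/[.+^${}()|[\]\\]/g, '\\$&')
    // * matches any run of characters, including ':' separators.
    .replace(/\*/g, '[\\s\\S]*')
    // ? matches exactly one character.
    .replace(/\?/g, '[\\s\\S]');
  return new RegExp(`^${source}$`);
}

function matchesPattern(pattern: string, channel: string): boolean {
  return globToRegExp(pattern).test(channel);
}

console.log(matchesPattern('chat:*', 'chat:room-123'));
console.log(matchesPattern('chat:*', 'notifications'));
```

Note that `*` is not limited to a single segment, so `user:*:notifications` would also match a channel like `user:a:b:notifications`. Keeping IDs free of the separator character avoids surprises.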

Real-time Chat Example

TypeScript
    import express from 'express';
    import { createServer } from 'http';
    import { Server } from 'socket.io';
    import Redis from 'ioredis';

    const app = express();
    const server = createServer(app); // Socket.IO attaches to this HTTP server
    const io = new Server(server);

    const redisSub = new Redis('rediss://:YOUR_PASSWORD@YOUR_INSTANCE.eu.arctickey.com:6379');
    const redisPub = new Redis('rediss://:YOUR_PASSWORD@YOUR_INSTANCE.eu.arctickey.com:6379');

    // Subscribe to chat channels
    redisSub.psubscribe('chat:*');

    redisSub.on('pmessage', (pattern, channel, message) => {
      // Everything after the 'chat:' prefix is the room ID
      const roomId = channel.slice('chat:'.length);
      io.to(roomId).emit('message', JSON.parse(message));
    });

    // Socket.IO connection
    io.on('connection', (socket) => {
      socket.on('join', (roomId) => {
        socket.join(roomId);
      });

      socket.on('message', async (data) => {
        // Publish to Redis (broadcasts to all servers)
        await redisPub.publish(`chat:${data.roomId}`, JSON.stringify({
          user: data.user,
          text: data.text,
          timestamp: Date.now(),
        }));
      });
    });

    server.listen(3000); // example port
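
Messages arrive from a channel as plain strings, so a malformed or unexpected payload can make `JSON.parse` throw or produce an object of the wrong shape. One possible guard is sketched below. The `ChatMessage` shape is inferred from the fields published in the chat example, and `parseChatMessage` is a hypothetical helper, not part of ioredis or Socket.IO.

```typescript
// Shape of the payload published in the chat example (assumed).
interface ChatMessage {
  user: string;
  text: string;
  timestamp: number;
}

// Parse and validate a raw channel message.
// Returns null instead of throwing when the payload is not usable.
function parseChatMessage(raw: string): ChatMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof value !== 'object' || value === null) return null;

  const { user, text, timestamp } = value as {
    user?: unknown;
    text?: unknown;
    timestamp?: unknown;
  };
  if (typeof user !== 'string' || typeof text !== 'string' || typeof timestamp !== 'number') {
    return null; // missing or mistyped field
  }
  return { user, text, timestamp };
}
```

In the `pmessage` handler, the result could be checked for `null` before emitting to the room, so that one bad message does not raise an exception in the subscriber.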

Live Notifications

TypeScript
    import Redis from 'ioredis';

    // Notification service
    class NotificationService {
      private publisher: Redis;

      constructor() {
        this.publisher = new Redis('rediss://:YOUR_PASSWORD@YOUR_INSTANCE.eu.arctickey.com:6379');
      }

      async notify(userId: string, notification: object) {
        await this.publisher.publish(
          `user:${userId}:notifications`,
          JSON.stringify(notification)
        );
      }

      async broadcast(notification: object) {
        await this.publisher.publish('global:notifications', JSON.stringify(notification));
      }
    }

    // API server subscribing
    const subscriber = new Redis('rediss://:YOUR_PASSWORD@YOUR_INSTANCE.eu.arctickey.com:6379');

    subscriber.psubscribe('user:*:notifications', 'global:notifications');

    // sendToUser and broadcastToAll are application-specific helpers
    // (for example, WebSocket pushes) defined elsewhere
    subscriber.on('pmessage', (pattern, channel, message) => {
      if (channel.startsWith('user:')) {
        const userId = channel.split(':')[1];
        sendToUser(userId, JSON.parse(message));
      } else {
        broadcastToAll(JSON.parse(message));
      }
    });
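
The channel-to-recipient decision in the subscriber can be pulled into a small pure function, which makes it easy to test without a Redis connection. This is a sketch under the channel naming used above (`user:<id>:notifications` and `global:notifications`). `NotificationRoute` and `routeNotificationChannel` are hypothetical names.

```typescript
// Where a notification should go, derived from the channel name alone.
type NotificationRoute =
  | { kind: 'user'; userId: string }
  | { kind: 'global' }
  | { kind: 'unknown' };

function routeNotificationChannel(channel: string): NotificationRoute {
  if (channel === 'global:notifications') {
    return { kind: 'global' };
  }
  // Capture everything between the 'user:' prefix and ':notifications' suffix.
  const match = /^user:(.+):notifications$/.exec(channel);
  const userId = match ? match[1] : undefined;
  if (userId !== undefined) {
    return { kind: 'user', userId };
  }
  // Anything else is reported explicitly rather than broadcast by default.
  return { kind: 'unknown' };
}

console.log(routeNotificationChannel('user:42:notifications'));
console.log(routeNotificationChannel('global:notifications'));
```

Returning an explicit `unknown` route is a design choice for this sketch: it avoids treating every channel that does not start with `user:` as a global broadcast.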

Cache Invalidation with Pub/Sub

Coordinate cache invalidation across servers:

TypeScript
    // Assumes `db`, `publisher`, `subscriber`, and `localCache` (for example, a Map)
    // are set up elsewhere in the application

    // When data changes, publish invalidation
    async function updateProduct(id: string, data: object) {
      await db.products.update(id, data);

      // Notify all servers to invalidate cache
      await publisher.publish('cache:invalidate', JSON.stringify({
        type: 'product',
        id,
      }));
    }

    // All servers subscribe
    subscriber.subscribe('cache:invalidate');

    subscriber.on('message', (channel, message) => {
      const { type, id } = JSON.parse(message);
      localCache.delete(`${type}:${id}`);
    });
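
The receiving side of invalidation can be sketched as a small in-process cache that applies raw messages from the channel. This assumes the `{ type, id }` payload and the `type:id` key format shown above. `LocalEntityCache` is a hypothetical class, not something provided by a library.

```typescript
// In-process cache keyed by "<type>:<id>" (illustrative sketch).
class LocalEntityCache {
  private readonly entries = new Map<string, unknown>();

  set(type: string, id: string, value: unknown): void {
    this.entries.set(`${type}:${id}`, value);
  }

  get(type: string, id: string): unknown {
    return this.entries.get(`${type}:${id}`);
  }

  // Apply one raw message from the invalidation channel.
  // Returns true if an entry was removed, false if the message was
  // malformed or the entry was not cached.
  applyInvalidation(raw: string): boolean {
    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      return false;
    }
    if (typeof parsed !== 'object' || parsed === null) return false;

    const { type, id } = parsed as { type?: unknown; id?: unknown };
    if (typeof type !== 'string' || typeof id !== 'string') return false;

    return this.entries.delete(`${type}:${id}`);
  }
}

const productCache = new LocalEntityCache();
productCache.set('product', '42', { name: 'Widget' });
productCache.applyInvalidation(JSON.stringify({ type: 'product', id: '42' }));
console.log(productCache.get('product', '42'));
```

Because Pub/Sub does not redeliver, a server that was disconnected when an invalidation was published keeps its stale entry. Giving cached entries an expiry as well is a common backstop.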

Python Pub/Sub

Python
    import json
    import threading
    import time

    import redis

    r = redis.Redis(
        host='YOUR_INSTANCE.eu.arctickey.com',
        port=6379,
        password='YOUR_PASSWORD',
        ssl=True,
    )

    # Publisher
    def publish_message(channel, data):
        r.publish(channel, json.dumps(data))

    # Subscriber (run in separate thread)
    def subscribe_to_channels():
        pubsub = r.pubsub()
        pubsub.subscribe('notifications')

        for message in pubsub.listen():
            if message['type'] == 'message':
                data = json.loads(message['data'])
                print(f"Received: {data}")

    # Start subscriber in background
    thread = threading.Thread(target=subscribe_to_channels, daemon=True)
    thread.start()

    # Give the subscriber a moment to subscribe; messages published
    # before the subscription is active are not delivered
    time.sleep(1)

    # Publish
    publish_message('notifications', {'alert': 'Hello!'})

    # Keep the main thread alive briefly so the daemon thread can print the message
    time.sleep(1)

Best Practices

  1. Use dedicated connections for subscribers
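  2. Don't rely on Pub/Sub for critical data: delivery is fire-and-forget, so a message published while a subscriber is offline is lost
  3. Use pattern subscriptions sparingly: every published message is checked against all active patterns, so they cost more than exact channel subscriptions
  4. Handle reconnections: subscriptions belong to the connection, so subscribers need to resubscribe after a disconnect, and messages published in the meantime are not delivered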
  5. Consider Redis Streams for persistent messaging (messages are stored)
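
For the reconnection point, one approach is to remember which channels were requested and re-issue them when the connection comes back. Some clients already do this for you (ioredis has an `autoResubscribe` option for it), so the sketch below is mainly useful for understanding the idea or for clients without that feature. `SubscriberLike` and `SubscriptionRegistry` are hypothetical names, and the interface is deliberately minimal.

```typescript
// Hypothetical minimal shape of a subscriber connection, for illustration only.
interface SubscriberLike {
  subscribe(...channels: string[]): unknown;
}

class SubscriptionRegistry {
  private readonly channels = new Set<string>();
  private readonly connection: SubscriberLike;

  constructor(connection: SubscriberLike) {
    this.connection = connection;
  }

  // Subscribe now and remember the channel for later restores.
  add(channel: string): void {
    this.channels.add(channel);
    this.connection.subscribe(channel);
  }

  // Re-issue every remembered subscription; intended to be called from
  // the client's reconnect handler. Returns the number of channels restored.
  restore(): number {
    const all = Array.from(this.channels);
    if (all.length > 0) {
      this.connection.subscribe(...all);
    }
    return all.length;
  }
}
```

Restoring subscriptions does not recover messages published while the connection was down. If those matter, the data needs another source of truth, such as a database read or a stream.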

Pub/Sub vs Streams

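| Feature         | Pub/Sub             | Streams                |
| --------------- | ------------------- | ---------------------- |
| Persistence     | No                  | Yes                    |
| Replay messages | No                  | Yes                    |
| Consumer groups | No                  | Yes                    |
| Best for        | Real-time broadcast | Event sourcing, queues |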
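
The "Replay messages" row is the key practical difference. The toy model below shows what replay means: entries are kept in an append-only log with increasing IDs, so a consumer that was offline can ask for everything after the last ID it saw. This is an in-memory illustration only. Real Redis Streams use string IDs of the form `<milliseconds>-<sequence>` and commands such as XADD and XREAD, and `ToyStream` is a made-up name.

```typescript
interface ToyStreamEntry {
  id: number;
  payload: string;
}

// Append-only log with replay by ID (toy model of the stream idea).
class ToyStream {
  private readonly entries: ToyStreamEntry[] = [];
  private nextId = 1;

  // Append an entry and return its ID.
  add(payload: string): number {
    const id = this.nextId++;
    this.entries.push({ id, payload });
    return id;
  }

  // Return every entry with an ID greater than lastSeenId.
  // Passing 0 replays the whole log.
  readAfter(lastSeenId: number): ToyStreamEntry[] {
    return this.entries.filter((entry) => entry.id > lastSeenId);
  }
}

const orderEvents = new ToyStream();
orderEvents.add('order-created');
const lastSeen = orderEvents.add('order-paid');
orderEvents.add('order-shipped');

// A consumer that stopped after 'order-paid' can catch up later.
console.log(orderEvents.readAfter(lastSeen));
```

With Pub/Sub, the equivalent late consumer would receive nothing, because the message was only offered to subscribers connected at publish time.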