/**
 * Fan-out helpers for notifications addressed to more than one user.
 *
 * Uses the module-level `redis` client and the `Notification` type, both of
 * which are declared outside this block.
 */
class BroadcastNotifications {
  /** Broadcast to all connected users by publishing on `notifications:global`. */
  async broadcastGlobal(notification: Notification): Promise<void> {
    await redis.publish('notifications:global', JSON.stringify(notification));
  }

  /** Broadcast to users in a group/channel by publishing on `notifications:group:<groupId>`. */
  async broadcastToGroup(groupId: string, notification: Notification): Promise<void> {
    await redis.publish(`notifications:group:${groupId}`, JSON.stringify(notification));
  }

  /**
   * Subscribe a user to a group.
   *
   * Membership is recorded in both directions: the user is added to the
   * group's member set and the group is added to the user's group set.
   */
  async joinGroup(userId: string, groupId: string): Promise<void> {
    await redis.sadd(`group:${groupId}:members`, userId);
    await redis.sadd(`user:${userId}:groups`, groupId);
  }

  /**
   * Send to all members of a group (stored + real-time).
   *
   * For every member, the notification id is added to that user's sorted set
   * (scored by the current time) and the serialized notification is published
   * on the user's personal channel. All commands go out in one pipeline.
   *
   * @throws The first error reported for any command in the pipeline.
   */
  async notifyGroup(groupId: string, notification: Notification): Promise<void> {
    const members: string[] = await redis.smembers(`group:${groupId}:members`);
    if (members.length === 0) return;

    // Loop-invariant: serialize once and give every member the same score.
    const payload = JSON.stringify(notification);
    const storedAt = Date.now();

    const pipeline = redis.pipeline();
    for (const userId of members) {
      // Store for each user.
      // NOTE(review): only the id is indexed here; nothing in this method
      // writes a `notification:<id>` hash — confirm that is intended.
      pipeline.zadd(`notifications:store:${userId}`, storedAt, notification.id);
      // Publish real-time
      pipeline.publish(`notifications:${userId}`, payload);
    }

    // exec() resolves with one [error, result] pair per command instead of
    // rejecting, so failures have to be surfaced explicitly.
    const results = await pipeline.exec();
    for (const [err] of results ?? []) {
      if (err) throw err;
    }
  }
}
Real-Time Notifications
Build instant notification systems with Redis Pub/Sub. Deliver updates to millions of users in milliseconds.
Why Redis for Notifications?
Basic Pub/Sub
Simple real-time messaging between services:
import Redis from 'ioredis';

// Publisher (your backend services)
const publisher = new Redis(process.env.ARCTICKEY_URL);

/**
 * Publish a notification on the user's personal channel `notifications:<userId>`.
 *
 * The published payload is the given notification plus a generated `id`
 * (UUID) and a `timestamp` (epoch milliseconds), serialized as JSON.
 *
 * @param userId - Recipient; selects the channel to publish on.
 * @param notification - Notification content to deliver.
 *
 * @example
 * await sendNotification('user_123', {
 *   type: 'order_shipped',
 *   title: 'Your order is on the way!',
 *   body: 'Expected delivery: Tomorrow',
 *   data: { orderId: 'order_456' },
 * });
 */
async function sendNotification(
  userId: string,
  notification: {
    type: string;
    title: string;
    body: string;
    data?: Record<string, any>;
  },
): Promise<void> {
  const message = JSON.stringify({
    ...notification,
    id: crypto.randomUUID(),
    timestamp: Date.now(),
  });

  // Publish to user's personal channel
  await publisher.publish(`notifications:${userId}`, message);
}

// Subscriber (WebSocket server)
const subscriber = new Redis(process.env.ARCTICKEY_URL);

/**
 * Subscribe to a user's personal channel and hand every parsed message to
 * `onMessage`.
 *
 * @param userId - User whose `notifications:<userId>` channel is subscribed.
 * @param onMessage - Called with the JSON-decoded payload of each message.
 * @returns A function that removes the listener and unsubscribes the channel.
 */
function subscribeToUser(userId: string, onMessage: (msg: any) => void): () => void {
  const channel = `notifications:${userId}`;

  const listener = (ch: string, message: string): void => {
    if (ch !== channel) return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(message);
    } catch (err) {
      // A malformed payload must not throw out of the event emitter.
      console.error(`Discarding malformed message on ${channel}`, err);
      return;
    }
    onMessage(parsed);
  };

  // Attach the listener before subscribing so no early message is missed.
  subscriber.on('message', listener);
  subscriber.subscribe(channel).catch((err: unknown) => {
    console.error(`Failed to subscribe to ${channel}`, err);
  });

  return () => {
    // Without removing the listener, every call would leave a handler
    // attached to the shared connection forever.
    subscriber.removeListener('message', listener);
    // NOTE(review): this unsubscribes the channel for every caller that
    // subscribed to the same user on this connection — confirm that is intended.
    subscriber.unsubscribe(channel).catch((err: unknown) => {
      console.error(`Failed to unsubscribe from ${channel}`, err);
    });
  };
}

// WebSocket Integration
Connect browsers to receive real-time updates:
import { WebSocketServer, WebSocket } from 'ws';
import Redis from 'ioredis';

const wss = new WebSocketServer({ port: 8080 });

// Track connections per user
const userConnections = new Map<string, Set<WebSocket>>();

// Prefix of every per-user Pub/Sub channel.
const CHANNEL_PREFIX = 'notifications:';

/**
 * Handle a new browser connection: authenticate it, register it under its
 * user, make sure the user's Redis channel is subscribed, then send the
 * unread notifications as an `init` message.
 */
wss.on('connection', async (ws, req) => {
  try {
    // Authenticate user from token
    const token = new URL(req.url ?? '/', 'http://localhost').searchParams.get('token');
    const user = await verifyToken(token);
    if (!user) {
      ws.close(4001, 'Unauthorized');
      return;
    }
    const userId: string = user.id;

    // The socket may already be gone by the time the token was verified.
    if (ws.readyState !== WebSocket.OPEN) return;

    // Track connection. Registration and the close handler are set up
    // synchronously, before any further await, so a socket that closes during
    // setup is still cleaned up.
    let connections = userConnections.get(userId);
    if (!connections) {
      connections = new Set();
      userConnections.set(userId, connections);
    }
    connections.add(ws);

    // Cleanup on disconnect
    ws.on('close', () => {
      const current = userConnections.get(userId);
      if (!current) return;

      current.delete(ws);
      if (current.size === 0) {
        userConnections.delete(userId);
        unsubscribeUser(userId).catch((err: unknown) => {
          console.error(`Failed to unsubscribe user ${userId}`, err);
        });
      }
    });

    // No-op when the user's channel is already subscribed.
    await subscribeUser(userId);

    // Send unread notifications on connect
    const unread = await getUnreadNotifications(userId);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: 'init', notifications: unread }));
    }
  } catch (err) {
    // An async listener's rejection would otherwise go unhandled.
    console.error('Notification socket setup failed', err);
    ws.close(1011, 'Internal error');
  }
});

// Redis subscriber for all users
const subscriber = new Redis(process.env.ARCTICKEY_URL);
const subscribedUsers = new Set<string>();

/** Subscribe the shared Redis connection to a user's channel, at most once. */
async function subscribeUser(userId: string): Promise<void> {
  if (subscribedUsers.has(userId)) return;

  subscribedUsers.add(userId);
  try {
    await subscriber.subscribe(`${CHANNEL_PREFIX}${userId}`);
  } catch (err) {
    // Forget the user again so a later connection can retry the subscribe.
    subscribedUsers.delete(userId);
    throw err;
  }
}

/** Unsubscribe the shared Redis connection from a user's channel. */
async function unsubscribeUser(userId: string): Promise<void> {
  subscribedUsers.delete(userId);
  await subscriber.unsubscribe(`${CHANNEL_PREFIX}${userId}`);
}

// Relay each published message, unchanged, to all open sockets of that user.
subscriber.on('message', (channel: string, message: string) => {
  if (!channel.startsWith(CHANNEL_PREFIX)) return;

  const userId = channel.slice(CHANNEL_PREFIX.length);
  const connections = userConnections.get(userId);
  connections?.forEach((ws) => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(message);
    }
  });
});

// Notification Storage
Store notifications for offline users:
/**
 * Store notifications for offline users.
 *
 * Layout, as written by this class:
 * - `notification:<id>` — hash holding one notification's fields.
 * - `notifications:store:<userId>` — sorted set of notification ids scored by
 *   creation time in epoch milliseconds.
 * - `user:<userId>:counters` — hash whose `unread_notifications` field holds
 *   the unread count.
 *
 * Uses the module-level `redis` client and the `Notification` type, both of
 * which are declared outside this block.
 */
class NotificationStore {
  private ttl = 60 * 60 * 24 * 30; // 30 days

  /** Throw the first error found in a pipeline's `[error, result]` pairs. */
  private static assertPipelineOk(
    results: ReadonlyArray<readonly [unknown, unknown]> | null | undefined,
  ): void {
    for (const [err] of results ?? []) {
      if (err) throw err;
    }
  }

  /**
   * Persist a notification for a user and bump the unread counter.
   *
   * @param userId - Owner of the notification.
   * @param notification - Notification to store; a UUID is generated when it
   *   has no `id`.
   * @returns The id the notification was stored under.
   * @throws The first error reported for any command in the pipeline.
   */
  async store(userId: string, notification: Notification) {
    const key = `notifications:store:${userId}`;
    const id = notification.id || crypto.randomUUID();
    // One timestamp, so the hash's `createdAt` and the sorted-set score agree.
    const now = Date.now();

    const results = await redis
      .pipeline()
      // Store notification data
      // NOTE(review): hash values are flat strings; a nested object field on
      // the notification would not survive this write — confirm the shape.
      .hset(`notification:${id}`, {
        ...notification,
        id,
        userId,
        read: 'false',
        createdAt: now.toString(),
      })
      .expire(`notification:${id}`, this.ttl)
      // Add to user's list (sorted by timestamp)
      .zadd(key, now, id)
      // Drop ids older than the hash TTL; those hashes have expired, and the
      // list's own TTL is refreshed on every store, so it would never shrink.
      .zremrangebyscore(key, '-inf', `(${now - this.ttl * 1000}`)
      .expire(key, this.ttl)
      // Increment unread counter
      .hincrby(`user:${userId}:counters`, 'unread_notifications', 1)
      .exec();
    NotificationStore.assertPipelineOk(results);

    return id;
  }

  /**
   * Fetch a user's most recent notifications, newest first.
   *
   * Ids whose hash no longer exists, or whose lookup failed, are skipped.
   *
   * @param userId - Owner of the notifications.
   * @param limit - Maximum number of ids to look up; values below 1 yield `[]`.
   */
  async getRecent(userId: string, limit = 50): Promise<Notification[]> {
    // `zrevrange(key, 0, -1)` would mean "everything", so guard a zero limit.
    if (!(limit > 0)) return [];

    const key = `notifications:store:${userId}`;

    // Get recent notification IDs
    const ids: string[] = await redis.zrevrange(key, 0, limit - 1);
    if (ids.length === 0) return [];

    // Fetch notification data
    const pipeline = redis.pipeline();
    ids.forEach((id) => pipeline.hgetall(`notification:${id}`));
    const results: [unknown, unknown][] = (await pipeline.exec()) ?? [];

    const notifications: unknown[] = [];
    for (const [err, raw] of results) {
      if (err || raw === null || typeof raw !== 'object') continue;

      const data = raw as Record<string, string>;
      // HGETALL on a missing (expired) key yields an empty object.
      if (Object.keys(data).length === 0) continue;

      notifications.push({
        ...data,
        read: data.read === 'true',
        createdAt: Number.parseInt(data.createdAt, 10),
      });
    }
    return notifications as Notification[];
  }

  /**
   * Mark one notification as read and decrement the user's unread counter.
   *
   * Does nothing unless the stored notification is unread and belongs to
   * `userId`.
   *
   * NOTE(review): the check and the update are separate round trips, so two
   * concurrent calls can both decrement; use a script/transaction if needed.
   */
  async markAsRead(userId: string, notificationId: string) {
    const hashKey = `notification:${notificationId}`;
    const [read, owner]: (string | null)[] = await redis.hmget(hashKey, 'read', 'userId');
    if (read !== 'false' || owner !== userId) return;

    const results = await redis
      .pipeline()
      .hset(hashKey, 'read', 'true')
      .hincrby(`user:${userId}:counters`, 'unread_notifications', -1)
      .exec();
    NotificationStore.assertPipelineOk(results);
  }

  /** Mark every stored notification of a user as read and reset the counter to 0. */
  async markAllAsRead(userId: string) {
    const key = `notifications:store:${userId}`;
    const ids: string[] = await redis.zrange(key, 0, -1);

    // HSET on an expired hash would recreate it with no TTL and only a `read`
    // field, so restrict the update to hashes that still exist.
    let liveIds: string[] = [];
    if (ids.length > 0) {
      const probe = redis.pipeline();
      ids.forEach((id) => probe.exists(`notification:${id}`));
      const probeResults: [unknown, unknown][] = (await probe.exec()) ?? [];
      liveIds = ids.filter((_id, index) => {
        const entry = probeResults[index];
        return entry !== undefined && !entry[0] && entry[1] === 1;
      });
    }

    const pipeline = redis.pipeline();
    liveIds.forEach((id) => pipeline.hset(`notification:${id}`, 'read', 'true'));
    pipeline.hset(`user:${userId}:counters`, 'unread_notifications', '0');
    NotificationStore.assertPipelineOk(await pipeline.exec());
  }

  /** Read the user's unread counter; missing, unparsable or negative values yield 0. */
  async getUnreadCount(userId: string): Promise<number> {
    const count: string | null = await redis.hget(
      `user:${userId}:counters`,
      'unread_notifications',
    );
    const parsed = Number.parseInt(count || '0', 10);
    return Number.isNaN(parsed) ? 0 : Math.max(0, parsed);
  }
}

// Broadcast Notifications
Send to all users or specific groups:
/**
 * Fan-out helpers for notifications addressed to more than one user.
 *
 * Uses the module-level `redis` client and the `Notification` type, both of
 * which are declared outside this block.
 */
class BroadcastNotifications {
  /** Broadcast to all connected users by publishing on `notifications:global`. */
  async broadcastGlobal(notification: Notification): Promise<void> {
    await redis.publish('notifications:global', JSON.stringify(notification));
  }

  /** Broadcast to users in a group/channel by publishing on `notifications:group:<groupId>`. */
  async broadcastToGroup(groupId: string, notification: Notification): Promise<void> {
    await redis.publish(`notifications:group:${groupId}`, JSON.stringify(notification));
  }

  /**
   * Subscribe a user to a group.
   *
   * Membership is recorded in both directions: the user is added to the
   * group's member set and the group is added to the user's group set.
   */
  async joinGroup(userId: string, groupId: string): Promise<void> {
    await redis.sadd(`group:${groupId}:members`, userId);
    await redis.sadd(`user:${userId}:groups`, groupId);
  }

  /**
   * Send to all members of a group (stored + real-time).
   *
   * For every member, the notification id is added to that user's sorted set
   * (scored by the current time) and the serialized notification is published
   * on the user's personal channel. All commands go out in one pipeline.
   *
   * @throws The first error reported for any command in the pipeline.
   */
  async notifyGroup(groupId: string, notification: Notification): Promise<void> {
    const members: string[] = await redis.smembers(`group:${groupId}:members`);
    if (members.length === 0) return;

    // Loop-invariant: serialize once and give every member the same score.
    const payload = JSON.stringify(notification);
    const storedAt = Date.now();

    const pipeline = redis.pipeline();
    for (const userId of members) {
      // Store for each user.
      // NOTE(review): only the id is indexed here; nothing in this method
      // writes a `notification:<id>` hash — confirm that is intended.
      pipeline.zadd(`notifications:store:${userId}`, storedAt, notification.id);
      // Publish real-time
      pipeline.publish(`notifications:${userId}`, payload);
    }

    // exec() resolves with one [error, result] pair per command instead of
    // rejecting, so failures have to be surfaced explicitly.
    const results = await pipeline.exec();
    for (const [err] of results ?? []) {
      if (err) throw err;
    }
  }
}

// Server-Sent Events (SSE)
Simpler alternative to WebSockets:
import express from 'express';

const app = express();

/**
 * Server-Sent Events endpoint: sends the unread count as an `init` event,
 * then relays every message published on the user's `notifications:<id>`
 * channel, with a comment-line heartbeat every 30 seconds.
 */
app.get('/api/notifications/stream', async (req, res) => {
  let subscriber: Redis | undefined;
  let heartbeat: ReturnType<typeof setInterval> | undefined;
  let closed = false;

  // Cleanup on disconnect. Registered before the first await so a client that
  // leaves during setup cannot leak the Redis connection or the timer, and
  // bound to the response because that is the stream being held open.
  const cleanup = (): void => {
    if (closed) return;
    closed = true;
    if (heartbeat !== undefined) clearInterval(heartbeat);
    // Closing the dedicated connection also ends its subscription.
    subscriber?.quit().catch(() => subscriber?.disconnect());
  };
  res.on('close', cleanup);

  try {
    const user = await authenticateRequest(req);
    if (!user) {
      res.status(401).end();
      return;
    }
    if (closed) return;

    // SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    // Send initial unread count
    const unreadCount = await store.getUnreadCount(user.id);
    if (closed) return;
    res.write(`data: ${JSON.stringify({ type: 'init', unreadCount })}\n\n`);

    // Subscribe to user's notifications on a dedicated connection.
    subscriber = new Redis(process.env.ARCTICKEY_URL);
    subscriber.on('message', (_channel: string, message: string) => {
      res.write(`data: ${message}\n\n`);
    });
    await subscriber.subscribe(`notifications:${user.id}`);
    if (closed) return;

    // Heartbeat to keep connection alive
    heartbeat = setInterval(() => {
      res.write(`: heartbeat\n\n`);
    }, 30000);
  } catch (err) {
    // An async route handler's rejection would otherwise go unhandled.
    if (!closed) console.error('Notification stream failed', err);
    cleanup();
    if (!res.headersSent) {
      res.status(500).end();
    } else {
      res.end();
    }
  }
});

// Frontend
let eventSource: EventSource;

/**
 * Open the notification stream. On error the current connection is closed
 * and a new one is created after 5 seconds.
 */
function connectNotificationStream(): void {
  const source = new EventSource('/api/notifications/stream');
  eventSource = source;

  source.onmessage = (event) => {
    const notification = JSON.parse(event.data);

    if (notification.type === 'init') {
      updateBadge(notification.unreadCount);
    } else {
      showNotification(notification);
      updateBadge((badge: number) => badge + 1);
    }
  };

  source.onerror = () => {
    // Close first so the browser's built-in retry cannot race with ours.
    source.close();
    // Reconnect after 5 seconds
    setTimeout(connectNotificationStream, 5000);
  };
}

connectNotificationStream();

// React Integration
// hooks/useNotifications.ts
import { useEffect, useState, useCallback } from 'react';

/**
 * Notification as held in client state. Deliberately not named `Notification`,
 * which in the browser is the Web Notifications API class used below.
 * NOTE(review): lists only the fields this file reads — confirm against the
 * server payload.
 */
type NotificationItem = {
  id: string;
  title: string;
  body: string;
  read?: boolean;
};

/**
 * Subscribe to the notification stream and expose the notifications received
 * during this session, the unread count, and a `markAsRead` action.
 */
export function useNotifications() {
  const [notifications, setNotifications] = useState<NotificationItem[]>([]);
  const [unreadCount, setUnreadCount] = useState(0);

  useEffect(() => {
    const eventSource = new EventSource('/api/notifications/stream');

    eventSource.onmessage = (event) => {
      let data;
      try {
        data = JSON.parse(event.data);
      } catch (err) {
        console.error('Ignoring malformed notification event', err);
        return;
      }

      if (data.type === 'init') {
        setUnreadCount(data.unreadCount);
        return;
      }

      setNotifications((prev) => [data, ...prev]);
      setUnreadCount((prev) => prev + 1);

      // Show browser notification if permitted. The `typeof` check guards
      // browsers that do not define `Notification` at all.
      if (typeof Notification !== 'undefined' && Notification.permission === 'granted') {
        new Notification(data.title, { body: data.body });
      }
    };

    return () => eventSource.close();
  }, []);

  /**
   * Mark a notification as read on the server, then update local state.
   * @throws If the request fails or the server answers with a non-2xx status.
   */
  const markAsRead = useCallback(async (id: string) => {
    const response = await fetch(`/api/notifications/${encodeURIComponent(id)}/read`, {
      method: 'POST',
    });
    // Keep local state untouched when the server rejected the change.
    if (!response.ok) {
      throw new Error(`Failed to mark notification ${id} as read (HTTP ${response.status})`);
    }

    setUnreadCount((prev) => Math.max(0, prev - 1));
    setNotifications((prev) =>
      prev.map((n) => (n.id === id ? { ...n, read: true } : n)),
    );
  }, []);

  return { notifications, unreadCount, markAsRead };
}

// Component
/** Bell button with an unread badge and a dropdown of notifications. */
function NotificationBell() {
  const { notifications, unreadCount, markAsRead } = useNotifications();
  const [open, setOpen] = useState(false);

  return (
    <div className="relative">
      <button onClick={() => setOpen(!open)}>
        🔔
        {unreadCount > 0 && (
          <span className="badge">{unreadCount}</span>
        )}
      </button>

      {open && (
        <div className="dropdown">
          {notifications.map((n) => (
            <div
              key={n.id}
              className={n.read ? 'read' : 'unread'}
              onClick={() => {
                // Clicking an already-read item must not lower the badge again.
                if (n.read) return;
                markAsRead(n.id).catch((err: unknown) => console.error(err));
              }}
            >
              <strong>{n.title}</strong>
              <p>{n.body}</p>
            </div>
          ))}
        </div>
      )}
    </div>
  );
}

// Notification Types & Templates
/**
 * Data each notification type's template reads.
 * NOTE(review): field names come from the templates below; the primitive
 * types are assumptions — confirm against the producers.
 */
type NotificationPayloads = {
  order_shipped: { orderId: string; carrier: string };
  new_follower: { username: string; totalFollowers: number | string };
  payment_received: { amount: number | string; currency: string };
};

/** Rendering rules for one notification type, given its payload `D`. */
type NotificationTemplate<D> = {
  title: (data: D) => string;
  body: (data: D) => string;
  icon: string;
  action: (data: D) => string;
};

/** One template per notification type, each typed against that type's payload. */
type NotificationTemplates = {
  readonly [K in keyof NotificationPayloads]: NotificationTemplate<NotificationPayloads[K]>;
};

const NOTIFICATION_TYPES: NotificationTemplates = {
  order_shipped: {
    title: () => 'Your order is on the way!',
    body: (data) => `Order #${data.orderId} shipped via ${data.carrier}`,
    icon: '📦',
    action: (data) => `/orders/${data.orderId}`,
  },
  new_follower: {
    title: (data) => `${data.username} followed you`,
    body: (data) => `You now have ${data.totalFollowers} followers`,
    icon: '👤',
    action: (data) => `/profile/${data.username}`,
  },
  payment_received: {
    title: () => 'Payment received',
    body: (data) => `${data.amount} ${data.currency} has been added to your account`,
    icon: '💰',
    action: () => '/wallet',
  },
};

/**
 * Render the template for `type` with `data` and send the result to the user
 * through `sendNotification`.
 *
 * @param userId - Recipient of the notification.
 * @param type - Key of the template in `NOTIFICATION_TYPES`.
 * @param data - Payload for that template; checked against the type's payload shape.
 *
 * @example
 * await sendTypedNotification('user_123', 'order_shipped', {
 *   orderId: 'ORD-456',
 *   carrier: 'DHL',
 * });
 */
async function sendTypedNotification<T extends keyof typeof NOTIFICATION_TYPES>(
  userId: string,
  type: T,
  data: NotificationPayloads[T],
): Promise<void> {
  const template = NOTIFICATION_TYPES[type];

  // Built as a variable, not an inline literal: `icon` and `action` are extra
  // fields that an inline literal would trip the excess-property check on.
  const notification = {
    type,
    title: template.title(data),
    body: template.body(data),
    icon: template.icon,
    action: template.action(data),
    data,
  };

  await sendNotification(userId, notification);
}

// Best Practices