From 27d2459e1d8d25ab53b4ae994a043a1880a2f222 Mon Sep 17 00:00:00 2001 From: Jerko Steiner Date: Fri, 13 Mar 2020 20:28:39 +0100 Subject: [PATCH] Make socket.ts asynchronous Also do not monkey-patch socket objects with user ids. --- src/server/app.test.ts | 5 +- src/server/app.ts | 22 +++++- src/server/socket.test.ts | 146 +++++++++++++++++++------------------ src/server/socket.ts | 67 +++++++++++------ src/server/store/memory.ts | 16 +++- src/server/store/store.ts | 7 +- src/shared/SocketEvent.ts | 3 +- 7 files changed, 159 insertions(+), 107 deletions(-) diff --git a/src/server/app.test.ts b/src/server/app.test.ts index f0fffea..9467481 100644 --- a/src/server/app.test.ts +++ b/src/server/app.test.ts @@ -65,7 +65,10 @@ describe('server/app', () => { expect((handleSocket as jest.Mock).mock.calls).toEqual([[ socket, io, - jasmine.any(MemoryStore), + { + socketIdByUserId: jasmine.any(MemoryStore), + userIdBySocketId: jasmine.any(MemoryStore), + }, ]]) }) diff --git a/src/server/app.ts b/src/server/app.ts index 0a97e88..93c529c 100644 --- a/src/server/app.ts +++ b/src/server/app.ts @@ -10,6 +10,8 @@ import call from './routes/call' import index from './routes/index' import ejs from 'ejs' import { MemoryStore } from './store' +// import Redis from 'ioredis' +// import redisAdapter from 'socket.io-redis' const debug = _debug('peercalls') const logRequest = _debug('peercalls:requests') @@ -22,6 +24,19 @@ debug(`WebSocket URL: ${SOCKET_URL}`) const app = express() const server = createServer(config, app) export const io = SocketIO(server, { path: SOCKET_URL }) +// const pubClient = new Redis({ +// host: '127.0.0.1', +// port: 6379, +// }) +// const subClient = new Redis({ +// host: '127.0.0.1', +// port: 6379, +// }) +// io.adapter(redisAdapter({ +// key: 'peercalls', +// pubClient, +// subClient, +// })) app.set('x-powered-by', false) app.locals.version = require('../../package.json').version @@ -49,7 +64,10 @@ router.use('/call', call) router.use('/', index) app.use(BASE_URL, router) -const store = new MemoryStore() -io.on('connection', socket => handleSocket(socket, io, store)) +const stores = { + socketIdByUserId: new MemoryStore(), + userIdBySocketId: new MemoryStore(), +} +io.on('connection', socket => handleSocket(socket, io, stores)) export default server diff --git a/src/server/socket.test.ts b/src/server/socket.test.ts index ecf6a1d..e5a9b78 100644 --- a/src/server/socket.test.ts +++ b/src/server/socket.test.ts @@ -1,76 +1,64 @@ import { EventEmitter } from 'events' import { Socket } from 'socket.io' -import { TypedIO, ServerSocket } from '../shared' +import { TypedIO } from '../shared' import handleSocket from './socket' import { MemoryStore, Store } from './store' describe('server/socket', () => { - type SocketMock = Socket & { + type NamespaceMock = Socket & { id: string room?: string join: jest.Mock leave: jest.Mock emit: jest.Mock + clients: (callback: ( + err: Error | undefined, clients: string[] + ) => void) => void } - let socket: SocketMock + let socket: NamespaceMock let io: TypedIO & { - in: jest.Mock<(room: string) => SocketMock> - to: jest.Mock<(room: string) => SocketMock> + in: jest.Mock<(room: string) => NamespaceMock> + to: jest.Mock<(room: string) => NamespaceMock> } let rooms: Record + const socket0 = { + id: 'socket0', + } + const socket1 = { + id: 'socket1', + } + const socket2 = { + id: 'socket2', + } + let emitPromise: Promise beforeEach(() => { - socket = new EventEmitter() as SocketMock + socket = new EventEmitter() as NamespaceMock socket.id = 'socket0' socket.join = jest.fn() socket.leave = jest.fn() rooms = {} + let emitResolve: () => void + emitPromise = new Promise(resolve => { + emitResolve = resolve + }) + + const socketsByRoom: Record = { + room1: [socket0.id], + room2: [socket0.id], + room3: [socket0.id, socket1.id, socket2.id], + } + io = {} as any io.in = io.to = jest.fn().mockImplementation(room => { return (rooms[room] = rooms[room] || { - emit: jest.fn(), - }) - }) - - const sockets = { - socket0: { - id: 'socket0', - userId: 'socket0_userid', - }, - socket1: { - id: 'socket1', - userId: 'socket1_userid', - }, - socket2: { - id: 'socket2', - userId: 'socket2_userid', - }, - } - - io.sockets = { - sockets: sockets as any, - adapter: { - rooms: { - room1: { - socket0: true, - } as any, - room2: { - socket0: true, - } as any, - room3: { - sockets: { - socket0: true, - socket1: true, - socket2: true, - }, - } as any, + emit: jest.fn().mockImplementation(() => emitResolve()), + clients: callback => { + callback(undefined, socketsByRoom[room] || []) }, - } as any, - } as any - - socket.leave = jest.fn() - socket.join = jest.fn() + } as NamespaceMock) + }) }) it('should be a function', () => { @@ -78,24 +66,35 @@ describe('server/socket', () => { }) describe('socket events', () => { - let store: Store + let stores: { + userIdBySocketId: Store + socketIdByUserId: Store + } beforeEach(() => { - store = new MemoryStore() - handleSocket(socket, io, store) + stores = { + userIdBySocketId: new MemoryStore(), + socketIdByUserId: new MemoryStore(), + } + stores.socketIdByUserId.set('a', socket0.id) + stores.userIdBySocketId.set(socket0.id, 'a') + stores.socketIdByUserId.set('b', socket1.id) + stores.userIdBySocketId.set(socket1.id, 'b') + stores.socketIdByUserId.set('c', socket2.id) + stores.userIdBySocketId.set(socket2.id, 'c') + handleSocket(socket, io, stores) }) describe('signal', () => { - it('should broadcast signal to specific user', () => { - store.set('a', 'a-socket-id') - ;(socket as ServerSocket) .userId = 'b' + it('should broadcast signal to specific user', async () => { const signal = { type: 'signal' } - socket.emit('signal', { userId: 'a', signal }) + socket.emit('signal', { userId: 'b', signal }) + await emitPromise - expect(io.to.mock.calls).toEqual([[ 'a-socket-id' ]]) - expect((io.to('a-socket-id').emit as jest.Mock).mock.calls).toEqual([[ + expect(io.to.mock.calls).toEqual([[ socket1.id ]]) + expect((io.to(socket1.id).emit as jest.Mock).mock.calls).toEqual([[ 'signal', { - userId: 'b', + userId: 'a', signal, }, ]]) @@ -103,45 +102,48 @@ describe('server/socket', () => { }) describe('ready', () => { - it('should call socket.leave if socket.room', () => { + it('never calls socket.leave', async () => { socket.room = 'room1' socket.emit('ready', { - userId: 'socket0_userid', + userId: 'a', room: 'room2', }) + await emitPromise - expect(socket.leave.mock.calls).toEqual([[ 'room1' ]]) + expect(socket.leave.mock.calls).toEqual([]) expect(socket.join.mock.calls).toEqual([[ 'room2' ]]) }) - it('should call socket.join to room', () => { + it('should call socket.join to room', async () => { socket.emit('ready', { - userId: 'socket0_userid', + userId: 'b', room: 'room3', }) + await emitPromise expect(socket.join.mock.calls).toEqual([[ 'room3' ]]) }) - it('should emit users', () => { + it('should emit users', async () => { socket.emit('ready', { - userId: 'socket0_userid', + userId: 'a', room: 'room3', }) + await emitPromise - expect(io.to.mock.calls).toEqual([[ 'room3' ]]) + // expect(io.to.mock.calls).toEqual([[ 'room3' ]]) expect((io.to('room3').emit as jest.Mock).mock.calls).toEqual([ [ 'users', { - initiator: 'socket0_userid', + initiator: 'a', users: [{ - socketId: 'socket0', - userId: 'socket0_userid', + socketId: socket0.id, + userId: 'a', }, { - socketId: 'socket1', - userId: 'socket1_userid', + socketId: socket1.id, + userId: 'b', }, { - socketId: 'socket2', - userId: 'socket2_userid', + socketId: socket2.id, + userId: 'c', }], }, ], diff --git a/src/server/socket.ts b/src/server/socket.ts index 9681663..b1c6711 100644 --- a/src/server/socket.ts +++ b/src/server/socket.ts @@ -1,44 +1,54 @@ 'use strict' import _debug from 'debug' -import map from 'lodash/map' import { ServerSocket, TypedIO } from '../shared' import { Store } from './store' const debug = _debug('peercalls:socket') +interface Stores { + userIdBySocketId: Store + socketIdByUserId: Store +} + export default function handleSocket( socket: ServerSocket, io: TypedIO, - store: Store, + stores: Stores, ) { - socket.once('disconnect', () => { - if (socket.userId) { - store.remove(socket.userId) + socket.once('disconnect', async () => { + const userId = await stores.userIdBySocketId.get(socket.id) + if (userId) { + await Promise.all([ + stores.userIdBySocketId.remove(socket.id), + stores.socketIdByUserId.remove(userId), + ]) } }) - socket.on('signal', payload => { + socket.on('signal', async payload => { // debug('signal: %s, payload: %o', socket.userId, payload) - const socketId = store.get(payload.userId) + const socketId = await stores.socketIdByUserId.get(payload.userId) + const userId = await stores.userIdBySocketId.get(socket.id) if (socketId) { io.to(socketId).emit('signal', { - userId: socket.userId, + userId, signal: payload.signal, }) } }) - socket.on('ready', payload => { + socket.on('ready', async payload => { const { userId, room } = payload debug('ready: %s, room: %s', userId, room) - if (socket.room) socket.leave(socket.room) - socket.userId = userId - store.set(userId, socket.id) - socket.room = room + // no need to leave rooms because there will be only one room for the + // duration of the socket connection + await Promise.all([ + stores.socketIdByUserId.set(userId, socket.id), + stores.userIdBySocketId.set(socket.id, userId), + ]) socket.join(room) - socket.room = room - const users = getUsers(room) + const users = await getUsers(room) debug('ready: %s, room: %s, users: %o', userId, room, users) @@ -48,14 +58,25 @@ export default function handleSocket( }) }) - function getUsers (room: string) { - return map(io.sockets.adapter.rooms[room].sockets, (_, socketId) => { - const userSocket = io.sockets.sockets[socketId] as ServerSocket - return { - socketId: socketId, - userId: userSocket.userId, - } - }) + async function getUsers (room: string) { + const socketIds = await getClientsInRoom(room) + const userIds = await stores.userIdBySocketId.getMany(socketIds) + return socketIds.map((socketId, i) => ({ + socketId, + userId: userIds[i], + })) } + async function getClientsInRoom(room: string): Promise { + return new Promise((resolve, reject) => { + io.in(room).clients((err: Error, clients: string[]) => { + if (err) { + reject(err) + } else { + resolve(clients) + } + }) + }) + } } + diff --git a/src/server/store/memory.ts b/src/server/store/memory.ts index 73e8272..0f01694 100644 --- a/src/server/store/memory.ts +++ b/src/server/store/memory.ts @@ -3,15 +3,23 @@ import { Store } from './store' export class MemoryStore implements Store { store: Record = {} - get(key: string): string | undefined { - return this.store[key] + async getMany(keys: string[]): Promise> { + return keys.map(key => this.syncGet(key)) } - set(key: string, value: string) { + private syncGet(key: string): string | undefined { + return this.store[key] + } + + async get(key: string): Promise { + return this.syncGet(key) + } + + async set(key: string, value: string) { this.store[key] = value } - remove(key: string) { + async remove(key: string) { delete this.store[key] } } diff --git a/src/server/store/store.ts b/src/server/store/store.ts index 05e32a2..7181321 100644 --- a/src/server/store/store.ts +++ b/src/server/store/store.ts @@ -1,5 +1,6 @@ export interface Store { - set(key: string, value: string): void - get(key: string): string | undefined - remove(key: string): void + set(key: string, value: string): Promise + get(key: string): Promise + getMany(keys: string[]): Promise> + remove(key: string): Promise } diff --git a/src/shared/SocketEvent.ts b/src/shared/SocketEvent.ts index 1637851..c47f3b9 100644 --- a/src/shared/SocketEvent.ts +++ b/src/shared/SocketEvent.ts @@ -28,8 +28,7 @@ export interface SocketEvent { export type ServerSocket = Omit & - TypedEmitter & - { userId?: string, room?: string } + TypedEmitter export type TypedIO = SocketIO.Server & { to(roomName: string): TypedEmitter