Make socket.ts asynchronous

Also do not monkey-patch socket objects with user ids.
This commit is contained in:
Jerko Steiner 2020-03-13 20:28:39 +01:00
parent 6459aa6228
commit 27d2459e1d
7 changed files with 159 additions and 107 deletions

View File

@ -65,7 +65,10 @@ describe('server/app', () => {
expect((handleSocket as jest.Mock).mock.calls).toEqual([[ expect((handleSocket as jest.Mock).mock.calls).toEqual([[
socket, socket,
io, io,
jasmine.any(MemoryStore), {
socketIdByUserId: jasmine.any(MemoryStore),
userIdBySocketId: jasmine.any(MemoryStore),
},
]]) ]])
}) })

View File

@ -10,6 +10,8 @@ import call from './routes/call'
import index from './routes/index' import index from './routes/index'
import ejs from 'ejs' import ejs from 'ejs'
import { MemoryStore } from './store' import { MemoryStore } from './store'
// import Redis from 'ioredis'
// import redisAdapter from 'socket.io-redis'
const debug = _debug('peercalls') const debug = _debug('peercalls')
const logRequest = _debug('peercalls:requests') const logRequest = _debug('peercalls:requests')
@ -22,6 +24,19 @@ debug(`WebSocket URL: ${SOCKET_URL}`)
const app = express() const app = express()
const server = createServer(config, app) const server = createServer(config, app)
export const io = SocketIO(server, { path: SOCKET_URL }) export const io = SocketIO(server, { path: SOCKET_URL })
// const pubClient = new Redis({
// host: '127.0.0.1',
// port: 6379,
// })
// const subClient = new Redis({
// host: '127.0.0.1',
// port: 6379,
// })
// io.adapter(redisAdapter({
// key: 'peercalls',
// pubClient,
// subClient,
// }))
app.set('x-powered-by', false) app.set('x-powered-by', false)
app.locals.version = require('../../package.json').version app.locals.version = require('../../package.json').version
@ -49,7 +64,10 @@ router.use('/call', call)
router.use('/', index) router.use('/', index)
app.use(BASE_URL, router) app.use(BASE_URL, router)
const store = new MemoryStore() const stores = {
io.on('connection', socket => handleSocket(socket, io, store)) socketIdByUserId: new MemoryStore(),
userIdBySocketId: new MemoryStore(),
}
io.on('connection', socket => handleSocket(socket, io, stores))
export default server export default server

View File

@ -1,76 +1,64 @@
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import { Socket } from 'socket.io' import { Socket } from 'socket.io'
import { TypedIO, ServerSocket } from '../shared' import { TypedIO } from '../shared'
import handleSocket from './socket' import handleSocket from './socket'
import { MemoryStore, Store } from './store' import { MemoryStore, Store } from './store'
describe('server/socket', () => { describe('server/socket', () => {
type SocketMock = Socket & { type NamespaceMock = Socket & {
id: string id: string
room?: string room?: string
join: jest.Mock join: jest.Mock
leave: jest.Mock leave: jest.Mock
emit: jest.Mock emit: jest.Mock
clients: (callback: (
err: Error | undefined, clients: string[]
) => void) => void
} }
let socket: SocketMock let socket: NamespaceMock
let io: TypedIO & { let io: TypedIO & {
in: jest.Mock<(room: string) => SocketMock> in: jest.Mock<(room: string) => NamespaceMock>
to: jest.Mock<(room: string) => SocketMock> to: jest.Mock<(room: string) => NamespaceMock>
} }
let rooms: Record<string, {emit: any}> let rooms: Record<string, {emit: any}>
const socket0 = {
id: 'socket0',
}
const socket1 = {
id: 'socket1',
}
const socket2 = {
id: 'socket2',
}
let emitPromise: Promise<void>
beforeEach(() => { beforeEach(() => {
socket = new EventEmitter() as SocketMock socket = new EventEmitter() as NamespaceMock
socket.id = 'socket0' socket.id = 'socket0'
socket.join = jest.fn() socket.join = jest.fn()
socket.leave = jest.fn() socket.leave = jest.fn()
rooms = {} rooms = {}
let emitResolve: () => void
emitPromise = new Promise(resolve => {
emitResolve = resolve
})
const socketsByRoom: Record<string, string[]> = {
room1: [socket0.id],
room2: [socket0.id],
room3: [socket0.id, socket1.id, socket2.id],
}
io = {} as any io = {} as any
io.in = io.to = jest.fn().mockImplementation(room => { io.in = io.to = jest.fn().mockImplementation(room => {
return (rooms[room] = rooms[room] || { return (rooms[room] = rooms[room] || {
emit: jest.fn(), emit: jest.fn().mockImplementation(() => emitResolve()),
}) clients: callback => {
}) callback(undefined, socketsByRoom[room] || [])
const sockets = {
socket0: {
id: 'socket0',
userId: 'socket0_userid',
},
socket1: {
id: 'socket1',
userId: 'socket1_userid',
},
socket2: {
id: 'socket2',
userId: 'socket2_userid',
},
}
io.sockets = {
sockets: sockets as any,
adapter: {
rooms: {
room1: {
socket0: true,
} as any,
room2: {
socket0: true,
} as any,
room3: {
sockets: {
socket0: true,
socket1: true,
socket2: true,
},
} as any,
}, },
} as any, } as NamespaceMock)
} as any })
socket.leave = jest.fn()
socket.join = jest.fn()
}) })
it('should be a function', () => { it('should be a function', () => {
@ -78,24 +66,35 @@ describe('server/socket', () => {
}) })
describe('socket events', () => { describe('socket events', () => {
let store: Store let stores: {
userIdBySocketId: Store
socketIdByUserId: Store
}
beforeEach(() => { beforeEach(() => {
store = new MemoryStore() stores = {
handleSocket(socket, io, store) userIdBySocketId: new MemoryStore(),
socketIdByUserId: new MemoryStore(),
}
stores.socketIdByUserId.set('a', socket0.id)
stores.userIdBySocketId.set(socket0.id, 'a')
stores.socketIdByUserId.set('b', socket1.id)
stores.userIdBySocketId.set(socket1.id, 'b')
stores.socketIdByUserId.set('c', socket2.id)
stores.userIdBySocketId.set(socket2.id, 'c')
handleSocket(socket, io, stores)
}) })
describe('signal', () => { describe('signal', () => {
it('should broadcast signal to specific user', () => { it('should broadcast signal to specific user', async () => {
store.set('a', 'a-socket-id')
;(socket as ServerSocket) .userId = 'b'
const signal = { type: 'signal' } const signal = { type: 'signal' }
socket.emit('signal', { userId: 'a', signal }) socket.emit('signal', { userId: 'b', signal })
await emitPromise
expect(io.to.mock.calls).toEqual([[ 'a-socket-id' ]]) expect(io.to.mock.calls).toEqual([[ socket1.id ]])
expect((io.to('a-socket-id').emit as jest.Mock).mock.calls).toEqual([[ expect((io.to(socket1.id).emit as jest.Mock).mock.calls).toEqual([[
'signal', { 'signal', {
userId: 'b', userId: 'a',
signal, signal,
}, },
]]) ]])
@ -103,45 +102,48 @@ describe('server/socket', () => {
}) })
describe('ready', () => { describe('ready', () => {
it('should call socket.leave if socket.room', () => { it('never calls socket.leave', async () => {
socket.room = 'room1' socket.room = 'room1'
socket.emit('ready', { socket.emit('ready', {
userId: 'socket0_userid', userId: 'a',
room: 'room2', room: 'room2',
}) })
await emitPromise
expect(socket.leave.mock.calls).toEqual([[ 'room1' ]]) expect(socket.leave.mock.calls).toEqual([])
expect(socket.join.mock.calls).toEqual([[ 'room2' ]]) expect(socket.join.mock.calls).toEqual([[ 'room2' ]])
}) })
it('should call socket.join to room', () => { it('should call socket.join to room', async () => {
socket.emit('ready', { socket.emit('ready', {
userId: 'socket0_userid', userId: 'b',
room: 'room3', room: 'room3',
}) })
await emitPromise
expect(socket.join.mock.calls).toEqual([[ 'room3' ]]) expect(socket.join.mock.calls).toEqual([[ 'room3' ]])
}) })
it('should emit users', () => { it('should emit users', async () => {
socket.emit('ready', { socket.emit('ready', {
userId: 'socket0_userid', userId: 'a',
room: 'room3', room: 'room3',
}) })
await emitPromise
expect(io.to.mock.calls).toEqual([[ 'room3' ]]) // expect(io.to.mock.calls).toEqual([[ 'room3' ]])
expect((io.to('room3').emit as jest.Mock).mock.calls).toEqual([ expect((io.to('room3').emit as jest.Mock).mock.calls).toEqual([
[ [
'users', { 'users', {
initiator: 'socket0_userid', initiator: 'a',
users: [{ users: [{
socketId: 'socket0', socketId: socket0.id,
userId: 'socket0_userid', userId: 'a',
}, { }, {
socketId: 'socket1', socketId: socket1.id,
userId: 'socket1_userid', userId: 'b',
}, { }, {
socketId: 'socket2', socketId: socket2.id,
userId: 'socket2_userid', userId: 'c',
}], }],
}, },
], ],

View File

@ -1,44 +1,54 @@
'use strict' 'use strict'
import _debug from 'debug' import _debug from 'debug'
import map from 'lodash/map'
import { ServerSocket, TypedIO } from '../shared' import { ServerSocket, TypedIO } from '../shared'
import { Store } from './store' import { Store } from './store'
const debug = _debug('peercalls:socket') const debug = _debug('peercalls:socket')
interface Stores {
userIdBySocketId: Store
socketIdByUserId: Store
}
export default function handleSocket( export default function handleSocket(
socket: ServerSocket, socket: ServerSocket,
io: TypedIO, io: TypedIO,
store: Store, stores: Stores,
) { ) {
socket.once('disconnect', () => { socket.once('disconnect', async () => {
if (socket.userId) { const userId = await stores.userIdBySocketId.get(socket.id)
store.remove(socket.userId) if (userId) {
await Promise.all([
stores.userIdBySocketId.remove(socket.id),
stores.socketIdByUserId.remove(userId),
])
} }
}) })
socket.on('signal', payload => { socket.on('signal', async payload => {
// debug('signal: %s, payload: %o', socket.userId, payload) // debug('signal: %s, payload: %o', socket.userId, payload)
const socketId = store.get(payload.userId) const socketId = await stores.socketIdByUserId.get(payload.userId)
const userId = await stores.userIdBySocketId.get(socket.id)
if (socketId) { if (socketId) {
io.to(socketId).emit('signal', { io.to(socketId).emit('signal', {
userId: socket.userId, userId,
signal: payload.signal, signal: payload.signal,
}) })
} }
}) })
socket.on('ready', payload => { socket.on('ready', async payload => {
const { userId, room } = payload const { userId, room } = payload
debug('ready: %s, room: %s', userId, room) debug('ready: %s, room: %s', userId, room)
if (socket.room) socket.leave(socket.room) // no need to leave rooms because there will be only one room for the
socket.userId = userId // duration of the socket connection
store.set(userId, socket.id) await Promise.all([
socket.room = room stores.socketIdByUserId.set(userId, socket.id),
stores.userIdBySocketId.set(socket.id, userId),
])
socket.join(room) socket.join(room)
socket.room = room
const users = getUsers(room) const users = await getUsers(room)
debug('ready: %s, room: %s, users: %o', userId, room, users) debug('ready: %s, room: %s, users: %o', userId, room, users)
@ -48,14 +58,25 @@ export default function handleSocket(
}) })
}) })
function getUsers (room: string) { async function getUsers (room: string) {
return map(io.sockets.adapter.rooms[room].sockets, (_, socketId) => { const socketIds = await getClientsInRoom(room)
const userSocket = io.sockets.sockets[socketId] as ServerSocket const userIds = await stores.userIdBySocketId.getMany(socketIds)
return { return socketIds.map((socketId, i) => ({
socketId: socketId, socketId,
userId: userSocket.userId, userId: userIds[i],
} }))
})
} }
async function getClientsInRoom(room: string): Promise<string[]> {
return new Promise((resolve, reject) => {
io.in(room).clients((err: Error, clients: string[]) => {
if (err) {
reject(err)
} else {
resolve(clients)
}
})
})
}
} }

View File

@ -3,15 +3,23 @@ import { Store } from './store'
export class MemoryStore implements Store { export class MemoryStore implements Store {
store: Record<string, string> = {} store: Record<string, string> = {}
get(key: string): string | undefined { async getMany(keys: string[]): Promise<Array<string | undefined>> {
return this.store[key] return keys.map(key => this.syncGet(key))
} }
set(key: string, value: string) { private syncGet(key: string): string | undefined {
return this.store[key]
}
async get(key: string): Promise<string | undefined> {
return this.syncGet(key)
}
async set(key: string, value: string) {
this.store[key] = value this.store[key] = value
} }
remove(key: string) { async remove(key: string) {
delete this.store[key] delete this.store[key]
} }
} }

View File

@ -1,5 +1,6 @@
export interface Store { export interface Store {
set(key: string, value: string): void set(key: string, value: string): Promise<void>
get(key: string): string | undefined get(key: string): Promise<string | undefined>
remove(key: string): void getMany(keys: string[]): Promise<Array<string | undefined>>
remove(key: string): Promise<void>
} }

View File

@ -28,8 +28,7 @@ export interface SocketEvent {
export type ServerSocket = export type ServerSocket =
Omit<SocketIO.Socket, TypedEmitterKeys> & Omit<SocketIO.Socket, TypedEmitterKeys> &
TypedEmitter<SocketEvent> & TypedEmitter<SocketEvent>
{ userId?: string, room?: string }
export type TypedIO = SocketIO.Server & { export type TypedIO = SocketIO.Server & {
to(roomName: string): TypedEmitter<SocketEvent> to(roomName: string): TypedEmitter<SocketEvent>