Generate userIDs on server-side

We don't want to depend on:

1) socket.io generated IDs because they change on server reconnect
2) simple-peer generated IDs because they change for every peer
connection

We generate a single ID when the call web page is refreshed and use that
throughout the session (until page refresh).

We keep relations of user-id to socket-id on the server side in memory
and use that to get to the right socket. In the future this might be
replaced with Redis to allow multiple nodes.

If the server is restarted, but people have active calls, we want them
to keep using the active peer connections and only connect to new peers.

Ideally, we do not want to disturb the active peer connections, but peer
connections might be restarted because the in-memory store will not have
the information on for any peers in the room upon restart.
This commit is contained in:
Jerko Steiner 2020-03-13 13:33:51 +01:00
parent ba92214296
commit cd4979c3be
21 changed files with 210 additions and 98 deletions

View File

@ -42,4 +42,6 @@ export const valueOf = jest.fn()
export const callId = 'call1234'
export const userId = 'user1234'
export const iceServers = []

View File

@ -6,7 +6,7 @@ import * as CallActions from './CallActions'
import * as SocketActions from './SocketActions'
import * as constants from '../constants'
import socket from '../socket'
import { callId } from '../window'
import { callId, userId } from '../window'
import { bindActionCreators, createStore, AnyAction, combineReducers, applyMiddleware } from 'redux'
import reducers from '../reducers'
import { middlewares } from '../middlewares'
@ -60,6 +60,7 @@ describe('CallActions', () => {
expect((SocketActions.handshake as jest.Mock).mock.calls).toEqual([[{
socket,
roomName: callId,
userId: userId,
}]])
})

View File

@ -1,6 +1,6 @@
import socket from '../socket'
import { ThunkResult } from '../store'
import { callId } from '../window'
import { callId, userId } from '../window'
import * as NotifyActions from './NotifyActions'
import * as SocketActions from './SocketActions'
@ -25,6 +25,7 @@ async (dispatch, getState) => {
dispatch(SocketActions.handshake({
socket,
roomName: callId,
userId,
}))
dispatch(initialize())
resolve()

View File

@ -12,7 +12,7 @@ import { PEERCALLS, PEER_EVENT_DATA, ME } from '../constants'
describe('PeerActions', () => {
function createSocket () {
const socket = new EventEmitter() as unknown as ClientSocket
socket.id = 'user1'
socket.id = 'socket-id-user-1'
return socket
}
@ -30,7 +30,7 @@ describe('PeerActions', () => {
dispatch = store.dispatch
getState = store.getState
user = { id: 'user2' }
user = { id: 'user1' }
socket = createSocket()
instances = (Peer as any).instances = [];
(Peer as unknown as jest.Mock).mockClear()
@ -40,7 +40,7 @@ describe('PeerActions', () => {
describe('create', () => {
it('creates a new peer', () => {
PeerActions.createPeer({ socket, user, initiator: 'user2', stream })(
PeerActions.createPeer({ socket, user, initiator: 'other-user', stream })(
dispatch, getState)
expect(instances.length).toBe(1)
@ -52,7 +52,7 @@ describe('PeerActions', () => {
it('sets initiator correctly', () => {
PeerActions
.createPeer({
socket, user, initiator: 'user1', stream,
socket, user, initiator: user.id, stream,
})(dispatch, getState)
expect(instances.length).toBe(1)
@ -124,7 +124,7 @@ describe('PeerActions', () => {
const { list } = store.getState().messages
expect(list.length).toBeGreaterThan(0)
expect(list[list.length - 1]).toEqual({
userId: 'user2',
userId: user.id,
timestamp: jasmine.any(String),
image: undefined,
message: 'test',

View File

@ -173,7 +173,7 @@ export function createPeer (options: CreatePeerOptions) {
}
const peer = new Peer({
initiator: socket.id === initiator,
initiator: userId === initiator,
config: { iceServers },
// Allow the peer to receive video, even if it's not sending stream:
// https://github.com/feross/simple-peer/issues/95

View File

@ -9,6 +9,7 @@ import { createStore, Store, GetState } from '../store'
import { ClientSocket } from '../socket'
import { Dispatch } from 'redux'
import { MediaStream } from '../window'
import { SocketEvent } from '../../shared'
describe('SocketActions', () => {
const roomName = 'bla'
@ -29,22 +30,39 @@ describe('SocketActions', () => {
instances = (Peer as any).instances = []
})
const userA = {
socketId: 'socket-a',
userId: 'user-a',
}
const userId = userA.userId
const userB = {
socketId: 'socket-b',
userId: 'user-b',
}
const userC = {
socketId: 'socket-c',
userId: 'user-c',
}
describe('handshake', () => {
describe('users', () => {
beforeEach(() => {
SocketActions.handshake({ socket, roomName })(dispatch, getState)
SocketActions
.handshake({ socket, roomName, userId })(dispatch, getState)
const payload = {
users: [{ id: 'a' }, { id: 'b' }],
initiator: 'a',
users: [userA, userB],
initiator: userA.userId,
}
socket.emit('users', payload)
expect(instances.length).toBe(1)
})
it('adds a peer for each new user and destroys peers for missing', () => {
it('adds a peer for each new user and destroys missing peers', () => {
const payload = {
users: [{ id: 'a' }, { id: 'c' }],
initiator: 'c',
users: [userA, userC],
initiator: userC.userId,
}
socket.emit(constants.SOCKET_EVENT_USERS, payload)
@ -59,16 +77,17 @@ describe('SocketActions', () => {
let data: Peer.SignalData
beforeEach(() => {
data = {} as any
SocketActions.handshake({ socket, roomName })(dispatch, getState)
SocketActions
.handshake({ socket, roomName, userId })(dispatch, getState)
socket.emit('users', {
initiator: 'a',
users: [{ id: 'a' }, { id: 'b' }],
initiator: userA.userId,
users: [userA, userB],
})
})
it('should forward signal to peer', () => {
socket.emit('signal', {
userId: 'b',
userId: userB.userId,
signal: data,
})
@ -94,11 +113,12 @@ describe('SocketActions', () => {
let ready = false
socket.once('ready', () => { ready = true })
SocketActions.handshake({ socket, roomName })(dispatch, getState)
SocketActions
.handshake({ socket, roomName, userId })(dispatch, getState)
socket.emit('users', {
initiator: 'a',
users: [{ id: 'a' }, { id: 'b' }],
initiator: userA.userId,
users: [userA, userB],
})
expect(instances.length).toBe(1)
peer = instances[0]
@ -117,8 +137,8 @@ describe('SocketActions', () => {
it('emits socket signal with user id', done => {
const signal = { bla: 'bla' }
socket.once('signal', (payload: SocketActions.SignalOptions) => {
expect(payload.userId).toEqual('b')
socket.once('signal', (payload: SocketEvent['signal']) => {
expect(payload.userId).toEqual(userB.userId)
expect(payload.signal).toBe(signal)
done()
})
@ -139,8 +159,8 @@ describe('SocketActions', () => {
peer.emit(constants.PEER_EVENT_TRACK, stream.getTracks()[0], stream)
expect(store.getState().streams).toEqual({
b: {
userId: 'b',
[userB.userId]: {
userId: userB.userId,
streams: [{
stream,
type: undefined,
@ -159,8 +179,8 @@ describe('SocketActions', () => {
// test stream with two tracks
peer.emit(constants.PEER_EVENT_TRACK, track, stream)
expect(store.getState().streams).toEqual({
b: {
userId: 'b',
[userB.userId]: {
userId: userB.userId,
streams: [{
stream,
type: undefined,
@ -171,7 +191,7 @@ describe('SocketActions', () => {
})
it('removes stream & peer from store', () => {
expect(store.getState().peers).toEqual({ b: peer })
expect(store.getState().peers).toEqual({ [userB.userId]: peer })
peer.emit('close')
expect(store.getState().streams).toEqual({})
expect(store.getState().peers).toEqual({})

View File

@ -3,9 +3,9 @@ import * as PeerActions from '../actions/PeerActions'
import * as constants from '../constants'
import keyBy from 'lodash/keyBy'
import _debug from 'debug'
import { SignalData } from 'simple-peer'
import { Dispatch, GetState } from '../store'
import { ClientSocket } from '../socket'
import { SocketEvent } from '../../shared'
const debug = _debug('peercalls')
@ -15,24 +15,16 @@ export interface SocketHandlerOptions {
stream?: MediaStream
dispatch: Dispatch
getState: GetState
}
export interface SignalOptions {
signal: SignalData
userId: string
}
export interface UsersOptions {
initiator: string
users: Array<{ id: string }>
}
class SocketHandler {
socket: ClientSocket
roomName: string
stream?: MediaStream
dispatch: Dispatch
getState: GetState
userId: string
constructor (options: SocketHandlerOptions) {
this.socket = options.socket
@ -40,30 +32,36 @@ class SocketHandler {
this.stream = options.stream
this.dispatch = options.dispatch
this.getState = options.getState
this.userId = options.userId
}
handleSignal = ({ userId, signal }: SignalOptions) => {
handleSignal = ({ userId, signal }: SocketEvent['signal']) => {
const { getState } = this
const peer = getState().peers[userId]
// debug('socket signal, userId: %s, signal: %o', userId, signal);
if (!peer) return debug('user: %s, no peer found', userId)
peer.signal(signal)
}
handleUsers = ({ initiator, users }: UsersOptions) => {
handleUsers = ({ initiator, users }: SocketEvent['users']) => {
const { socket, stream, dispatch, getState } = this
debug('socket users: %o', users)
this.dispatch(NotifyActions.info('Connected users: {0}', users.length))
const { peers } = this.getState()
users
.filter(user => !peers[user.id] && user.id !== socket.id)
.filter(
user =>
user.userId && !peers[user.userId] && user.userId !== this.userId)
.forEach(user => PeerActions.createPeer({
socket,
user,
user: {
// users without id should be filtered out
id: user.userId!,
},
initiator,
stream,
})(dispatch, getState))
const newUsersMap = keyBy(users, 'id')
const newUsersMap = keyBy(users, 'userId')
Object.keys(peers)
.filter(id => !newUsersMap[id])
.forEach(id => peers[id].destroy())
@ -73,11 +71,12 @@ class SocketHandler {
export interface HandshakeOptions {
socket: ClientSocket
roomName: string
userId: string
stream?: MediaStream
}
export function handshake (options: HandshakeOptions) {
const { socket, roomName, stream } = options
const { socket, roomName, stream, userId } = options
return (dispatch: Dispatch, getState: GetState) => {
const handler = new SocketHandler({
@ -86,6 +85,7 @@ export function handshake (options: HandshakeOptions) {
stream,
dispatch,
getState,
userId,
})
// remove listeneres to make seocket reusable
@ -95,9 +95,12 @@ export function handshake (options: HandshakeOptions) {
socket.on(constants.SOCKET_EVENT_SIGNAL, handler.handleSignal)
socket.on(constants.SOCKET_EVENT_USERS, handler.handleUsers)
debug('socket.id: %s', socket.id)
debug('userId: %s', userId)
debug('emit ready for room: %s', roomName)
dispatch(NotifyActions.info('Ready for connections'))
socket.emit('ready', roomName)
socket.emit('ready', {
room: roomName,
userId,
})
}
}

View File

@ -36,13 +36,11 @@ export interface AppProps {
}
export interface AppState {
videos: Record<string, unknown>
chatVisible: boolean
}
export default class App extends React.PureComponent<AppProps, AppState> {
state: AppState = {
videos: {},
chatVisible: false,
}
handleShowChat = () => {
@ -92,8 +90,6 @@ export default class App extends React.PureComponent<AppProps, AppState> {
streams,
} = this.props
const { videos } = this.state
const chatVisibleClassName = classnames({
'chat-visible': this.state.chatVisible,
})
@ -140,7 +136,6 @@ export default class App extends React.PureComponent<AppProps, AppState> {
const key = localStreams.userId + '_' + i
return (
<Video
videos={videos}
key={key}
active={active === key}
onClick={toggleActive}
@ -168,7 +163,6 @@ export default class App extends React.PureComponent<AppProps, AppState> {
play={play}
stream={s}
userId={key}
videos={videos}
/>
)
})

View File

@ -25,7 +25,6 @@ describe('components/Video', () => {
render () {
return <Video
ref={this.ref}
videos={this.props.videos}
active={this.props.active}
stream={this.state.stream || this.props.stream}
onClick={this.props.onClick}
@ -38,7 +37,6 @@ describe('components/Video', () => {
}
let component: VideoWrapper
let videos: Record<string, unknown> = {}
let video: Video
let onClick: (userId: string) => void
let mediaStream: MediaStream
@ -57,7 +55,6 @@ describe('components/Video', () => {
}
async function render (args?: Partial<Flags>) {
const flags: Flags = Object.assign({}, defaultFlags, args)
videos = {}
onClick = jest.fn()
mediaStream = new MediaStream()
const div = document.createElement('div')
@ -70,7 +67,6 @@ describe('components/Video', () => {
ReactDOM.render(
<VideoWrapper
ref={instance => resolve(instance!)}
videos={videos}
active={flags.active}
stream={stream}
onClick={onClick}

View File

@ -4,7 +4,7 @@ import socket from '../socket'
import { StreamWithURL } from '../reducers/streams'
export interface VideoProps {
videos: Record<string, unknown>
// videos: Record<string, unknown>
onClick: (userId: string) => void
active: boolean
stream?: StreamWithURL
@ -49,7 +49,7 @@ export default class Video extends React.PureComponent<VideoProps> {
this.componentDidUpdate()
}
componentDidUpdate () {
const { videos, stream } = this.props
const { stream } = this.props
const video = this.videoRef.current!
const mediaStream = stream && stream.stream || null
const url = stream && stream.url
@ -60,7 +60,6 @@ export default class Video extends React.PureComponent<VideoProps> {
} else if (video.src !== url) {
video.src = url || ''
}
videos[socket.id] = video
}
render () {
const { active, mirrored, muted } = this.props

View File

@ -9,6 +9,7 @@ export const valueOf = (id: string) => {
export const baseUrl = valueOf('baseUrl')
export const callId = valueOf('callId')
export const userId = valueOf('userId')
export const iceServers = JSON.parse(valueOf('iceServers')!)
export const MediaStream = window.MediaStream

View File

@ -10,6 +10,7 @@ import { config } from './config'
import handleSocket from './socket'
import SocketIO from 'socket.io'
import request from 'supertest'
import { MemoryStore } from './store'
const io = SocketIO()
@ -61,7 +62,11 @@ describe('server/app', () => {
it('calls handleSocket with socket', () => {
const socket = { hi: 'me socket' }
io.emit('connection', socket)
expect((handleSocket as jest.Mock).mock.calls).toEqual([[ socket, io ]])
expect((handleSocket as jest.Mock).mock.calls).toEqual([[
socket,
io,
jasmine.any(MemoryStore),
]])
})
})

View File

@ -9,6 +9,7 @@ import SocketIO from 'socket.io'
import call from './routes/call'
import index from './routes/index'
import ejs from 'ejs'
import { MemoryStore } from './store'
const debug = _debug('peercalls')
const logRequest = _debug('peercalls:requests')
@ -48,6 +49,7 @@ router.use('/call', call)
router.use('/', index)
app.use(BASE_URL, router)
io.on('connection', socket => handleSocket(socket, io))
const store = new MemoryStore()
io.on('connection', socket => handleSocket(socket, io, store))
export default server

View File

@ -17,6 +17,7 @@ router.get('/:callId', (req, res) => {
const iceServers = turn.processServers(cfgIceServers)
res.render('call', {
callId: encodeURIComponent(req.params.callId),
userId: v4(),
iceServers,
})
})

View File

@ -1,7 +1,8 @@
import { EventEmitter } from 'events'
import { Socket } from 'socket.io'
import { TypedIO } from '../shared'
import { TypedIO, ServerSocket } from '../shared'
import handleSocket from './socket'
import { MemoryStore, Store } from './store'
describe('server/socket', () => {
type SocketMock = Socket & {
@ -32,7 +33,23 @@ describe('server/socket', () => {
})
})
const sockets = {
socket0: {
id: 'socket0',
userId: 'socket0_userid',
},
socket1: {
id: 'socket1',
userId: 'socket1_userid',
},
socket2: {
id: 'socket2',
userId: 'socket2_userid',
},
}
io.sockets = {
sockets: sockets as any,
adapter: {
rooms: {
room1: {
@ -43,9 +60,9 @@ describe('server/socket', () => {
} as any,
room3: {
sockets: {
'socket0': true,
'socket1': true,
'socket2': true,
socket0: true,
socket1: true,
socket2: true,
},
} as any,
},
@ -61,18 +78,24 @@ describe('server/socket', () => {
})
describe('socket events', () => {
beforeEach(() => handleSocket(socket, io))
let store: Store
beforeEach(() => {
store = new MemoryStore()
handleSocket(socket, io, store)
})
describe('signal', () => {
it('should broadcast signal to specific user', () => {
store.set('a', 'a-socket-id')
;(socket as ServerSocket) .userId = 'b'
const signal = { type: 'signal' }
socket.emit('signal', { userId: 'a', signal })
expect(io.to.mock.calls).toEqual([[ 'a' ]])
expect((io.to('a').emit as jest.Mock).mock.calls).toEqual([[
expect(io.to.mock.calls).toEqual([[ 'a-socket-id' ]])
expect((io.to('a-socket-id').emit as jest.Mock).mock.calls).toEqual([[
'signal', {
userId: 'socket0',
userId: 'b',
signal,
},
]])
@ -82,31 +105,43 @@ describe('server/socket', () => {
describe('ready', () => {
it('should call socket.leave if socket.room', () => {
socket.room = 'room1'
socket.emit('ready', 'room2')
socket.emit('ready', {
userId: 'socket0_userid',
room: 'room2',
})
expect(socket.leave.mock.calls).toEqual([[ 'room1' ]])
expect(socket.join.mock.calls).toEqual([[ 'room2' ]])
})
it('should call socket.join to room', () => {
socket.emit('ready', 'room3')
socket.emit('ready', {
userId: 'socket0_userid',
room: 'room3',
})
expect(socket.join.mock.calls).toEqual([[ 'room3' ]])
})
it('should emit users', () => {
socket.emit('ready', 'room3')
socket.emit('ready', {
userId: 'socket0_userid',
room: 'room3',
})
expect(io.to.mock.calls).toEqual([[ 'room3' ]])
expect((io.to('room3').emit as jest.Mock).mock.calls).toEqual([
[
'users', {
initiator: 'socket0',
initiator: 'socket0_userid',
users: [{
id: 'socket0',
socketId: 'socket0',
userId: 'socket0_userid',
}, {
id: 'socket1',
socketId: 'socket1',
userId: 'socket1_userid',
}, {
id: 'socket2',
socketId: 'socket2',
userId: 'socket2_userid',
}],
},
],

View File

@ -2,38 +2,59 @@
import _debug from 'debug'
import map from 'lodash/map'
import { ServerSocket, TypedIO } from '../shared'
import { Store } from './store'
const debug = _debug('peercalls:socket')
export default function handleSocket(socket: ServerSocket, io: TypedIO) {
socket.on('signal', payload => {
// debug('signal: %s, payload: %o', socket.id, payload)
io.to(payload.userId).emit('signal', {
userId: socket.id,
signal: payload.signal,
})
export default function handleSocket(
socket: ServerSocket,
io: TypedIO,
store: Store,
) {
socket.once('disconnect', () => {
if (socket.userId) {
store.remove(socket.userId)
}
})
socket.on('ready', roomName => {
debug('ready: %s, room: %s', socket.id, roomName)
socket.on('signal', payload => {
// debug('signal: %s, payload: %o', socket.userId, payload)
const socketId = store.get(payload.userId)
if (socketId) {
io.to(socketId).emit('signal', {
userId: socket.userId,
signal: payload.signal,
})
}
})
socket.on('ready', payload => {
const { userId, room } = payload
debug('ready: %s, room: %s', userId, room)
if (socket.room) socket.leave(socket.room)
socket.room = roomName
socket.join(roomName)
socket.room = roomName
socket.userId = userId
store.set(userId, socket.id)
socket.room = room
socket.join(room)
socket.room = room
const users = getUsers(roomName)
const users = getUsers(room)
debug('ready: %s, room: %s, users: %o', socket.id, roomName, users)
debug('ready: %s, room: %s, users: %o', room, users)
io.to(roomName).emit('users', {
initiator: socket.id,
io.to(room).emit('users', {
initiator: userId,
users,
})
})
function getUsers (roomName: string) {
return map(io.sockets.adapter.rooms[roomName].sockets, (_, id) => {
return { id }
function getUsers (room: string) {
return map(io.sockets.adapter.rooms[room].sockets, (_, socketId) => {
const userSocket = io.sockets.sockets[socketId] as ServerSocket
return {
socketId: socketId,
userId: userSocket.userId,
}
})
}

View File

@ -0,0 +1,2 @@
export * from './store'
export * from './memory'

View File

@ -0,0 +1,17 @@
import { Store } from './store'
export class MemoryStore implements Store {
store: Record<string, string> = {}
get(key: string): string | undefined {
return this.store[key]
}
set(key: string, value: string) {
this.store[key] = value
}
remove(key: string) {
delete this.store[key]
}
}

View File

@ -0,0 +1,5 @@
export interface Store {
set(key: string, value: string): void
get(key: string): string | undefined
remove(key: string): void
}

View File

@ -2,7 +2,13 @@ import { TypedEmitter, TypedEmitterKeys } from './TypedEmitter'
import { SignalData } from 'simple-peer'
export interface User {
id: string
socketId: string
userId?: string
}
export interface Ready {
room: string
userId: string
}
export interface SocketEvent {
@ -17,13 +23,13 @@ export interface SocketEvent {
}
connect: undefined
disconnect: undefined
ready: string
ready: Ready
}
export type ServerSocket =
Omit<SocketIO.Socket, TypedEmitterKeys> &
TypedEmitter<SocketEvent> &
{ room?: string }
{ userId?: string, room?: string }
export type TypedIO = SocketIO.Server & {
to(roomName: string): TypedEmitter<SocketEvent>

View File

@ -8,6 +8,7 @@
<input type="hidden" id="baseUrl" value="<%= baseUrl %>">
<input type="hidden" id="callId" value="<%= callId %>">
<input type="hidden" id="iceServers" value='<%- JSON.stringify(iceServers) %>'>
<input type="hidden" id="userId" value="<%= userId %>">
<div id="container"></div>
<script src="<%= baseUrl + '/static/index.js' %>"></script>