From 5ec24dd3992afcc57fedb554de7e60449a3526b1 Mon Sep 17 00:00:00 2001 From: Jerko Steiner Date: Wed, 6 Apr 2016 20:44:10 -0400 Subject: [PATCH] Separate socket/peer event handlers --- Makefile | 2 +- src/client/peer/__tests__/handshake-test.js | 36 +++---- src/client/peer/handshake.js | 82 ++-------------- src/client/peer/peers.js | 101 ++++++++++++++++++++ src/server/socket.js | 2 +- 5 files changed, 131 insertions(+), 92 deletions(-) create mode 100644 src/client/peer/peers.js diff --git a/Makefile b/Makefile index 7599d50..5212c0d 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ lint: .PHONY: test test: - jest + jest --verbose .PHONY: testify testify: diff --git a/src/client/peer/__tests__/handshake-test.js b/src/client/peer/__tests__/handshake-test.js index 495c06f..2ee7247 100644 --- a/src/client/peer/__tests__/handshake-test.js +++ b/src/client/peer/__tests__/handshake-test.js @@ -1,31 +1,35 @@ jest.unmock('../handshake.js'); +jest.unmock('../peers.js'); jest.unmock('events'); jest.unmock('underscore'); +const EventEmitter = require('events').EventEmitter; +const Peer = require('../Peer.js'); const dispatcher = require('../../dispatcher/dispatcher.js'); const handshake = require('../handshake.js'); -const Peer = require('../Peer.js'); -const EventEmitter = require('events').EventEmitter; +const peers = require('../peers.js'); describe('handshake', () => { - let socket, peers; + let socket, peerInstances; beforeEach(() => { socket = new EventEmitter(); socket.id = 'a'; - peers = []; + peerInstances = []; Peer.init = jest.genMockFunction().mockImplementation(() => { let peer = new EventEmitter(); peer.destroy = jest.genMockFunction(); peer.signal = jest.genMockFunction(); - peers.push(peer); + peerInstances.push(peer); return peer; }); dispatcher.dispatch.mockClear(); }); + afterEach(() => peers.clear()); + describe('socket events', () => { describe('users', () => { @@ -39,7 +43,7 @@ describe('handshake', () => { initiator: '/#a', }; socket.emit('users', payload); - expect(peers.length).toBe(2); + expect(peerInstances.length).toBe(2); // when payload = { @@ -49,10 +53,10 @@ describe('handshake', () => { socket.emit('users', payload); // then - expect(peers.length).toBe(3); - expect(peers[0].destroy.mock.calls.length).toBe(0); - expect(peers[1].destroy.mock.calls.length).toBe(1); - expect(peers[2].destroy.mock.calls.length).toBe(0); + expect(peerInstances.length).toBe(3); + expect(peerInstances[0].destroy.mock.calls.length).toBe(0); + expect(peerInstances[1].destroy.mock.calls.length).toBe(1); + expect(peerInstances[2].destroy.mock.calls.length).toBe(0); }); }); @@ -74,8 +78,8 @@ describe('handshake', () => { data }); - expect(peers.length).toBe(1); - expect(peers[0].signal.mock.calls.length).toBe(1); + expect(peerInstances.length).toBe(1); + expect(peerInstances[0].signal.mock.calls.length).toBe(1); }); it('does nothing if no peer', () => { @@ -84,8 +88,8 @@ describe('handshake', () => { data }); - expect(peers.length).toBe(1); - expect(peers[0].signal.mock.calls.length).toBe(0); + expect(peerInstances.length).toBe(1); + expect(peerInstances[0].signal.mock.calls.length).toBe(0); }); }); @@ -105,8 +109,8 @@ describe('handshake', () => { users: [{ id: 'a' }], initiator: '/#a' }); - expect(peers.length).toBe(1); - peer = peers[0]; + expect(peerInstances.length).toBe(1); + peer = peerInstances[0]; expect(ready).toBeDefined(); }); diff --git a/src/client/peer/handshake.js b/src/client/peer/handshake.js index d49be8f..5a370a5 100644 --- a/src/client/peer/handshake.js +++ b/src/client/peer/handshake.js @@ -1,81 +1,17 @@ 'use strict'; -const Peer = require('./Peer.js'); -const debug = require('debug')('peer-calls:peer'); -const dispatcher = require('../dispatcher/dispatcher.js'); -const notify = require('../action/notify.js'); const _ = require('underscore'); +const debug = require('debug')('peer-calls:peer'); +const notify = require('../action/notify.js'); +const peers = require('./peers.js'); function init(socket, roomName, stream) { - let peers = {}; function createPeer(user, initiator) { - debug('create peer: %s', user.id); - notify.warn('Connecting to peer...'); - - let peer = peers[user.id] = Peer.init({ - initiator: '/#' + socket.id === initiator, - stream, - config: { - iceServers: [{ - url: 'stun:23.21.150.121', - urls: 'stun:23.21.150.121' - }, { - url: 'turn:numb.viagenie.ca', - credential: 'muazkh', - username: 'webrtc@live.com' - }] - } - }); - - peer.once('error', err => { - debug('peer: %s, error %s', user.id, err.stack); - notify.error('A peer connection error occurred'); - destroyPeer(user.id); - }); - - peer.on('signal', signal => { - debug('peer: %s, signal: %o', user.id, signal); - - let payload = { userId: user.id, signal }; - socket.emit('signal', payload); - }); - - peer.once('connect', () => { - debug('peer: %s, connect', user.id); - notify.warn('Peer connection established'); - dispatcher.dispatch({ type: 'play' }); - }); - - peer.on('stream', stream => { - debug('peer: %s, stream', user.id); - dispatcher.dispatch({ - type: 'add-stream', - userId: user.id, - stream - }); - }); - - peer.once('close', () => { - debug('peer: %s, close', user.id); - notify.error('Peer connection closed'); - dispatcher.dispatch({ - type: 'remove-stream', - userId: user.id - }); - delete peers[user.id]; - }); - } - - function destroyPeer(userId) { - debug('destroy peer: %s', userId); - let peer = peers[userId]; - if (!peer) return debug('peer: %s peer not found', userId); - peer.destroy(); - delete peers[userId]; + return peers.create({ socket, user, initiator, stream }); } socket.on('signal', payload => { - let peer = peers[payload.userId]; + let peer = peers.get(payload.userId); let signal = payload.signal; // debug('socket signal, userId: %s, signal: %o', payload.userId, signal); @@ -83,21 +19,19 @@ function init(socket, roomName, stream) { peer.signal(signal); }); - socket.on('users', payload => { let { initiator, users } = payload; debug('socket users: %o', users); notify.info('Connected users: {0}', users.length); users - .filter(user => !peers[user.id] && user.id !== '/#' + socket.id) + .filter(user => !peers.get(user.id) && user.id !== '/#' + socket.id) .forEach(user => createPeer(user, initiator)); let newUsersMap = _.indexBy(users, 'id'); - _.chain(peers) - .map((peer, id) => id) + peers.getIds() .filter(id => !newUsersMap[id]) - .each(destroyPeer); + .forEach(peers.destroy); }); debug('socket.id: %s', socket.id); diff --git a/src/client/peer/peers.js b/src/client/peer/peers.js new file mode 100644 index 0000000..6c59231 --- /dev/null +++ b/src/client/peer/peers.js @@ -0,0 +1,101 @@ +const _ = require('underscore'); +const Peer = require('./Peer.js'); +const debug = require('debug')('peer-calls:peer'); +const dispatcher = require('../dispatcher/dispatcher.js'); +const notify = require('../action/notify.js'); + +let peers = {}; + +/** + * @param {Socket} socket + * @param {User} user + * @param {String} user.id + * @param {Boolean} [initiator=false] + * @param {MediaStream} [stream] + */ +function create({ socket, user, initiator, stream }) { + debug('create peer: %s', user.id); + notify.warn('Connecting to peer...'); + + if (peers[user.id]) { + notify.info('Cleaning up old connection...'); + destroy(user.id); + } + + let peer = peers[user.id] = Peer.init({ + initiator: '/#' + socket.id === initiator, + stream, + config: { + iceServers: [{ + url: 'stun:23.21.150.121', + urls: 'stun:23.21.150.121' + }, { + url: 'turn:numb.viagenie.ca', + credential: 'muazkh', + username: 'webrtc@live.com' + }] + } + }); + + peer.once('error', err => { + debug('peer: %s, error %s', user.id, err.stack); + notify.error('A peer connection error occurred'); + destroy(user.id); + }); + + peer.on('signal', signal => { + debug('peer: %s, signal: %o', user.id, signal); + + let payload = { userId: user.id, signal }; + socket.emit('signal', payload); + }); + + peer.once('connect', () => { + debug('peer: %s, connect', user.id); + notify.warn('Peer connection established'); + dispatcher.dispatch({ type: 'play' }); + }); + + peer.on('stream', stream => { + debug('peer: %s, stream', user.id); + dispatcher.dispatch({ + type: 'add-stream', + userId: user.id, + stream + }); + }); + + peer.once('close', () => { + debug('peer: %s, close', user.id); + notify.error('Peer connection closed'); + dispatcher.dispatch({ + type: 'remove-stream', + userId: user.id + }); + delete peers[user.id]; + }); +} + +function get(userId) { + return peers[userId]; +} + +function getIds() { + return _.map(peers, (peer, id) => id); +} + +function clear() { + debug('clear'); + _.each(peers, (_, userId) => destroy(userId)); + peers = {}; +} + +function destroy(userId) { + debug('destroy peer: %s', userId); + let peer = peers[userId]; + if (!peer) return debug('peer: %s peer not found', userId); + peer.destroy(); + delete peers[userId]; +} + +module.exports = { create, get, getIds, destroy, clear }; diff --git a/src/server/socket.js b/src/server/socket.js index 63a54d9..4e45aff 100644 --- a/src/server/socket.js +++ b/src/server/socket.js @@ -5,7 +5,7 @@ const _ = require('underscore'); module.exports = function(socket, io) { socket.on('signal', payload => { - debug('signal: %s, payload: %o', socket.id, payload); + // debug('signal: %s, payload: %o', socket.id, payload); io.to(payload.userId).emit('signal', { userId: socket.id, signal: payload.signal