Separate socket/peer event handlers

This commit is contained in:
Jerko Steiner 2016-04-06 20:44:10 -04:00
parent a71fc2cfb5
commit 5ec24dd399
5 changed files with 131 additions and 92 deletions

View File

@ -24,7 +24,7 @@ lint:
.PHONY: test
test:
jest
jest --verbose
.PHONY: testify
testify:

View File

@ -1,31 +1,35 @@
jest.unmock('../handshake.js');
jest.unmock('../peers.js');
jest.unmock('events');
jest.unmock('underscore');
const EventEmitter = require('events').EventEmitter;
const Peer = require('../Peer.js');
const dispatcher = require('../../dispatcher/dispatcher.js');
const handshake = require('../handshake.js');
const Peer = require('../Peer.js');
const EventEmitter = require('events').EventEmitter;
const peers = require('../peers.js');
describe('handshake', () => {
let socket, peers;
let socket, peerInstances;
beforeEach(() => {
socket = new EventEmitter();
socket.id = 'a';
peers = [];
peerInstances = [];
Peer.init = jest.genMockFunction().mockImplementation(() => {
let peer = new EventEmitter();
peer.destroy = jest.genMockFunction();
peer.signal = jest.genMockFunction();
peers.push(peer);
peerInstances.push(peer);
return peer;
});
dispatcher.dispatch.mockClear();
});
afterEach(() => peers.clear());
describe('socket events', () => {
describe('users', () => {
@ -39,7 +43,7 @@ describe('handshake', () => {
initiator: '/#a',
};
socket.emit('users', payload);
expect(peers.length).toBe(2);
expect(peerInstances.length).toBe(2);
// when
payload = {
@ -49,10 +53,10 @@ describe('handshake', () => {
socket.emit('users', payload);
// then
expect(peers.length).toBe(3);
expect(peers[0].destroy.mock.calls.length).toBe(0);
expect(peers[1].destroy.mock.calls.length).toBe(1);
expect(peers[2].destroy.mock.calls.length).toBe(0);
expect(peerInstances.length).toBe(3);
expect(peerInstances[0].destroy.mock.calls.length).toBe(0);
expect(peerInstances[1].destroy.mock.calls.length).toBe(1);
expect(peerInstances[2].destroy.mock.calls.length).toBe(0);
});
});
@ -74,8 +78,8 @@ describe('handshake', () => {
data
});
expect(peers.length).toBe(1);
expect(peers[0].signal.mock.calls.length).toBe(1);
expect(peerInstances.length).toBe(1);
expect(peerInstances[0].signal.mock.calls.length).toBe(1);
});
it('does nothing if no peer', () => {
@ -84,8 +88,8 @@ describe('handshake', () => {
data
});
expect(peers.length).toBe(1);
expect(peers[0].signal.mock.calls.length).toBe(0);
expect(peerInstances.length).toBe(1);
expect(peerInstances[0].signal.mock.calls.length).toBe(0);
});
});
@ -105,8 +109,8 @@ describe('handshake', () => {
users: [{ id: 'a' }],
initiator: '/#a'
});
expect(peers.length).toBe(1);
peer = peers[0];
expect(peerInstances.length).toBe(1);
peer = peerInstances[0];
expect(ready).toBeDefined();
});

View File

@ -1,81 +1,17 @@
'use strict';
const Peer = require('./Peer.js');
const debug = require('debug')('peer-calls:peer');
const dispatcher = require('../dispatcher/dispatcher.js');
const notify = require('../action/notify.js');
const _ = require('underscore');
const debug = require('debug')('peer-calls:peer');
const notify = require('../action/notify.js');
const peers = require('./peers.js');
function init(socket, roomName, stream) {
let peers = {};
function createPeer(user, initiator) {
debug('create peer: %s', user.id);
notify.warn('Connecting to peer...');
let peer = peers[user.id] = Peer.init({
initiator: '/#' + socket.id === initiator,
stream,
config: {
iceServers: [{
url: 'stun:23.21.150.121',
urls: 'stun:23.21.150.121'
}, {
url: 'turn:numb.viagenie.ca',
credential: 'muazkh',
username: 'webrtc@live.com'
}]
}
});
peer.once('error', err => {
debug('peer: %s, error %s', user.id, err.stack);
notify.error('A peer connection error occurred');
destroyPeer(user.id);
});
peer.on('signal', signal => {
debug('peer: %s, signal: %o', user.id, signal);
let payload = { userId: user.id, signal };
socket.emit('signal', payload);
});
peer.once('connect', () => {
debug('peer: %s, connect', user.id);
notify.warn('Peer connection established');
dispatcher.dispatch({ type: 'play' });
});
peer.on('stream', stream => {
debug('peer: %s, stream', user.id);
dispatcher.dispatch({
type: 'add-stream',
userId: user.id,
stream
});
});
peer.once('close', () => {
debug('peer: %s, close', user.id);
notify.error('Peer connection closed');
dispatcher.dispatch({
type: 'remove-stream',
userId: user.id
});
delete peers[user.id];
});
}
function destroyPeer(userId) {
debug('destroy peer: %s', userId);
let peer = peers[userId];
if (!peer) return debug('peer: %s peer not found', userId);
peer.destroy();
delete peers[userId];
return peers.create({ socket, user, initiator, stream });
}
socket.on('signal', payload => {
let peer = peers[payload.userId];
let peer = peers.get(payload.userId);
let signal = payload.signal;
// debug('socket signal, userId: %s, signal: %o', payload.userId, signal);
@ -83,21 +19,19 @@ function init(socket, roomName, stream) {
peer.signal(signal);
});
socket.on('users', payload => {
let { initiator, users } = payload;
debug('socket users: %o', users);
notify.info('Connected users: {0}', users.length);
users
.filter(user => !peers[user.id] && user.id !== '/#' + socket.id)
.filter(user => !peers.get(user.id) && user.id !== '/#' + socket.id)
.forEach(user => createPeer(user, initiator));
let newUsersMap = _.indexBy(users, 'id');
_.chain(peers)
.map((peer, id) => id)
peers.getIds()
.filter(id => !newUsersMap[id])
.each(destroyPeer);
.forEach(peers.destroy);
});
debug('socket.id: %s', socket.id);

101
src/client/peer/peers.js Normal file
View File

@ -0,0 +1,101 @@
const _ = require('underscore');
const Peer = require('./Peer.js');
const debug = require('debug')('peer-calls:peer');
const dispatcher = require('../dispatcher/dispatcher.js');
const notify = require('../action/notify.js');
let peers = {};
/**
* @param {Socket} socket
* @param {User} user
* @param {String} user.id
* @param {Boolean} [initiator=false]
* @param {MediaStream} [stream]
*/
function create({ socket, user, initiator, stream }) {
debug('create peer: %s', user.id);
notify.warn('Connecting to peer...');
if (peers[user.id]) {
notify.info('Cleaning up old connection...');
destroy(user.id);
}
let peer = peers[user.id] = Peer.init({
initiator: '/#' + socket.id === initiator,
stream,
config: {
iceServers: [{
url: 'stun:23.21.150.121',
urls: 'stun:23.21.150.121'
}, {
url: 'turn:numb.viagenie.ca',
credential: 'muazkh',
username: 'webrtc@live.com'
}]
}
});
peer.once('error', err => {
debug('peer: %s, error %s', user.id, err.stack);
notify.error('A peer connection error occurred');
destroy(user.id);
});
peer.on('signal', signal => {
debug('peer: %s, signal: %o', user.id, signal);
let payload = { userId: user.id, signal };
socket.emit('signal', payload);
});
peer.once('connect', () => {
debug('peer: %s, connect', user.id);
notify.warn('Peer connection established');
dispatcher.dispatch({ type: 'play' });
});
peer.on('stream', stream => {
debug('peer: %s, stream', user.id);
dispatcher.dispatch({
type: 'add-stream',
userId: user.id,
stream
});
});
peer.once('close', () => {
debug('peer: %s, close', user.id);
notify.error('Peer connection closed');
dispatcher.dispatch({
type: 'remove-stream',
userId: user.id
});
delete peers[user.id];
});
}
function get(userId) {
return peers[userId];
}
function getIds() {
return _.map(peers, (peer, id) => id);
}
function clear() {
debug('clear');
_.each(peers, (_, userId) => destroy(userId));
peers = {};
}
function destroy(userId) {
debug('destroy peer: %s', userId);
let peer = peers[userId];
if (!peer) return debug('peer: %s peer not found', userId);
peer.destroy();
delete peers[userId];
}
module.exports = { create, get, getIds, destroy, clear };

View File

@ -5,7 +5,7 @@ const _ = require('underscore');
module.exports = function(socket, io) {
socket.on('signal', payload => {
debug('signal: %s, payload: %o', socket.id, payload);
// debug('signal: %s, payload: %o', socket.id, payload);
io.to(payload.userId).emit('signal', {
userId: socket.id,
signal: payload.signal