From 368fa5102baf3274fee57c4e5f9e6b3f28a105a1 Mon Sep 17 00:00:00 2001 From: Jerko Steiner Date: Sat, 17 Jun 2017 22:54:29 -0400 Subject: [PATCH] Move peer stuff to actions --- src/client/__tests__/App-test.js | 9 +- src/client/actions/CallActions.js | 10 +- src/client/actions/PeerActions.js | 129 ++++++++++ src/client/actions/SocketActions.js | 64 +++++ src/client/actions/StreamActions.js | 4 +- .../actions/__tests__/CallActions-test.js | 9 +- .../actions/__tests__/PeerActions-test.js | 172 +++++++++++++ .../__tests__/SocketActions-test.js} | 85 +++---- src/client/components/App.js | 20 +- src/client/components/Input.js | 8 +- src/client/components/Video.js | 22 +- src/client/components/__tests__/Input-test.js | 14 +- src/client/constants.js | 33 ++- src/client/containers/App.js | 6 +- src/client/peer/__tests__/peers-test.js | 228 ------------------ src/client/peer/handshake.js | 47 ---- src/client/peer/peers.js | 120 --------- src/client/reducers/__tests__/active-test.js | 16 ++ src/client/reducers/__tests__/streams-test.js | 28 +-- src/client/reducers/active.js | 10 + src/client/reducers/index.js | 4 + src/client/reducers/peers.js | 21 ++ src/client/reducers/streams.js | 20 +- src/client/store.js | 6 +- 24 files changed, 544 insertions(+), 541 deletions(-) create mode 100644 src/client/actions/PeerActions.js create mode 100644 src/client/actions/SocketActions.js create mode 100644 src/client/actions/__tests__/PeerActions-test.js rename src/client/{peer/__tests__/handshake-test.js => actions/__tests__/SocketActions-test.js} (64%) delete mode 100644 src/client/peer/__tests__/peers-test.js delete mode 100644 src/client/peer/handshake.js delete mode 100644 src/client/peer/peers.js create mode 100644 src/client/reducers/__tests__/active-test.js create mode 100644 src/client/reducers/active.js create mode 100644 src/client/reducers/peers.js diff --git a/src/client/__tests__/App-test.js b/src/client/__tests__/App-test.js index 0a1fcd9..47ec622 100644 --- a/src/client/__tests__/App-test.js +++ b/src/client/__tests__/App-test.js @@ -46,11 +46,8 @@ describe('App', () => { describe('state', () => { let alert beforeEach(() => { - state.streams = state.streams.setIn(['all'], { - 'test': { - userId: 'test', - url: 'blob://' - } + state.streams = state.streams.merge({ + test: 'blob://' }) state.notifications = state.notifications.merge({ 'notification1': { @@ -87,7 +84,7 @@ describe('App', () => { const video = node.querySelector('video') TestUtils.Simulate.click(video) expect(store.getActions()).toEqual([{ - type: constants.STREAM_ACTIVATE, + type: constants.ACTIVE_SET, payload: { userId: 'test' } }]) }) diff --git a/src/client/actions/CallActions.js b/src/client/actions/CallActions.js index b588900..0aeb750 100644 --- a/src/client/actions/CallActions.js +++ b/src/client/actions/CallActions.js @@ -1,10 +1,10 @@ -import * as StreamActions from './StreamActions.js' import * as NotifyActions from './NotifyActions.js' +import * as SocketActions from './SocketActions.js' +import * as StreamActions from './StreamActions.js' import * as constants from '../constants.js' import Promise from 'bluebird' import callId from '../callId.js' import getUserMedia from '../window/getUserMedia.js' -import handshake from '../peer/handshake.js' import socket from '../socket.js' export const init = () => dispatch => { @@ -15,7 +15,11 @@ export const init = () => dispatch => { getCameraStream()(dispatch) ]) .spread((socket, stream) => { - handshake({ socket, callId, stream }) + dispatch(SocketActions.handshake({ + socket, + roomName: callId, + stream + })) }) }) } diff --git a/src/client/actions/PeerActions.js b/src/client/actions/PeerActions.js new file mode 100644 index 0000000..b4a62bf --- /dev/null +++ b/src/client/actions/PeerActions.js @@ -0,0 +1,129 @@ +import * as NotifyActions from '../actions/NotifyActions.js' +import * as StreamActions from '../actions/StreamActions.js' +import * as constants from '../constants.js' +import Peer from 'simple-peer' +import _ from 'underscore' +import _debug from 'debug' +import iceServers from '../iceServers.js' +import { play } from '../window/video.js' + +const debug = _debug('peercalls') + +class PeerHandler { + constructor ({ socket, user, stream, dispatch, getState }) { + this.socket = socket + this.user = user + this.stream = stream + this.dispatch = dispatch + this.getState = getState + } + handleError = err => { + const { dispatch, getState, user } = this + debug('peer: %s, error %s', user.id, err.stack) + dispatch(NotifyActions.error('A peer connection error occurred')) + const peer = getState().peers[user.id] + peer && peer.destroy() + dispatch(removePeer(user.id)) + } + handleSignal = signal => { + const { socket, user } = this + debug('peer: %s, signal: %o', user.id, signal) + + const payload = { userId: user.id, signal } + socket.emit('signal', payload) + } + handleConnect = () => { + const { dispatch, user } = this + debug('peer: %s, connect', user.id) + dispatch(NotifyActions.warning('Peer connection established')) + play() + } + handleStream = stream => { + const { user, dispatch } = this + debug('peer: %s, stream', user.id) + dispatch(StreamActions.addStream({ + userId: user.id, + stream + })) + } + handleData = object => { + const { dispatch, user } = this + object = JSON.parse(new window.TextDecoder('utf-8').decode(object)) + debug('peer: %s, message: %o', user.id, object) + const message = user.id + ': ' + object.message + dispatch(NotifyActions.info(message)) + } + handleClose = () => { + const { dispatch, user } = this + debug('peer: %s, close', user.id) + dispatch(NotifyActions.error('Peer connection closed')) + dispatch(StreamActions.removeStream(user.id)) + dispatch(removePeer(user.id)) + } +} + +/** + * @param {Object} options + * @param {Socket} options.socket + * @param {User} options.user + * @param {String} options.user.id + * @param {Boolean} [options.initiator=false] + * @param {MediaStream} [options.stream] + */ +export function createPeer ({ socket, user, initiator, stream }) { + return (dispatch, getState) => { + const userId = user.id + debug('create peer: %s, stream:', userId, stream) + dispatch(NotifyActions.warning('Connecting to peer...')) + + const oldPeer = getState().peers[userId] + if (oldPeer) { + dispatch(NotifyActions.info('Cleaning up old connection...')) + oldPeer.destroy() + dispatch(removePeer(userId)) + } + + const peer = new Peer({ + initiator: socket.id === initiator, + stream, + config: { iceServers } + }) + + const handler = new PeerHandler({ + socket, + user, + stream, + dispatch, + getState + }) + + peer.once(constants.PEER_EVENT_ERROR, handler.handleError) + peer.once(constants.PEER_EVENT_CONNECT, handler.handleConnect) + peer.once(constants.PEER_EVENT_CLOSE, handler.handleClose) + peer.on(constants.PEER_EVENT_SIGNAL, handler.handleSignal) + peer.on(constants.PEER_EVENT_STREAM, handler.handleStream) + peer.on(constants.PEER_EVENT_DATA, handler.handleData) + + dispatch(addPeer({ peer, userId })) + } +} + +export const addPeer = ({ peer, userId }) => ({ + type: constants.PEER_ADD, + payload: { peer, userId } +}) + +export const removePeer = userId => ({ + type: constants.PEER_REMOVE, + payload: { userId } +}) + +export const destroyPeers = () => ({ + type: constants.PEERS_DESTROY +}) + +export const sendMessage = message => (dispatch, getState) => { + message = JSON.stringify({ message }) + const { peers } = getState() + _.each(peers, peer => peer.send(message)) +} diff --git a/src/client/actions/SocketActions.js b/src/client/actions/SocketActions.js new file mode 100644 index 0000000..69c6429 --- /dev/null +++ b/src/client/actions/SocketActions.js @@ -0,0 +1,64 @@ +import * as NotifyActions from '../actions/NotifyActions.js' +import * as PeerActions from '../actions/PeerActions.js' +import * as constants from '../constants.js' +import _ from 'underscore' +import _debug from 'debug' + +const debug = _debug('peercalls') + +class SocketHandler { + constructor ({ socket, roomName, stream, dispatch, getState }) { + this.socket = socket + this.roomName = roomName + this.stream = stream + this.dispatch = dispatch + this.getState = getState + } + handleSignal = ({ userId, signal }) => { + const { getState } = this + const peer = getState().peers[userId] + // debug('socket signal, userId: %s, signal: %o', userId, signal); + if (!peer) return debug('user: %s, no peer found', userId) + peer.signal(signal) + } + handleUsers = ({ initiator, users }) => { + const { socket, stream, dispatch, getState } = this + debug('socket users: %o', users) + dispatch(NotifyActions.info('Connected users: {0}', users.length)) + const { peers } = getState() + + users + .filter(user => !peers[user.id] && user.id !== socket.id) + .forEach(user => dispatch(PeerActions.createPeer({ + socket, + user, + initiator, + stream + }))) + + let newUsersMap = _.indexBy(users, 'id') + _.keys(peers) + .filter(id => !newUsersMap[id]) + .forEach(id => peers[id].destroy()) + } +} + +export function handshake ({ socket, roomName, stream }) { + return (dispatch, getState) => { + const handler = new SocketHandler({ + socket, + roomName, + stream, + dispatch, + getState + }) + + socket.on(constants.SOCKET_EVENT_SIGNAL, handler.handleSignal) + socket.on(constants.SOCKET_EVENT_USERS, handler.handleUsers) + + debug('socket.id: %s', socket.id) + debug('emit ready for room: %s', roomName) + dispatch(NotifyActions.info('Ready for connections')) + socket.emit('ready', roomName) + } +} diff --git a/src/client/actions/StreamActions.js b/src/client/actions/StreamActions.js index 2499448..1f98afd 100644 --- a/src/client/actions/StreamActions.js +++ b/src/client/actions/StreamActions.js @@ -13,7 +13,7 @@ export const removeStream = userId => ({ payload: { userId } }) -export const activateStream = userId => ({ - type: constants.STREAM_ACTIVATE, +export const setActive = userId => ({ + type: constants.ACTIVE_SET, payload: { userId } }) diff --git a/src/client/actions/__tests__/CallActions-test.js b/src/client/actions/__tests__/CallActions-test.js index c3789b2..f081ca1 100644 --- a/src/client/actions/__tests__/CallActions-test.js +++ b/src/client/actions/__tests__/CallActions-test.js @@ -1,15 +1,15 @@ jest.mock('../../callId.js') jest.mock('../../iceServers.js') -jest.mock('../../peer/handshake.js') jest.mock('../../socket.js') jest.mock('../../window/getUserMedia.js') jest.mock('../../store.js') +jest.mock('../SocketActions.js') import * as CallActions from '../CallActions.js' +import * as SocketActions from '../SocketActions.js' import * as constants from '../../constants.js' import * as getUserMediaMock from '../../window/getUserMedia.js' import callId from '../../callId.js' -import handshake from '../../peer/handshake.js' import socket from '../../socket.js' import store from '../../store.js' @@ -20,6 +20,7 @@ describe('reducers/alerts', () => { beforeEach(() => { store.clearActions() getUserMediaMock.fail(false) + SocketActions.handshake.mockReturnValue(jest.fn()) }) afterEach(() => { @@ -43,9 +44,9 @@ describe('reducers/alerts', () => { } }]) promise.then(() => { - expect(handshake.mock.calls).toEqual([[{ + expect(SocketActions.handshake.mock.calls).toEqual([[{ socket, - callId, + roomName: callId, stream: getUserMediaMock.stream }]]) }) diff --git a/src/client/actions/__tests__/PeerActions-test.js b/src/client/actions/__tests__/PeerActions-test.js new file mode 100644 index 0000000..046d056 --- /dev/null +++ b/src/client/actions/__tests__/PeerActions-test.js @@ -0,0 +1,172 @@ +jest.mock('../../window/video.js') +jest.mock('../../callId.js') +jest.mock('../../iceServers.js') +jest.mock('simple-peer') + +import * as PeerActions from '../PeerActions.js' +import Peer from 'simple-peer' +import { EventEmitter } from 'events' +import { createStore } from '../../store.js' +import { play } from '../../window/video.js' + +describe('PeerActions', () => { + function createSocket () { + const socket = new EventEmitter() + socket.id = 'user1' + return socket + } + + let socket, stream, user, store + beforeEach(() => { + store = createStore() + + user = { id: 'user2' } + socket = createSocket() + Peer.instances = [] + Peer.mockClear() + play.mockClear() + stream = { stream: true } + }) + + describe('create', () => { + it('creates a new peer', () => { + store.dispatch( + PeerActions.createPeer({ socket, user, initiator: 'user2', stream }) + ) + + expect(Peer.instances.length).toBe(1) + expect(Peer.mock.calls.length).toBe(1) + expect(Peer.mock.calls[0][0].initiator).toBe(false) + expect(Peer.mock.calls[0][0].stream).toBe(stream) + }) + + it('sets initiator correctly', () => { + store.dispatch( + PeerActions.createPeer({ socket, user, initiator: 'user1', stream }) + ) + + expect(Peer.instances.length).toBe(1) + expect(Peer.mock.calls.length).toBe(1) + expect(Peer.mock.calls[0][0].initiator).toBe(true) + expect(Peer.mock.calls[0][0].stream).toBe(stream) + }) + + it('destroys old peer before creating new one', () => { + store.dispatch( + PeerActions.createPeer({ socket, user, initiator: 'user2', stream }) + ) + store.dispatch( + PeerActions.createPeer({ socket, user, initiator: 'user2', stream }) + ) + + expect(Peer.instances.length).toBe(2) + expect(Peer.mock.calls.length).toBe(2) + expect(Peer.instances[0].destroy.mock.calls.length).toBe(1) + expect(Peer.instances[1].destroy.mock.calls.length).toBe(0) + }) + }) + + describe('events', () => { + let peer + + beforeEach(() => { + store.dispatch( + PeerActions.createPeer({ socket, user, initiator: 'user1', stream }) + ) + peer = Peer.instances[0] + }) + + describe('connect', () => { + beforeEach(() => peer.emit('connect')) + + it('dispatches "play" action', () => { + expect(play.mock.calls.length).toBe(1) + }) + }) + + describe('data', () => { + + beforeEach(() => { + window.TextDecoder = class TextDecoder { + constructor (encoding) { + this.encoding = encoding + } + decode (object) { + return object.toString(this.encoding) + } + } + }) + + it('decodes a message', () => { + const message = 'test' + const object = JSON.stringify({ message }) + peer.emit('data', Buffer.from(object, 'utf-8')) + const { notifications } = store.getState() + const keys = Object.keys(notifications) + const n = notifications[keys[keys.length - 1]] + expect(n).toEqual({ + id: jasmine.any(String), + type: 'info', + message: `${user.id}: ${message}` + }) + }) + }) + }) + + describe('get', () => { + it('returns undefined when not found', () => { + const { peers } = store.getState() + expect(peers[user.id]).not.toBeDefined() + }) + + it('returns Peer instance when found', () => { + store.dispatch( + PeerActions.createPeer({ socket, user, initiator: 'user2', stream }) + ) + + const { peers } = store.getState() + expect(peers[user.id]).toBe(Peer.instances[0]) + }) + }) + + describe('destroyPeers', () => { + it('destroys all peers and removes them', () => { + store.dispatch(PeerActions.createPeer({ + socket, user: { id: 'user2' }, initiator: 'user2', stream + })) + store.dispatch(PeerActions.createPeer({ + socket, user: { id: 'user3' }, initiator: 'user3', stream + })) + + store.dispatch(PeerActions.destroyPeers()) + + expect(Peer.instances[0].destroy.mock.calls.length).toEqual(1) + expect(Peer.instances[1].destroy.mock.calls.length).toEqual(1) + + const { peers } = store.getState() + expect(Object.keys(peers)).toEqual([]) + }) + }) + + describe('sendMessage', () => { + + beforeEach(() => { + store.dispatch(PeerActions.createPeer({ + socket, user: { id: 'user2' }, initiator: 'user2', stream + })) + store.dispatch(PeerActions.createPeer({ + socket, user: { id: 'user3' }, initiator: 'user3', stream + })) + }) + + it('sends a message to all peers', () => { + store.dispatch(PeerActions.sendMessage('test')) + const { peers } = store.getState() + expect(peers['user2'].send.mock.calls) + .toEqual([[ '{"message":"test"}' ]]) + expect(peers['user3'].send.mock.calls) + .toEqual([[ '{"message":"test"}' ]]) + }) + + }) +}) diff --git a/src/client/peer/__tests__/handshake-test.js b/src/client/actions/__tests__/SocketActions-test.js similarity index 64% rename from src/client/peer/__tests__/handshake-test.js rename to src/client/actions/__tests__/SocketActions-test.js index 76d7a4d..dd4b574 100644 --- a/src/client/peer/__tests__/handshake-test.js +++ b/src/client/actions/__tests__/SocketActions-test.js @@ -1,46 +1,46 @@ jest.mock('simple-peer') -jest.mock('../../store.js') jest.mock('../../callId.js') jest.mock('../../iceServers.js') +jest.mock('../../window/createObjectURL.js') +import * as SocketActions from '../SocketActions.js' import * as constants from '../../constants.js' -import handshake from '../handshake.js' import Peer from 'simple-peer' -import peers from '../peers.js' -import store from '../../store.js' +import reducers from '../../reducers/index.js' import { EventEmitter } from 'events' +import { createStore } from '../../store.js' -describe('handshake', () => { - let socket +describe('SocketActions', () => { + const roomName = 'bla' + + let socket, store beforeEach(() => { socket = new EventEmitter() socket.id = 'a' + store = createStore() + Peer.instances = [] - store.clearActions() }) - afterEach(() => peers.clear()) - - describe('socket events', () => { + describe('handshake', () => { describe('users', () => { - it('add a peer for each new user and destroy peers for missing', () => { - handshake({ socket, roomName: 'bla' }) - - // given - let payload = { + beforeEach(() => { + store.dispatch(SocketActions.handshake({ socket, roomName })) + const payload = { users: [{ id: 'a' }, { id: 'b' }], initiator: 'a' } socket.emit('users', payload) expect(Peer.instances.length).toBe(1) + }) - // when - payload = { + it('adds a peer for each new user and destroys peers for missing', () => { + const payload = { users: [{ id: 'a' }, { id: 'c' }], initiator: 'c' } - socket.emit('users', payload) + socket.emit(constants.SOCKET_EVENT_USERS, payload) // then expect(Peer.instances.length).toBe(2) @@ -53,7 +53,7 @@ describe('handshake', () => { let data beforeEach(() => { data = {} - handshake({ socket, roomName: 'bla' }) + store.dispatch(SocketActions.handshake({ socket, roomName })) socket.emit('users', { initiator: 'a', users: [{ id: 'a' }, { id: 'b' }] @@ -88,7 +88,7 @@ describe('handshake', () => { let ready = false socket.once('ready', () => { ready = true }) - handshake({ socket, roomName: 'bla' }) + store.dispatch(SocketActions.handshake({ socket, roomName })) socket.emit('users', { initiator: 'a', @@ -102,7 +102,7 @@ describe('handshake', () => { describe('error', () => { it('destroys peer', () => { - peer.emit('error', new Error('bla')) + peer.emit(constants.PEER_EVENT_ERROR, new Error('bla')) expect(peer.destroy.mock.calls.length).toBe(1) }) }) @@ -123,38 +123,29 @@ describe('handshake', () => { describe('stream', () => { it('adds a stream to streamStore', () => { - store.clearActions() - let stream = {} - peer.emit('stream', stream) + const stream = {} + peer.emit(constants.PEER_EVENT_STREAM, stream) - expect(store.getActions()).toEqual([{ - type: constants.STREAM_ADD, - payload: { - stream, - userId: 'b' - } - }]) + expect(store.getState().streams).toEqual({ + b: jasmine.any(String) + }) }) }) describe('close', () => { - it('removes stream from streamStore', () => { - store.clearActions() - peer.emit('close') + beforeEach(() => { + const stream = {} + peer.emit(constants.PEER_EVENT_STREAM, stream) + expect(store.getState().streams).toEqual({ + b: jasmine.any(String) + }) + }) - expect(store.getActions()).toEqual([{ - type: constants.NOTIFY, - payload: { - id: jasmine.any(String), - message: 'Peer connection closed', - type: 'error' - } - }, { - type: constants.STREAM_REMOVE, - payload: { - userId: 'b' - } - }]) + it('removes stream & peer from store', () => { + expect(store.getState().peers).toEqual({ b: peer }) + peer.emit('close') + expect(store.getState().streams).toEqual({}) + expect(store.getState().peers).toEqual({}) }) }) }) diff --git a/src/client/components/App.js b/src/client/components/App.js index 4bb3cb2..33433ac 100644 --- a/src/client/components/App.js +++ b/src/client/components/App.js @@ -3,15 +3,15 @@ import Input from './Input.js' import Notifications, { NotificationPropTypes } from './Notifications.js' import PropTypes from 'prop-types' import React from 'react' -import Video, { StreamPropType } from './Video.js' +import Video from './Video.js' import _ from 'underscore' export default class App extends React.Component { static propTypes = { dismissAlert: PropTypes.func.isRequired, - streams: PropTypes.objectOf(StreamPropType).isRequired, + streams: PropTypes.objectOf(PropTypes.string).isRequired, alerts: PropTypes.arrayOf(AlertPropType).isRequired, - activate: PropTypes.func.isRequired, + setActive: PropTypes.func.isRequired, active: PropTypes.string, init: PropTypes.func.isRequired, notify: PropTypes.func.isRequired, @@ -23,19 +23,27 @@ export default class App extends React.Component { } render () { const { - active, activate, alerts, dismissAlert, notify, notifications, streams + active, + alerts, + dismissAlert, + notifications, + notify, + sendMessage, + setActive, + streams } = this.props return (
- +
{_.map(streams, (stream, userId) => (