From b84e4bbbbf2132dd7355a7d2b78bfc6de1cf4d5c Mon Sep 17 00:00:00 2001 From: Jerko Steiner Date: Sun, 1 Sep 2019 17:53:23 +0700 Subject: [PATCH] Refactor tasq to prepare for forked executors --- packages/tasq/src/Deferred.test.ts | 12 +++ packages/tasq/src/Deferred.ts | 23 ++++++ packages/tasq/src/Executor.ts | 36 +++++++++ packages/tasq/src/ITask.ts | 5 ++ packages/tasq/src/TaskExecutor.test.ts | 48 ----------- packages/tasq/src/TaskExecutor.ts | 80 ------------------- packages/tasq/src/TaskManager.test.ts | 73 +++++++++++++++++ packages/tasq/src/TaskManager.ts | 105 +++++++++++++++++++++++++ packages/tasq/src/Worker.ts | 30 +++++++ packages/tasq/src/index.ts | 1 + 10 files changed, 285 insertions(+), 128 deletions(-) create mode 100644 packages/tasq/src/Deferred.test.ts create mode 100644 packages/tasq/src/Deferred.ts create mode 100644 packages/tasq/src/Executor.ts create mode 100644 packages/tasq/src/ITask.ts delete mode 100644 packages/tasq/src/TaskExecutor.test.ts delete mode 100644 packages/tasq/src/TaskExecutor.ts create mode 100644 packages/tasq/src/TaskManager.test.ts create mode 100644 packages/tasq/src/TaskManager.ts create mode 100644 packages/tasq/src/Worker.ts diff --git a/packages/tasq/src/Deferred.test.ts b/packages/tasq/src/Deferred.test.ts new file mode 100644 index 0000000..ff3b187 --- /dev/null +++ b/packages/tasq/src/Deferred.test.ts @@ -0,0 +1,12 @@ +import {Deferred} from './Deferred' + +describe('Deferred', () => { + + it('allows promise to be resolved outside of the callback', async () => { + const d = new Deferred() + d.resolve(3) + const result = await d.promise + expect(result).toBe(3) + }) + +}) diff --git a/packages/tasq/src/Deferred.ts b/packages/tasq/src/Deferred.ts new file mode 100644 index 0000000..59b52dc --- /dev/null +++ b/packages/tasq/src/Deferred.ts @@ -0,0 +1,23 @@ +export interface IDeferred { + resolve(result: T | PromiseLike | undefined): void + reject(err: Error): void +} + +export class Deferred implements IDeferred { + readonly resolve: (result: T | PromiseLike | undefined) => void + readonly reject: (err: Error) => void + readonly promise: Promise + + constructor() { + let res: any + let rej: any + + this.promise = new Promise((resolve, reject) => { + res = resolve + rej = reject + }) + + this.resolve = res + this.reject = rej + } +} diff --git a/packages/tasq/src/Executor.ts b/packages/tasq/src/Executor.ts new file mode 100644 index 0000000..4488b34 --- /dev/null +++ b/packages/tasq/src/Executor.ts @@ -0,0 +1,36 @@ +import { ITask } from './ITask' +import cp from 'child_process' +import { LinkedList } from './LinkedList' + +export interface IExecutor { + execute(task: ITask): Promise +} + +export type ExecutorFactory = () => IExecutor + +export class PromiseExecutor implements IExecutor { + constructor(readonly execute: (task: ITask) => Promise) {} +} + +class SubprocessExecutor implements IExecutor { + process: cp.ChildProcess + + constructor( + protected sourceFile: string, protected taskQueue: LinkedList>, + ) { + this.process = cp.fork(sourceFile) + } + + async execute(task: ITask): Promise { + return new Promise((resolve, reject) => { + this.process.on('status_' + task.id, message => { + if (message.error) { + reject(message.error) + } else { + resolve() + } + }) + this.process!.send(task) + }) + } +} diff --git a/packages/tasq/src/ITask.ts b/packages/tasq/src/ITask.ts new file mode 100644 index 0000000..1885c5a --- /dev/null +++ b/packages/tasq/src/ITask.ts @@ -0,0 +1,5 @@ +export interface ITask { + id: number + definition: T +} + diff --git a/packages/tasq/src/TaskExecutor.test.ts b/packages/tasq/src/TaskExecutor.test.ts deleted file mode 100644 index 08b236e..0000000 --- a/packages/tasq/src/TaskExecutor.test.ts +++ /dev/null @@ -1,48 +0,0 @@ -import {TaskExecutor} from './TaskExecutor' - -describe('TaskExecutor', () => { - - function delay(ms: number): Promise { - return new Promise(resolve => setTimeout(resolve, ms)) - } - - describe('post', () => { - it('posts new tasks and executes asynchronously', async () => { - const results: number[] = [] - const te = new TaskExecutor(1, async task => { - await delay(task) - results.push(task) - }) - te.post(10) - te.post(5) - te.post(7) - await te.wait() - expect(results).toEqual([10, 5, 7]) - }) - it('executes tasks in different order', async () => { - const results: number[] = [] - const te = new TaskExecutor(2, async task => { - await delay(task) - results.push(task) - }) - te.post(100) // worker1 - te.post(50) // worker2 - te.post(85) // worker2 - te.post(10) // worker1 - await te.wait() - expect(results).toEqual([50, 100, 10, 85]) - }) - - }) - - describe('error handling', () => { - it('does not fail on error', async () => { - - }) - - it('triggers failure event on error', async () => { - - }) - }) - -}) diff --git a/packages/tasq/src/TaskExecutor.ts b/packages/tasq/src/TaskExecutor.ts deleted file mode 100644 index 98c82a1..0000000 --- a/packages/tasq/src/TaskExecutor.ts +++ /dev/null @@ -1,80 +0,0 @@ -import {EventEmitter} from 'events' -import {LinkedList} from './LinkedList' - -export interface ITask { - execute(): Promise -} - -interface ITaskEventHandler { - success: () => void - failure: (err: Error) => void -} - -export interface ITaskExecutor { - post(task: T): void - - wait(): Promise - - addListener( - event: E, listener: ITaskEventHandler[E]): void - removeListener( - event: E, listener: ITaskEventHandler[E]): void -} - -let counter = 0 - -export class TaskExecutor implements ITaskExecutor { - protected taskQueue = new LinkedList() - protected workers: Set> = new Set() - protected events = new EventEmitter() - - constructor( - readonly n: number = 1, - readonly processTask: (task: T) => Promise, - ) { - } - - addListener( - event: E, listener: ITaskEventHandler[E]): void { - this.events.addListener(event, listener) - } - removeListener( - event: E, listener: ITaskEventHandler[E]): void { - this.events.removeListener(event, listener) - } - - post(task: T) { - this.taskQueue.push(task) - if (this.workers.size < this.n) { - const worker = this.startWorker() - } - } - - protected async startWorker() { - counter++ - const promise = this._startWorker(counter) - this.workers.add(promise) - await promise - this.workers.delete(promise) - } - - protected async _startWorker(id: number) { - let task = this.taskQueue.shift() - while (task !== undefined) { - try { - await this.processTask(task) - this.events.emit('success') - } catch (err) { - this.events.emit('failure', err) - } - task = this.taskQueue.shift() - } - } - - async wait() { - const workers = Array.from(this.workers) - for (const worker of workers) { - await worker - } - } -} diff --git a/packages/tasq/src/TaskManager.test.ts b/packages/tasq/src/TaskManager.test.ts new file mode 100644 index 0000000..e7d5fda --- /dev/null +++ b/packages/tasq/src/TaskManager.test.ts @@ -0,0 +1,73 @@ +import {TaskManager} from './TaskManager' +import { PromiseExecutor } from './Executor' + +describe('TaskManager', () => { + + function delay(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)) + } + + describe('post', () => { + it('posts new tasks and executes asynchronously', async () => { + const results: number[] = [] + const te = new TaskManager( + 1, + () => new PromiseExecutor(async task => { + await delay(task.definition) + results.push(task.definition) + }), + ) + te.post(10) + te.post(5) + te.post(7) + await te.wait() + expect(results).toEqual([10, 5, 7]) + }) + it('executes tasks in different order', async () => { + const results: number[] = [] + const te = new TaskManager( + 2, + () => new PromiseExecutor(async task => { + await delay(task.definition) + results.push(task.definition) + }), + ) + te.post(100) // worker1 + te.post(50) // worker2 + te.post(85) // worker2 + te.post(10) // worker1 + await te.wait() + expect(results).toEqual([50, 100, 10, 85]) + }) + + it('returns promises when job posted', async () => { + const results: number[] = [] + const te = new TaskManager( + 2, + () => new PromiseExecutor(async task => { + await delay(task.definition) + results.push(task.definition) + }), + ) + await Promise.all([ + te.post(100), // worker1 + te.post(50), // worker2 + te.post(85), // worker2 + te.post(10), // worker1 + ]) + expect(results).toEqual([50, 100, 10, 85]) + }) + + }) + + describe('error handling', () => { + it('does not fail on error', async () => { + + }) + + it('triggers failure event on error', async () => { + + }) + }) + +}) diff --git a/packages/tasq/src/TaskManager.ts b/packages/tasq/src/TaskManager.ts new file mode 100644 index 0000000..f1a47ae --- /dev/null +++ b/packages/tasq/src/TaskManager.ts @@ -0,0 +1,105 @@ +import {EventEmitter} from 'events' +import {LinkedList} from './LinkedList' +import {Deferred} from './Deferred' +import { Worker } from './Worker' +import { ExecutorFactory } from './Executor' + +interface ITask { + id: number + definition: T +} + +interface ITaskEventHandler { + success: () => void + failure: (err: Error) => void +} + +export interface ITaskManager { + post(task: T): void + + wait(): Promise + + addListener( + event: E, listener: ITaskEventHandler[E]): void + removeListener( + event: E, listener: ITaskEventHandler[E]): void +} + +export class TaskManager implements ITaskManager { + protected taskQueue = new LinkedList>() + protected workers: Set> = new Set() + protected events = new EventEmitter() + + protected deferredTasks = new Map>() + + protected taskCount = 0 + + constructor( + readonly n: number = 1, + readonly createExecutor: ExecutorFactory, + ) { + } + + addListener( + event: E, listener: ITaskEventHandler[E]): void { + this.events.addListener(event, listener) + } + removeListener( + event: E, listener: ITaskEventHandler[E]): void { + this.events.removeListener(event, listener) + } + + async post(definition: T) { + const id = this.getNextTaskId() + this.taskQueue.push({ + id, + definition, + }) + + const deferred = new Deferred() + this.deferredTasks.set(id, deferred) + + if (this.workers.size < this.n) { + // deliberately do not use promise here + const worker = this.startWorker() + } + + return deferred.promise + } + + protected getNextTaskId() { + this.taskCount += 1 + return this.taskCount + } + + protected async startWorker() { + const promise = new Worker( + this.createExecutor(), + this.taskQueue, + (id, err) => { + const deferred = this.deferredTasks.get(id) + if (!deferred) { + throw new Error('No deferred found for task id:' + id) + // TODO this should not happen! + return + } + if (err) { + deferred.reject(err) + } else { + deferred.resolve() + } + }, + ) + .start() + this.workers.add(promise) + await promise + this.workers.delete(promise) + } + + async wait() { + const workers = Array.from(this.workers) + for (const worker of workers) { + await worker + } + } +} diff --git a/packages/tasq/src/Worker.ts b/packages/tasq/src/Worker.ts new file mode 100644 index 0000000..00a49fc --- /dev/null +++ b/packages/tasq/src/Worker.ts @@ -0,0 +1,30 @@ +import cp from 'child_process' +import { LinkedList } from './LinkedList' +import { IExecutor } from './Executor' +import { ITask } from './ITask' + +export interface IWorker { + start(): Promise +} + +export class Worker implements IWorker { + constructor( + protected executor: IExecutor, + protected taskQueue: LinkedList>, + protected callback: (id: number, err?: Error) => void, + ) { + } + + async start() { + let task = this.taskQueue.shift() + while (task !== undefined) { + try { + await this.executor.execute(task) + this.callback(task.id) + } catch (err) { + this.callback(task.id, err) + } + task = this.taskQueue.shift() + } + } +} diff --git a/packages/tasq/src/index.ts b/packages/tasq/src/index.ts index 3c52094..0cfcee5 100644 --- a/packages/tasq/src/index.ts +++ b/packages/tasq/src/index.ts @@ -1 +1,2 @@ export * from './debounce' +export * from './Deferred'