diff --git a/packages/tasq/src/Executor.ts b/packages/tasq/src/Executor.ts index 4488b34..049f58b 100644 --- a/packages/tasq/src/Executor.ts +++ b/packages/tasq/src/Executor.ts @@ -2,17 +2,21 @@ import { ITask } from './ITask' import cp from 'child_process' import { LinkedList } from './LinkedList' -export interface IExecutor { - execute(task: ITask): Promise +export interface IExecutor { + execute(task: ITask): Promise + shutdown(): void } -export type ExecutorFactory = () => IExecutor +export type ExecutorFactory = () => IExecutor -export class PromiseExecutor implements IExecutor { - constructor(readonly execute: (task: ITask) => Promise) {} +export class PromiseExecutor implements IExecutor { + constructor(readonly execute: (task: ITask) => Promise) {} + shutdown() { + // do nothing + } } -class SubprocessExecutor implements IExecutor { +class SubprocessExecutor implements IExecutor { process: cp.ChildProcess constructor( @@ -21,16 +25,19 @@ class SubprocessExecutor implements IExecutor { this.process = cp.fork(sourceFile) } - async execute(task: ITask): Promise { + async execute(task: ITask): Promise { return new Promise((resolve, reject) => { this.process.on('status_' + task.id, message => { if (message.error) { reject(message.error) } else { - resolve() + resolve(message.result) } }) this.process!.send(task) }) } + shutdown() { + this.process.kill('SIGKILL') + } } diff --git a/packages/tasq/src/ITask.ts b/packages/tasq/src/ITask.ts index 1885c5a..c60d904 100644 --- a/packages/tasq/src/ITask.ts +++ b/packages/tasq/src/ITask.ts @@ -1,5 +1,9 @@ export interface ITask { - id: number - definition: T + readonly id: number + readonly definition: T } +export interface IResult { + readonly id: number + readonly result: T +} diff --git a/packages/tasq/src/TaskManager.test.ts b/packages/tasq/src/TaskManager.test.ts index e7d5fda..a6e3901 100644 --- a/packages/tasq/src/TaskManager.test.ts +++ b/packages/tasq/src/TaskManager.test.ts @@ -60,13 +60,38 @@ describe('TaskManager', () => { }) + async function getError(p: Promise): Promise { + let error: Error | undefined + try { + await p + } catch (err) { + error = err + } + expect(error).toBeDefined() + return error! + } + describe('error handling', () => { it('does not fail on error', async () => { + const tm = new TaskManager(2, + () => new PromiseExecutor(async task => { + await delay(task.definition) + if (task.definition % 2 === 0) { + throw new Error('Test error: ' + task.id) + } + })) - }) - - it('triggers failure event on error', async () => { + const results = await Promise.all([ + tm.post(1), + getError(tm.post(2)), + tm.post(3), + getError(tm.post(4)), + ]) + expect(results[0]).toBe(undefined) + expect(results[1].message).toMatch(/test error/i) + expect(results[2]).toBe(undefined) + expect(results[3].message).toMatch(/test error/i) }) }) diff --git a/packages/tasq/src/TaskManager.ts b/packages/tasq/src/TaskManager.ts index f1a47ae..cf74af4 100644 --- a/packages/tasq/src/TaskManager.ts +++ b/packages/tasq/src/TaskManager.ts @@ -16,39 +16,22 @@ interface ITaskEventHandler { export interface ITaskManager { post(task: T): void - wait(): Promise - - addListener( - event: E, listener: ITaskEventHandler[E]): void - removeListener( - event: E, listener: ITaskEventHandler[E]): void } -export class TaskManager implements ITaskManager { +export class TaskManager implements ITaskManager { protected taskQueue = new LinkedList>() protected workers: Set> = new Set() - protected events = new EventEmitter() - - protected deferredTasks = new Map>() + protected deferredTasks = new Map>() protected taskCount = 0 constructor( readonly n: number = 1, - readonly createExecutor: ExecutorFactory, + readonly createExecutor: ExecutorFactory, ) { } - addListener( - event: E, listener: ITaskEventHandler[E]): void { - this.events.addListener(event, listener) - } - removeListener( - event: E, listener: ITaskEventHandler[E]): void { - this.events.removeListener(event, listener) - } - async post(definition: T) { const id = this.getNextTaskId() this.taskQueue.push({ @@ -56,7 +39,7 @@ export class TaskManager implements ITaskManager { definition, }) - const deferred = new Deferred() + const deferred = new Deferred() this.deferredTasks.set(id, deferred) if (this.workers.size < this.n) { @@ -76,17 +59,16 @@ export class TaskManager implements ITaskManager { const promise = new Worker( this.createExecutor(), this.taskQueue, - (id, err) => { - const deferred = this.deferredTasks.get(id) + (err, result) => { + const deferred = this.deferredTasks.get(result.id) if (!deferred) { - throw new Error('No deferred found for task id:' + id) - // TODO this should not happen! + throw new Error('No deferred found for task id:' + result.id) return } if (err) { deferred.reject(err) } else { - deferred.resolve() + deferred.resolve(result.result) } }, ) diff --git a/packages/tasq/src/Worker.ts b/packages/tasq/src/Worker.ts index 00a49fc..675b2c1 100644 --- a/packages/tasq/src/Worker.ts +++ b/packages/tasq/src/Worker.ts @@ -1,17 +1,19 @@ import cp from 'child_process' import { LinkedList } from './LinkedList' import { IExecutor } from './Executor' -import { ITask } from './ITask' +import { IResult, ITask } from './ITask' export interface IWorker { start(): Promise } -export class Worker implements IWorker { +export type ICallback = (err: Error | undefined, result: IResult) => void + +export class Worker implements IWorker { constructor( - protected executor: IExecutor, + protected executor: IExecutor, protected taskQueue: LinkedList>, - protected callback: (id: number, err?: Error) => void, + protected callback: ICallback, ) { } @@ -19,10 +21,16 @@ export class Worker implements IWorker { let task = this.taskQueue.shift() while (task !== undefined) { try { - await this.executor.execute(task) - this.callback(task.id) + const result = await this.executor.execute(task) + this.callback(undefined, { + id: task.id, + result, + }) } catch (err) { - this.callback(task.id, err) + this.callback(err, { + id: task.id, + result: {} as any, + }) } task = this.taskQueue.shift() } diff --git a/packages/tasq/src/fork.ts b/packages/tasq/src/fork.ts new file mode 100644 index 0000000..e69de29 diff --git a/packages/tasq/src/sample/fork-add.ts b/packages/tasq/src/sample/fork-add.ts new file mode 100644 index 0000000..dcf147b --- /dev/null +++ b/packages/tasq/src/sample/fork-add.ts @@ -0,0 +1,11 @@ +import {ITask} from '../ITask' + +process.on('message', async (task: ITask<[number, number]>) => { + await new Promise(delay => { + delay(task.definition) + process.send!('status_' + task.id, { + id: task.id, + result: task.definition[0] + task.definition[1], + }) + }) +}) diff --git a/packages/tasq/src/sample/fork.ts b/packages/tasq/src/sample/fork.ts new file mode 100644 index 0000000..4d5d65e --- /dev/null +++ b/packages/tasq/src/sample/fork.ts @@ -0,0 +1,3 @@ +process.on('message', async task => { + process.send('message', +})