diff --git a/packages/tasq/src/Executor.ts b/packages/tasq/src/Executor.ts index 049f58b..7d34982 100644 --- a/packages/tasq/src/Executor.ts +++ b/packages/tasq/src/Executor.ts @@ -1,33 +1,29 @@ -import { ITask } from './ITask' +import { IRequest } from './ITask' import cp from 'child_process' import { LinkedList } from './LinkedList' export interface IExecutor { - execute(task: ITask): Promise - shutdown(): void + execute(task: IRequest): Promise } export type ExecutorFactory = () => IExecutor export class PromiseExecutor implements IExecutor { - constructor(readonly execute: (task: ITask) => Promise) {} - shutdown() { - // do nothing - } + constructor(readonly execute: (task: IRequest) => Promise) {} } class SubprocessExecutor implements IExecutor { process: cp.ChildProcess constructor( - protected sourceFile: string, protected taskQueue: LinkedList>, + protected sourceFile: string, protected taskQueue: LinkedList>, ) { this.process = cp.fork(sourceFile) } - async execute(task: ITask): Promise { + async execute(task: IRequest): Promise { return new Promise((resolve, reject) => { - this.process.on('status_' + task.id, message => { + this.process.once('response_' + task.id, message => { if (message.error) { reject(message.error) } else { diff --git a/packages/tasq/src/ITask.ts b/packages/tasq/src/ITask.ts index c60d904..d1e74be 100644 --- a/packages/tasq/src/ITask.ts +++ b/packages/tasq/src/ITask.ts @@ -1,9 +1,18 @@ -export interface ITask { - readonly id: number - readonly definition: T +export interface IRequest { + id: number + params: T } -export interface IResult { - readonly id: number - readonly result: T +export interface ISuccessMessage { + id: number + result: T + type: 'success' } + +export interface IErrorMessage { + id: number + error: Error + type: 'error' +} + +export type TResponse = ISuccessMessage | IErrorMessage diff --git a/packages/tasq/src/Messenger.ts b/packages/tasq/src/Messenger.ts new file mode 100644 index 0000000..8b5c11e --- /dev/null +++ b/packages/tasq/src/Messenger.ts @@ -0,0 +1,25 @@ +import {IExecutor} from './Executor' +import { IRequest, TResponse, IErrorMessage } from './ITask' + +export class Messenger { + constructor(readonly executor: IExecutor) { + if (!process.send) { + throw new Error('Messenger can only be used from a forked subprocess') + } + + process.on('message', async (request: IRequest) => { + try { + const result: R = await this.executor.execute(request) + const response: TResponse = {id: request.id, result, type: 'success'} + process.send!('response_' + request.id, response) + } catch (error) { + const response: IErrorMessage = {id: request.id, error, type: 'error'} + process.send!('response_' + request.id, response) + } + }) + } + + exit(code: number) { + process.exit(code) + } +} diff --git a/packages/tasq/src/TaskManager.test.ts b/packages/tasq/src/TaskManager.test.ts index d300c06..9915865 100644 --- a/packages/tasq/src/TaskManager.test.ts +++ b/packages/tasq/src/TaskManager.test.ts @@ -13,8 +13,8 @@ describe('TaskManager', () => { const te = new TaskManager( 1, () => new PromiseExecutor(async task => { - await delay(task.definition) - results.push(task.definition) + await delay(task.params) + results.push(task.params) }), ) te.post(10) @@ -28,8 +28,8 @@ describe('TaskManager', () => { const te = new TaskManager( 2, () => new PromiseExecutor(async task => { - await delay(task.definition) - results.push(task.definition) + await delay(task.params) + results.push(task.params) }), ) te.post(100) // worker1 @@ -45,8 +45,8 @@ describe('TaskManager', () => { const te = new TaskManager( 2, () => new PromiseExecutor(async task => { - await delay(task.definition) - results.push(task.definition) + await delay(task.params) + results.push(task.params) }), ) await Promise.all([ @@ -67,9 +67,9 @@ describe('TaskManager', () => { const te = new TaskManager( 2, () => new PromiseExecutor(async task => { - const {definition} = task - await delay(definition.delay) - return definition.a + definition.b + const {params} = task + await delay(params.delay) + return params.a + params.b }), ) }) @@ -91,8 +91,8 @@ describe('TaskManager', () => { it('does not fail on error', async () => { const tm = new TaskManager(2, () => new PromiseExecutor(async task => { - await delay(task.definition) - if (task.definition % 2 === 0) { + await delay(task.params) + if (task.params % 2 === 0) { throw new Error('Test error: ' + task.id) } })) diff --git a/packages/tasq/src/TaskManager.ts b/packages/tasq/src/TaskManager.ts index cf74af4..3ca92bb 100644 --- a/packages/tasq/src/TaskManager.ts +++ b/packages/tasq/src/TaskManager.ts @@ -3,11 +3,7 @@ import {LinkedList} from './LinkedList' import {Deferred} from './Deferred' import { Worker } from './Worker' import { ExecutorFactory } from './Executor' - -interface ITask { - id: number - definition: T -} +import { IRequest } from './ITask' interface ITaskEventHandler { success: () => void @@ -20,7 +16,7 @@ export interface ITaskManager { } export class TaskManager implements ITaskManager { - protected taskQueue = new LinkedList>() + protected taskQueue = new LinkedList>() protected workers: Set> = new Set() protected deferredTasks = new Map>() @@ -32,11 +28,11 @@ export class TaskManager implements ITaskManager { ) { } - async post(definition: T) { + async post(params: T) { const id = this.getNextTaskId() this.taskQueue.push({ id, - definition, + params, }) const deferred = new Deferred() @@ -59,16 +55,16 @@ export class TaskManager implements ITaskManager { const promise = new Worker( this.createExecutor(), this.taskQueue, - (err, result) => { - const deferred = this.deferredTasks.get(result.id) + response => { + const deferred = this.deferredTasks.get(response.id) if (!deferred) { - throw new Error('No deferred found for task id:' + result.id) + throw new Error('No deferred found for task id:' + response.id) return } - if (err) { - deferred.reject(err) + if (response.type === 'error') { + deferred.reject(response.error) } else { - deferred.resolve(result.result) + deferred.resolve(response.result) } }, ) diff --git a/packages/tasq/src/Worker.ts b/packages/tasq/src/Worker.ts index 675b2c1..83e7d9d 100644 --- a/packages/tasq/src/Worker.ts +++ b/packages/tasq/src/Worker.ts @@ -1,18 +1,18 @@ import cp from 'child_process' import { LinkedList } from './LinkedList' import { IExecutor } from './Executor' -import { IResult, ITask } from './ITask' +import { IRequest, TResponse } from './ITask' export interface IWorker { start(): Promise } -export type ICallback = (err: Error | undefined, result: IResult) => void +export type ICallback = (result: TResponse) => void export class Worker implements IWorker { constructor( protected executor: IExecutor, - protected taskQueue: LinkedList>, + protected taskQueue: LinkedList>, protected callback: ICallback, ) { } @@ -22,14 +22,16 @@ export class Worker implements IWorker { while (task !== undefined) { try { const result = await this.executor.execute(task) - this.callback(undefined, { + this.callback({ id: task.id, result, + type: 'success', }) } catch (err) { - this.callback(err, { + this.callback({ id: task.id, - result: {} as any, + error: err, + type: 'error', }) } task = this.taskQueue.shift() diff --git a/packages/tasq/src/fork.ts b/packages/tasq/src/fork.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/tasq/src/sample/fork-add.ts b/packages/tasq/src/sample/fork-add.ts index dcf147b..222244c 100644 --- a/packages/tasq/src/sample/fork-add.ts +++ b/packages/tasq/src/sample/fork-add.ts @@ -1,11 +1,14 @@ -import {ITask} from '../ITask' +import { Messenger } from '../Messenger' +import { IExecutor } from '../Executor' +import { IRequest } from '../ITask' -process.on('message', async (task: ITask<[number, number]>) => { - await new Promise(delay => { - delay(task.definition) - process.send!('status_' + task.id, { - id: task.id, - result: task.definition[0] + task.definition[1], +const executor = new (class implements IExecutor<[number, number], number> { + async execute(task: IRequest<[number, number]>) { + await new Promise(resolve => { + setTimeout(resolve, 1) }) - }) -}) + return task.params[0] + task.params[1] + } +})() + +export const messenger = new Messenger(executor) diff --git a/packages/tasq/src/sample/fork.ts b/packages/tasq/src/sample/fork.ts deleted file mode 100644 index 4d5d65e..0000000 --- a/packages/tasq/src/sample/fork.ts +++ /dev/null @@ -1,3 +0,0 @@ -process.on('message', async task => { - process.send('message', -})