Refactor ITask into messages

This commit is contained in:
Jerko Steiner 2019-09-02 11:34:22 +07:00
parent d74faf7c0f
commit 8396323ea1
9 changed files with 87 additions and 59 deletions

View File

@ -1,33 +1,29 @@
import { ITask } from './ITask'
import { IRequest } from './ITask'
import cp from 'child_process'
import { LinkedList } from './LinkedList'
export interface IExecutor<T, R> {
execute(task: ITask<T>): Promise<R>
shutdown(): void
execute(task: IRequest<T>): Promise<R>
}
export type ExecutorFactory<T, R> = () => IExecutor<T, R>
export class PromiseExecutor<T, R> implements IExecutor<T, R> {
constructor(readonly execute: (task: ITask<T>) => Promise<R>) {}
shutdown() {
// do nothing
}
constructor(readonly execute: (task: IRequest<T>) => Promise<R>) {}
}
class SubprocessExecutor<T, R> implements IExecutor<T, R> {
process: cp.ChildProcess
constructor(
protected sourceFile: string, protected taskQueue: LinkedList<ITask<T>>,
protected sourceFile: string, protected taskQueue: LinkedList<IRequest<T>>,
) {
this.process = cp.fork(sourceFile)
}
async execute(task: ITask<T>): Promise<R> {
async execute(task: IRequest<T>): Promise<R> {
return new Promise((resolve, reject) => {
this.process.on('status_' + task.id, message => {
this.process.once('response_' + task.id, message => {
if (message.error) {
reject(message.error)
} else {

View File

@ -1,9 +1,18 @@
export interface ITask<T> {
readonly id: number
readonly definition: T
export interface IRequest<T> {
id: number
params: T
}
export interface IResult<T> {
readonly id: number
readonly result: T
export interface ISuccessMessage<T> {
id: number
result: T
type: 'success'
}
export interface IErrorMessage {
id: number
error: Error
type: 'error'
}
export type TResponse<T> = ISuccessMessage<T> | IErrorMessage

View File

@ -0,0 +1,25 @@
import {IExecutor} from './Executor'
import { IRequest, TResponse, IErrorMessage } from './ITask'
export class Messenger<T, R> {
constructor(readonly executor: IExecutor<T, R>) {
if (!process.send) {
throw new Error('Messenger can only be used from a forked subprocess')
}
process.on('message', async (request: IRequest<T>) => {
try {
const result: R = await this.executor.execute(request)
const response: TResponse<R> = {id: request.id, result, type: 'success'}
process.send!('response_' + request.id, response)
} catch (error) {
const response: IErrorMessage = {id: request.id, error, type: 'error'}
process.send!('response_' + request.id, response)
}
})
}
exit(code: number) {
process.exit(code)
}
}

View File

@ -13,8 +13,8 @@ describe('TaskManager', () => {
const te = new TaskManager<number, void>(
1,
() => new PromiseExecutor(async task => {
await delay(task.definition)
results.push(task.definition)
await delay(task.params)
results.push(task.params)
}),
)
te.post(10)
@ -28,8 +28,8 @@ describe('TaskManager', () => {
const te = new TaskManager<number, void>(
2,
() => new PromiseExecutor(async task => {
await delay(task.definition)
results.push(task.definition)
await delay(task.params)
results.push(task.params)
}),
)
te.post(100) // worker1
@ -45,8 +45,8 @@ describe('TaskManager', () => {
const te = new TaskManager<number, void>(
2,
() => new PromiseExecutor(async task => {
await delay(task.definition)
results.push(task.definition)
await delay(task.params)
results.push(task.params)
}),
)
await Promise.all([
@ -67,9 +67,9 @@ describe('TaskManager', () => {
const te = new TaskManager<IParams, number>(
2,
() => new PromiseExecutor(async task => {
const {definition} = task
await delay(definition.delay)
return definition.a + definition.b
const {params} = task
await delay(params.delay)
return params.a + params.b
}),
)
})
@ -91,8 +91,8 @@ describe('TaskManager', () => {
it('does not fail on error', async () => {
const tm = new TaskManager<number, void>(2,
() => new PromiseExecutor(async task => {
await delay(task.definition)
if (task.definition % 2 === 0) {
await delay(task.params)
if (task.params % 2 === 0) {
throw new Error('Test error: ' + task.id)
}
}))

View File

@ -3,11 +3,7 @@ import {LinkedList} from './LinkedList'
import {Deferred} from './Deferred'
import { Worker } from './Worker'
import { ExecutorFactory } from './Executor'
interface ITask<T> {
id: number
definition: T
}
import { IRequest } from './ITask'
interface ITaskEventHandler {
success: () => void
@ -20,7 +16,7 @@ export interface ITaskManager<T> {
}
export class TaskManager<T, R> implements ITaskManager<T> {
protected taskQueue = new LinkedList<ITask<T>>()
protected taskQueue = new LinkedList<IRequest<T>>()
protected workers: Set<Promise<void>> = new Set()
protected deferredTasks = new Map<number, Deferred<R>>()
@ -32,11 +28,11 @@ export class TaskManager<T, R> implements ITaskManager<T> {
) {
}
async post(definition: T) {
async post(params: T) {
const id = this.getNextTaskId()
this.taskQueue.push({
id,
definition,
params,
})
const deferred = new Deferred<R>()
@ -59,16 +55,16 @@ export class TaskManager<T, R> implements ITaskManager<T> {
const promise = new Worker(
this.createExecutor(),
this.taskQueue,
(err, result) => {
const deferred = this.deferredTasks.get(result.id)
response => {
const deferred = this.deferredTasks.get(response.id)
if (!deferred) {
throw new Error('No deferred found for task id:' + result.id)
throw new Error('No deferred found for task id:' + response.id)
return
}
if (err) {
deferred.reject(err)
if (response.type === 'error') {
deferred.reject(response.error)
} else {
deferred.resolve(result.result)
deferred.resolve(response.result)
}
},
)

View File

@ -1,18 +1,18 @@
import cp from 'child_process'
import { LinkedList } from './LinkedList'
import { IExecutor } from './Executor'
import { IResult, ITask } from './ITask'
import { IRequest, TResponse } from './ITask'
export interface IWorker<T> {
start(): Promise<void>
}
export type ICallback<R> = (err: Error | undefined, result: IResult<R>) => void
export type ICallback<R> = (result: TResponse<R>) => void
export class Worker<T, R> implements IWorker<T> {
constructor(
protected executor: IExecutor<T, R>,
protected taskQueue: LinkedList<ITask<T>>,
protected taskQueue: LinkedList<IRequest<T>>,
protected callback: ICallback<R>,
) {
}
@ -22,14 +22,16 @@ export class Worker<T, R> implements IWorker<T> {
while (task !== undefined) {
try {
const result = await this.executor.execute(task)
this.callback(undefined, {
this.callback({
id: task.id,
result,
type: 'success',
})
} catch (err) {
this.callback(err, {
this.callback({
id: task.id,
result: {} as any,
error: err,
type: 'error',
})
}
task = this.taskQueue.shift()

View File

@ -1,11 +1,14 @@
import {ITask} from '../ITask'
import { Messenger } from '../Messenger'
import { IExecutor } from '../Executor'
import { IRequest } from '../ITask'
process.on('message', async (task: ITask<[number, number]>) => {
await new Promise(delay => {
delay(task.definition)
process.send!('status_' + task.id, {
id: task.id,
result: task.definition[0] + task.definition[1],
const executor = new (class implements IExecutor<[number, number], number> {
async execute(task: IRequest<[number, number]>) {
await new Promise(resolve => {
setTimeout(resolve, 1)
})
})
})
return task.params[0] + task.params[1]
}
})()
export const messenger = new Messenger(executor)

View File

@ -1,3 +0,0 @@
process.on('message', async task => {
process.send('message',
})