Add result to scheduled tasks

This commit is contained in:
Jerko Steiner 2019-09-01 18:41:23 +07:00
parent b84e4bbbbf
commit a22c59bec5
8 changed files with 86 additions and 46 deletions

View File

@ -2,17 +2,21 @@ import { ITask } from './ITask'
import cp from 'child_process'
import { LinkedList } from './LinkedList'
export interface IExecutor<T> {
execute(task: ITask<T>): Promise<void>
export interface IExecutor<T, R> {
execute(task: ITask<T>): Promise<R>
shutdown(): void
}
export type ExecutorFactory<T> = () => IExecutor<T>
export type ExecutorFactory<T, R> = () => IExecutor<T, R>
export class PromiseExecutor<T> implements IExecutor<T> {
constructor(readonly execute: (task: ITask<T>) => Promise<void>) {}
export class PromiseExecutor<T, R> implements IExecutor<T, R> {
constructor(readonly execute: (task: ITask<T>) => Promise<R>) {}
shutdown() {
// do nothing
}
}
class SubprocessExecutor<T> implements IExecutor<T> {
class SubprocessExecutor<T, R> implements IExecutor<T, R> {
process: cp.ChildProcess
constructor(
@ -21,16 +25,19 @@ class SubprocessExecutor<T> implements IExecutor<T> {
this.process = cp.fork(sourceFile)
}
async execute(task: ITask<T>): Promise<void> {
async execute(task: ITask<T>): Promise<R> {
return new Promise((resolve, reject) => {
this.process.on('status_' + task.id, message => {
if (message.error) {
reject(message.error)
} else {
resolve()
resolve(message.result)
}
})
this.process!.send(task)
})
}
shutdown() {
this.process.kill('SIGKILL')
}
}

View File

@ -1,5 +1,9 @@
export interface ITask<T> {
id: number
definition: T
readonly id: number
readonly definition: T
}
export interface IResult<T> {
readonly id: number
readonly result: T
}

View File

@ -60,13 +60,38 @@ describe('TaskManager', () => {
})
async function getError(p: Promise<unknown>): Promise<Error> {
let error: Error | undefined
try {
await p
} catch (err) {
error = err
}
expect(error).toBeDefined()
return error!
}
describe('error handling', () => {
it('does not fail on error', async () => {
const tm = new TaskManager<number>(2,
() => new PromiseExecutor(async task => {
await delay(task.definition)
if (task.definition % 2 === 0) {
throw new Error('Test error: ' + task.id)
}
}))
})
it('triggers failure event on error', async () => {
const results = await Promise.all([
tm.post(1),
getError(tm.post(2)),
tm.post(3),
getError(tm.post(4)),
])
expect(results[0]).toBe(undefined)
expect(results[1].message).toMatch(/test error/i)
expect(results[2]).toBe(undefined)
expect(results[3].message).toMatch(/test error/i)
})
})

View File

@ -16,39 +16,22 @@ interface ITaskEventHandler {
export interface ITaskManager<T> {
post(task: T): void
wait(): Promise<void>
addListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void
removeListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void
}
export class TaskManager<T> implements ITaskManager<T> {
export class TaskManager<T, R> implements ITaskManager<T> {
protected taskQueue = new LinkedList<ITask<T>>()
protected workers: Set<Promise<void>> = new Set()
protected events = new EventEmitter()
protected deferredTasks = new Map<number, Deferred<void>>()
protected deferredTasks = new Map<number, Deferred<R>>()
protected taskCount = 0
constructor(
readonly n: number = 1,
readonly createExecutor: ExecutorFactory<T>,
readonly createExecutor: ExecutorFactory<T, R>,
) {
}
addListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void {
this.events.addListener(event, listener)
}
removeListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void {
this.events.removeListener(event, listener)
}
async post(definition: T) {
const id = this.getNextTaskId()
this.taskQueue.push({
@ -56,7 +39,7 @@ export class TaskManager<T> implements ITaskManager<T> {
definition,
})
const deferred = new Deferred<void>()
const deferred = new Deferred<R>()
this.deferredTasks.set(id, deferred)
if (this.workers.size < this.n) {
@ -76,17 +59,16 @@ export class TaskManager<T> implements ITaskManager<T> {
const promise = new Worker(
this.createExecutor(),
this.taskQueue,
(id, err) => {
const deferred = this.deferredTasks.get(id)
(err, result) => {
const deferred = this.deferredTasks.get(result.id)
if (!deferred) {
throw new Error('No deferred found for task id:' + id)
// TODO this should not happen!
throw new Error('No deferred found for task id:' + result.id)
return
}
if (err) {
deferred.reject(err)
} else {
deferred.resolve()
deferred.resolve(result.result)
}
},
)

View File

@ -1,17 +1,19 @@
import cp from 'child_process'
import { LinkedList } from './LinkedList'
import { IExecutor } from './Executor'
import { ITask } from './ITask'
import { IResult, ITask } from './ITask'
export interface IWorker<T> {
start(): Promise<void>
}
export class Worker<T> implements IWorker<T> {
export type ICallback<R> = (err: Error | undefined, result: IResult<R>) => void
export class Worker<T, R> implements IWorker<T> {
constructor(
protected executor: IExecutor<T>,
protected executor: IExecutor<T, R>,
protected taskQueue: LinkedList<ITask<T>>,
protected callback: (id: number, err?: Error) => void,
protected callback: ICallback<R>,
) {
}
@ -19,10 +21,16 @@ export class Worker<T> implements IWorker<T> {
let task = this.taskQueue.shift()
while (task !== undefined) {
try {
await this.executor.execute(task)
this.callback(task.id)
const result = await this.executor.execute(task)
this.callback(undefined, {
id: task.id,
result,
})
} catch (err) {
this.callback(task.id, err)
this.callback(err, {
id: task.id,
result: {} as any,
})
}
task = this.taskQueue.shift()
}

View File

View File

@ -0,0 +1,11 @@
import {ITask} from '../ITask'
process.on('message', async (task: ITask<[number, number]>) => {
await new Promise(delay => {
delay(task.definition)
process.send!('status_' + task.id, {
id: task.id,
result: task.definition[0] + task.definition[1],
})
})
})

View File

@ -0,0 +1,3 @@
process.on('message', async task => {
process.send('message',
})