Refactor tasq to prepare for forked executors

This commit is contained in:
Jerko Steiner 2019-09-01 17:53:23 +07:00
parent 6a456eb6e8
commit b84e4bbbbf
10 changed files with 285 additions and 128 deletions

View File

@ -0,0 +1,12 @@
import {Deferred} from './Deferred'
describe('Deferred', () => {
it('allows promise to be resolved outside of the callback', async () => {
const d = new Deferred<number>()
d.resolve(3)
const result = await d.promise
expect(result).toBe(3)
})
})

View File

@ -0,0 +1,23 @@
export interface IDeferred<T> {
resolve(result: T | PromiseLike<T> | undefined): void
reject(err: Error): void
}
export class Deferred<T> implements IDeferred<T> {
readonly resolve: (result: T | PromiseLike<T> | undefined) => void
readonly reject: (err: Error) => void
readonly promise: Promise<T>
constructor() {
let res: any
let rej: any
this.promise = new Promise<T>((resolve, reject) => {
res = resolve
rej = reject
})
this.resolve = res
this.reject = rej
}
}

View File

@ -0,0 +1,36 @@
import { ITask } from './ITask'
import cp from 'child_process'
import { LinkedList } from './LinkedList'
export interface IExecutor<T> {
execute(task: ITask<T>): Promise<void>
}
export type ExecutorFactory<T> = () => IExecutor<T>
export class PromiseExecutor<T> implements IExecutor<T> {
constructor(readonly execute: (task: ITask<T>) => Promise<void>) {}
}
class SubprocessExecutor<T> implements IExecutor<T> {
process: cp.ChildProcess
constructor(
protected sourceFile: string, protected taskQueue: LinkedList<ITask<T>>,
) {
this.process = cp.fork(sourceFile)
}
async execute(task: ITask<T>): Promise<void> {
return new Promise((resolve, reject) => {
this.process.on('status_' + task.id, message => {
if (message.error) {
reject(message.error)
} else {
resolve()
}
})
this.process!.send(task)
})
}
}

View File

@ -0,0 +1,5 @@
export interface ITask<T> {
id: number
definition: T
}

View File

@ -1,48 +0,0 @@
import {TaskExecutor} from './TaskExecutor'
describe('TaskExecutor', () => {
function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}
describe('post', () => {
it('posts new tasks and executes asynchronously', async () => {
const results: number[] = []
const te = new TaskExecutor<number>(1, async task => {
await delay(task)
results.push(task)
})
te.post(10)
te.post(5)
te.post(7)
await te.wait()
expect(results).toEqual([10, 5, 7])
})
it('executes tasks in different order', async () => {
const results: number[] = []
const te = new TaskExecutor<number>(2, async task => {
await delay(task)
results.push(task)
})
te.post(100) // worker1
te.post(50) // worker2
te.post(85) // worker2
te.post(10) // worker1
await te.wait()
expect(results).toEqual([50, 100, 10, 85])
})
})
describe('error handling', () => {
it('does not fail on error', async () => {
})
it('triggers failure event on error', async () => {
})
})
})

View File

@ -1,80 +0,0 @@
import {EventEmitter} from 'events'
import {LinkedList} from './LinkedList'
export interface ITask<T> {
execute(): Promise<void>
}
interface ITaskEventHandler {
success: () => void
failure: (err: Error) => void
}
export interface ITaskExecutor<T> {
post(task: T): void
wait(): Promise<void>
addListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void
removeListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void
}
let counter = 0
export class TaskExecutor<T> implements ITaskExecutor<T> {
protected taskQueue = new LinkedList<T>()
protected workers: Set<Promise<void>> = new Set()
protected events = new EventEmitter()
constructor(
readonly n: number = 1,
readonly processTask: (task: T) => Promise<void>,
) {
}
addListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void {
this.events.addListener(event, listener)
}
removeListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void {
this.events.removeListener(event, listener)
}
post(task: T) {
this.taskQueue.push(task)
if (this.workers.size < this.n) {
const worker = this.startWorker()
}
}
protected async startWorker() {
counter++
const promise = this._startWorker(counter)
this.workers.add(promise)
await promise
this.workers.delete(promise)
}
protected async _startWorker(id: number) {
let task = this.taskQueue.shift()
while (task !== undefined) {
try {
await this.processTask(task)
this.events.emit('success')
} catch (err) {
this.events.emit('failure', err)
}
task = this.taskQueue.shift()
}
}
async wait() {
const workers = Array.from(this.workers)
for (const worker of workers) {
await worker
}
}
}

View File

@ -0,0 +1,73 @@
import {TaskManager} from './TaskManager'
import { PromiseExecutor } from './Executor'
describe('TaskManager', () => {
function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}
describe('post', () => {
it('posts new tasks and executes asynchronously', async () => {
const results: number[] = []
const te = new TaskManager<number>(
1,
() => new PromiseExecutor(async task => {
await delay(task.definition)
results.push(task.definition)
}),
)
te.post(10)
te.post(5)
te.post(7)
await te.wait()
expect(results).toEqual([10, 5, 7])
})
it('executes tasks in different order', async () => {
const results: number[] = []
const te = new TaskManager<number>(
2,
() => new PromiseExecutor(async task => {
await delay(task.definition)
results.push(task.definition)
}),
)
te.post(100) // worker1
te.post(50) // worker2
te.post(85) // worker2
te.post(10) // worker1
await te.wait()
expect(results).toEqual([50, 100, 10, 85])
})
it('returns promises when job posted', async () => {
const results: number[] = []
const te = new TaskManager<number>(
2,
() => new PromiseExecutor(async task => {
await delay(task.definition)
results.push(task.definition)
}),
)
await Promise.all([
te.post(100), // worker1
te.post(50), // worker2
te.post(85), // worker2
te.post(10), // worker1
])
expect(results).toEqual([50, 100, 10, 85])
})
})
describe('error handling', () => {
it('does not fail on error', async () => {
})
it('triggers failure event on error', async () => {
})
})
})

View File

@ -0,0 +1,105 @@
import {EventEmitter} from 'events'
import {LinkedList} from './LinkedList'
import {Deferred} from './Deferred'
import { Worker } from './Worker'
import { ExecutorFactory } from './Executor'
interface ITask<T> {
id: number
definition: T
}
interface ITaskEventHandler {
success: () => void
failure: (err: Error) => void
}
export interface ITaskManager<T> {
post(task: T): void
wait(): Promise<void>
addListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void
removeListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void
}
export class TaskManager<T> implements ITaskManager<T> {
protected taskQueue = new LinkedList<ITask<T>>()
protected workers: Set<Promise<void>> = new Set()
protected events = new EventEmitter()
protected deferredTasks = new Map<number, Deferred<void>>()
protected taskCount = 0
constructor(
readonly n: number = 1,
readonly createExecutor: ExecutorFactory<T>,
) {
}
addListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void {
this.events.addListener(event, listener)
}
removeListener<E extends keyof ITaskEventHandler>(
event: E, listener: ITaskEventHandler[E]): void {
this.events.removeListener(event, listener)
}
async post(definition: T) {
const id = this.getNextTaskId()
this.taskQueue.push({
id,
definition,
})
const deferred = new Deferred<void>()
this.deferredTasks.set(id, deferred)
if (this.workers.size < this.n) {
// deliberately do not use promise here
const worker = this.startWorker()
}
return deferred.promise
}
protected getNextTaskId() {
this.taskCount += 1
return this.taskCount
}
protected async startWorker() {
const promise = new Worker(
this.createExecutor(),
this.taskQueue,
(id, err) => {
const deferred = this.deferredTasks.get(id)
if (!deferred) {
throw new Error('No deferred found for task id:' + id)
// TODO this should not happen!
return
}
if (err) {
deferred.reject(err)
} else {
deferred.resolve()
}
},
)
.start()
this.workers.add(promise)
await promise
this.workers.delete(promise)
}
async wait() {
const workers = Array.from(this.workers)
for (const worker of workers) {
await worker
}
}
}

View File

@ -0,0 +1,30 @@
import cp from 'child_process'
import { LinkedList } from './LinkedList'
import { IExecutor } from './Executor'
import { ITask } from './ITask'
export interface IWorker<T> {
start(): Promise<void>
}
export class Worker<T> implements IWorker<T> {
constructor(
protected executor: IExecutor<T>,
protected taskQueue: LinkedList<ITask<T>>,
protected callback: (id: number, err?: Error) => void,
) {
}
async start() {
let task = this.taskQueue.shift()
while (task !== undefined) {
try {
await this.executor.execute(task)
this.callback(task.id)
} catch (err) {
this.callback(task.id, err)
}
task = this.taskQueue.shift()
}
}
}

View File

@ -1 +1,2 @@
export * from './debounce'
export * from './Deferred'