Add initial impl of TaskExecutor
This commit is contained in:
parent
a15a24a2c1
commit
b5e0c408c2
@ -10,16 +10,65 @@ describe('LinkedList', () => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('push', () => {
|
describe('clear', () => {
|
||||||
|
it('clears the array and sets length to 0', () => {
|
||||||
|
const array = [0, 1, 2, 3, 4, 5]
|
||||||
|
const ll = LinkedList.fromArray(array)
|
||||||
|
expect(ll.length).toBe(6)
|
||||||
|
ll.clear()
|
||||||
|
expect(ll.length).toBe(0)
|
||||||
|
expect(ll.toArray()).toEqual([])
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('push', () => {
|
||||||
|
it('pushes a new element to the end', () => {
|
||||||
|
const ll = new LinkedList()
|
||||||
|
ll.push(0)
|
||||||
|
expect(ll.length).toBe(1)
|
||||||
|
ll.push(1)
|
||||||
|
expect(ll.length).toBe(2)
|
||||||
|
ll.push(2, 3)
|
||||||
|
expect(ll.length).toBe(4)
|
||||||
|
ll.push(4, 5, 6)
|
||||||
|
expect(ll.length).toBe(7)
|
||||||
|
expect(ll.toArray()).toEqual([0, 1, 2, 3, 4, 5, 6])
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('peek', () => {
|
describe('peek', () => {
|
||||||
|
it('returns the next element', () => {
|
||||||
|
const ll = new LinkedList()
|
||||||
|
expect(ll.peek()).toEqual(undefined)
|
||||||
|
expect(ll.peek()).toEqual(undefined)
|
||||||
|
ll.push(1)
|
||||||
|
expect(ll.peek()).toEqual(1)
|
||||||
|
expect(ll.peek()).toEqual(1)
|
||||||
|
ll.push(2)
|
||||||
|
expect(ll.peek()).toEqual(1)
|
||||||
|
expect(ll.peek()).toEqual(1)
|
||||||
|
ll.clear()
|
||||||
|
expect(ll.peek()).toEqual(undefined)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('pop', () => {
|
describe('shift', () => {
|
||||||
|
it('returns the first element and shifts the linked list', () => {
|
||||||
|
const ll = LinkedList.fromArray([0, 1, 2, 3])
|
||||||
|
expect(ll.length).toBe(4)
|
||||||
|
expect(ll.shift()).toEqual(0)
|
||||||
|
expect(ll.length).toBe(3)
|
||||||
|
expect(ll.shift()).toEqual(1)
|
||||||
|
expect(ll.length).toBe(2)
|
||||||
|
expect(ll.shift()).toEqual(2)
|
||||||
|
expect(ll.length).toBe(1)
|
||||||
|
expect(ll.shift()).toEqual(3)
|
||||||
|
expect(ll.length).toBe(0)
|
||||||
|
expect(ll.shift()).toEqual(undefined)
|
||||||
|
expect(ll.length).toBe(0)
|
||||||
|
expect(ll.shift()).toEqual(undefined)
|
||||||
|
expect(ll.length).toBe(0)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
export interface IQueue<T> {
|
export interface IQueue<T> {
|
||||||
push(...t: T[]): void
|
push(...t: T[]): void
|
||||||
pop(): T | undefined
|
shift(): T | undefined
|
||||||
peek(): T | undefined
|
peek(): T | undefined
|
||||||
toArray(): T[]
|
toArray(): T[]
|
||||||
}
|
}
|
||||||
@ -10,6 +10,28 @@ interface INode<T> {
|
|||||||
next?: INode<T>
|
next?: INode<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface IIterator<T> {
|
||||||
|
hasNext(): boolean
|
||||||
|
next(): T | undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
export class LinkedListIterator<T> implements IIterator<T> {
|
||||||
|
constructor(protected node: INode<T> | undefined) {
|
||||||
|
}
|
||||||
|
hasNext() {
|
||||||
|
return this.node !== undefined
|
||||||
|
}
|
||||||
|
next(): T | undefined {
|
||||||
|
if (this.node === undefined) {
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
const node = this.node
|
||||||
|
const {value} = node
|
||||||
|
this.node = node.next
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export class LinkedList<T> implements IQueue<T> {
|
export class LinkedList<T> implements IQueue<T> {
|
||||||
length = 0
|
length = 0
|
||||||
|
|
||||||
@ -34,22 +56,21 @@ export class LinkedList<T> implements IQueue<T> {
|
|||||||
return this.head && this.head.value
|
return this.head && this.head.value
|
||||||
}
|
}
|
||||||
|
|
||||||
pop(): T | undefined {
|
shift(): T | undefined {
|
||||||
if (!this.length) {
|
if (!this.length) {
|
||||||
return undefined
|
return undefined
|
||||||
}
|
}
|
||||||
if (this.length === 1) {
|
|
||||||
const value = this.head!.value
|
|
||||||
this.head = this.tail = undefined
|
|
||||||
this.length = 0
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
const head = this.head!
|
const head = this.head!
|
||||||
this.head = head.next
|
this.head = head.next
|
||||||
this.length--
|
this.length--
|
||||||
return head.value
|
return head.value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clear() {
|
||||||
|
this.head = this.tail = undefined
|
||||||
|
this.length = 0
|
||||||
|
}
|
||||||
|
|
||||||
toArray(): T[] {
|
toArray(): T[] {
|
||||||
const array: T[] = []
|
const array: T[] = []
|
||||||
for (let h = this.head; h !== undefined; h = h.next) {
|
for (let h = this.head; h !== undefined; h = h.next) {
|
||||||
@ -58,6 +79,10 @@ export class LinkedList<T> implements IQueue<T> {
|
|||||||
return array
|
return array
|
||||||
}
|
}
|
||||||
|
|
||||||
|
iterator(): LinkedListIterator<T> {
|
||||||
|
return new LinkedListIterator(this.head)
|
||||||
|
}
|
||||||
|
|
||||||
static fromArray<T>(t: T[]): LinkedList<T> {
|
static fromArray<T>(t: T[]): LinkedList<T> {
|
||||||
const list = new LinkedList<T>()
|
const list = new LinkedList<T>()
|
||||||
list.push(...t)
|
list.push(...t)
|
||||||
|
|||||||
42
packages/tasq/src/TaskExecutor.test.ts
Normal file
42
packages/tasq/src/TaskExecutor.test.ts
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
import {TaskExecutor} from './TaskExecutor'
|
||||||
|
|
||||||
|
describe('TaskExecutor', () => {
|
||||||
|
|
||||||
|
function delay(ms: number): Promise<void> {
|
||||||
|
return new Promise(resolve => setTimeout(resolve, ms))
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('post', () => {
|
||||||
|
it('posts new tasks and executes asynchronously', async () => {
|
||||||
|
const results: number[] = []
|
||||||
|
const te = new TaskExecutor<number>(1, async task => {
|
||||||
|
await delay(task)
|
||||||
|
results.push(task)
|
||||||
|
})
|
||||||
|
te.post(10)
|
||||||
|
te.post(5)
|
||||||
|
te.post(7)
|
||||||
|
await te.wait()
|
||||||
|
expect(results).toEqual([10, 5, 7])
|
||||||
|
})
|
||||||
|
it.only('executes tasks in different order', async () => {
|
||||||
|
const results: number[] = []
|
||||||
|
const te = new TaskExecutor<number>(2, async task => {
|
||||||
|
await delay(task)
|
||||||
|
results.push(task)
|
||||||
|
})
|
||||||
|
te.post(100) // worker1
|
||||||
|
te.post(50) // worker2
|
||||||
|
te.post(85) // worker2
|
||||||
|
te.post(10) // worker1
|
||||||
|
await te.wait()
|
||||||
|
expect(results).toEqual([50, 100, 10, 85])
|
||||||
|
})
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('error handling', () => {
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
})
|
||||||
@ -1,25 +1,80 @@
|
|||||||
export interface ITaskExecutor<T> {
|
import {EventEmitter} from 'events'
|
||||||
post(task: T)
|
import {LinkedList} from './LinkedList'
|
||||||
start()
|
|
||||||
stop()
|
export interface ITask<T> {
|
||||||
|
execute(): Promise<void>
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Queue<T> {
|
interface ITaskEvents {
|
||||||
pop()
|
success: void
|
||||||
|
failure: Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface ITaskExecutor<T> {
|
||||||
|
post(task: T): void
|
||||||
|
|
||||||
|
wait(): Promise<void>
|
||||||
|
|
||||||
|
addListener<E extends keyof ITaskEvents>(
|
||||||
|
event: E, listener: (value: ITaskEvents[E]) => void): void
|
||||||
|
removeListener<E extends keyof ITaskEvents>(
|
||||||
|
event: E, listener: (value: ITaskEvents[E]) => void): void
|
||||||
|
}
|
||||||
|
|
||||||
|
let counter = 0
|
||||||
|
|
||||||
export class TaskExecutor<T> implements ITaskExecutor<T> {
|
export class TaskExecutor<T> implements ITaskExecutor<T> {
|
||||||
protected queue: T[] = []
|
protected taskQueue = new LinkedList<T>()
|
||||||
|
protected workers: Set<Promise<void>> = new Set()
|
||||||
|
protected events = new EventEmitter()
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
readonly n: number = 1,
|
||||||
|
readonly processTask: (task: T) => Promise<void>,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
addListener<E extends keyof ITaskEvents>(
|
||||||
|
event: E, listener: (value: ITaskEvents[E]) => void): void {
|
||||||
|
this.events.addListener(event, listener)
|
||||||
|
}
|
||||||
|
removeListener<E extends keyof ITaskEvents>(
|
||||||
|
event: E, listener: (value: ITaskEvents[E]) => void): void {
|
||||||
|
this.events.removeListener(event, listener)
|
||||||
|
}
|
||||||
|
|
||||||
post(task: T) {
|
post(task: T) {
|
||||||
this.queue.push(task)
|
this.taskQueue.push(task)
|
||||||
|
if (this.workers.size < this.n) {
|
||||||
|
const worker = this.startWorker()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
start() {
|
protected async startWorker() {
|
||||||
|
counter++
|
||||||
|
const promise = this._startWorker(counter)
|
||||||
|
this.workers.add(promise)
|
||||||
|
await promise
|
||||||
|
this.workers.delete(promise)
|
||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
protected async _startWorker(id: number) {
|
||||||
|
let task = this.taskQueue.shift()
|
||||||
|
while (task !== undefined) {
|
||||||
|
try {
|
||||||
|
await this.processTask(task)
|
||||||
|
this.events.emit('success')
|
||||||
|
} catch (err) {
|
||||||
|
this.events.emit('failure', err)
|
||||||
|
}
|
||||||
|
task = this.taskQueue.shift()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async wait() {
|
||||||
|
const workers = Array.from(this.workers)
|
||||||
|
for (const worker of workers) {
|
||||||
|
await worker
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user