diff --git a/packages/tasq/src/LinkedList.test.ts b/packages/tasq/src/LinkedList.test.ts index b2b3ac5..1001015 100644 --- a/packages/tasq/src/LinkedList.test.ts +++ b/packages/tasq/src/LinkedList.test.ts @@ -10,16 +10,65 @@ describe('LinkedList', () => { }) }) - describe('push', () => { + describe('clear', () => { + it('clears the array and sets length to 0', () => { + const array = [0, 1, 2, 3, 4, 5] + const ll = LinkedList.fromArray(array) + expect(ll.length).toBe(6) + ll.clear() + expect(ll.length).toBe(0) + expect(ll.toArray()).toEqual([]) + }) + }) + describe('push', () => { + it('pushes a new element to the end', () => { + const ll = new LinkedList() + ll.push(0) + expect(ll.length).toBe(1) + ll.push(1) + expect(ll.length).toBe(2) + ll.push(2, 3) + expect(ll.length).toBe(4) + ll.push(4, 5, 6) + expect(ll.length).toBe(7) + expect(ll.toArray()).toEqual([0, 1, 2, 3, 4, 5, 6]) + }) }) describe('peek', () => { - + it('returns the next element', () => { + const ll = new LinkedList() + expect(ll.peek()).toEqual(undefined) + expect(ll.peek()).toEqual(undefined) + ll.push(1) + expect(ll.peek()).toEqual(1) + expect(ll.peek()).toEqual(1) + ll.push(2) + expect(ll.peek()).toEqual(1) + expect(ll.peek()).toEqual(1) + ll.clear() + expect(ll.peek()).toEqual(undefined) + }) }) - describe('pop', () => { - + describe('shift', () => { + it('returns the first element and shifts the linked list', () => { + const ll = LinkedList.fromArray([0, 1, 2, 3]) + expect(ll.length).toBe(4) + expect(ll.shift()).toEqual(0) + expect(ll.length).toBe(3) + expect(ll.shift()).toEqual(1) + expect(ll.length).toBe(2) + expect(ll.shift()).toEqual(2) + expect(ll.length).toBe(1) + expect(ll.shift()).toEqual(3) + expect(ll.length).toBe(0) + expect(ll.shift()).toEqual(undefined) + expect(ll.length).toBe(0) + expect(ll.shift()).toEqual(undefined) + expect(ll.length).toBe(0) + }) }) }) diff --git a/packages/tasq/src/LinkedList.ts b/packages/tasq/src/LinkedList.ts index 067e161..7bea504 100644 --- a/packages/tasq/src/LinkedList.ts +++ b/packages/tasq/src/LinkedList.ts @@ -1,6 +1,6 @@ export interface IQueue { push(...t: T[]): void - pop(): T | undefined + shift(): T | undefined peek(): T | undefined toArray(): T[] } @@ -10,6 +10,28 @@ interface INode { next?: INode } +export interface IIterator { + hasNext(): boolean + next(): T | undefined +} + +export class LinkedListIterator implements IIterator { + constructor(protected node: INode | undefined) { + } + hasNext() { + return this.node !== undefined + } + next(): T | undefined { + if (this.node === undefined) { + return undefined + } + const node = this.node + const {value} = node + this.node = node.next + return value + } +} + export class LinkedList implements IQueue { length = 0 @@ -34,22 +56,21 @@ export class LinkedList implements IQueue { return this.head && this.head.value } - pop(): T | undefined { + shift(): T | undefined { if (!this.length) { return undefined } - if (this.length === 1) { - const value = this.head!.value - this.head = this.tail = undefined - this.length = 0 - return value - } const head = this.head! this.head = head.next this.length-- return head.value } + clear() { + this.head = this.tail = undefined + this.length = 0 + } + toArray(): T[] { const array: T[] = [] for (let h = this.head; h !== undefined; h = h.next) { @@ -58,6 +79,10 @@ export class LinkedList implements IQueue { return array } + iterator(): LinkedListIterator { + return new LinkedListIterator(this.head) + } + static fromArray(t: T[]): LinkedList { const list = new LinkedList() list.push(...t) diff --git a/packages/tasq/src/TaskExecutor.test.ts b/packages/tasq/src/TaskExecutor.test.ts new file mode 100644 index 0000000..08b3b04 --- /dev/null +++ b/packages/tasq/src/TaskExecutor.test.ts @@ -0,0 +1,42 @@ +import {TaskExecutor} from './TaskExecutor' + +describe('TaskExecutor', () => { + + function delay(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)) + } + + describe('post', () => { + it('posts new tasks and executes asynchronously', async () => { + const results: number[] = [] + const te = new TaskExecutor(1, async task => { + await delay(task) + results.push(task) + }) + te.post(10) + te.post(5) + te.post(7) + await te.wait() + expect(results).toEqual([10, 5, 7]) + }) + it.only('executes tasks in different order', async () => { + const results: number[] = [] + const te = new TaskExecutor(2, async task => { + await delay(task) + results.push(task) + }) + te.post(100) // worker1 + te.post(50) // worker2 + te.post(85) // worker2 + te.post(10) // worker1 + await te.wait() + expect(results).toEqual([50, 100, 10, 85]) + }) + + }) + + describe('error handling', () => { + + }) + +}) diff --git a/packages/tasq/src/TaskExecutor.ts b/packages/tasq/src/TaskExecutor.ts index fcce84c..95e4c28 100644 --- a/packages/tasq/src/TaskExecutor.ts +++ b/packages/tasq/src/TaskExecutor.ts @@ -1,25 +1,80 @@ -export interface ITaskExecutor { - post(task: T) - start() - stop() +import {EventEmitter} from 'events' +import {LinkedList} from './LinkedList' + +export interface ITask { + execute(): Promise } -export class Queue { - pop() +interface ITaskEvents { + success: void + failure: Error } +export interface ITaskExecutor { + post(task: T): void + + wait(): Promise + + addListener( + event: E, listener: (value: ITaskEvents[E]) => void): void + removeListener( + event: E, listener: (value: ITaskEvents[E]) => void): void +} + +let counter = 0 + export class TaskExecutor implements ITaskExecutor { - protected queue: T[] = [] + protected taskQueue = new LinkedList() + protected workers: Set> = new Set() + protected events = new EventEmitter() + + constructor( + readonly n: number = 1, + readonly processTask: (task: T) => Promise, + ) { + } + + addListener( + event: E, listener: (value: ITaskEvents[E]) => void): void { + this.events.addListener(event, listener) + } + removeListener( + event: E, listener: (value: ITaskEvents[E]) => void): void { + this.events.removeListener(event, listener) + } post(task: T) { - this.queue.push(task) + this.taskQueue.push(task) + if (this.workers.size < this.n) { + const worker = this.startWorker() + } } - start() { - + protected async startWorker() { + counter++ + const promise = this._startWorker(counter) + this.workers.add(promise) + await promise + this.workers.delete(promise) } - stop() { + protected async _startWorker(id: number) { + let task = this.taskQueue.shift() + while (task !== undefined) { + try { + await this.processTask(task) + this.events.emit('success') + } catch (err) { + this.events.emit('failure', err) + } + task = this.taskQueue.shift() + } + } + async wait() { + const workers = Array.from(this.workers) + for (const worker of workers) { + await worker + } } }