From a15a24a2c1952fd92a2c8420f32e78f377776b7d Mon Sep 17 00:00:00 2001 From: Jerko Steiner Date: Fri, 19 Jul 2019 19:48:03 +0800 Subject: [PATCH] Add @rondo/tasq --- package.json | 3 +- packages/tasq/README.md | 97 ++++++++++++++++++++++++++++ packages/tasq/jest.config.js | 18 ++++++ packages/tasq/jest.setup.js | 0 packages/tasq/package.json | 14 ++++ packages/tasq/src/LinkedList.test.ts | 25 +++++++ packages/tasq/src/LinkedList.ts | 66 +++++++++++++++++++ packages/tasq/src/TaskExecutor.ts | 25 +++++++ packages/tasq/src/index.ts | 1 + packages/tasq/tsconfig.esm.json | 7 ++ packages/tasq/tsconfig.json | 9 +++ 11 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 packages/tasq/README.md create mode 100644 packages/tasq/jest.config.js create mode 100644 packages/tasq/jest.setup.js create mode 100644 packages/tasq/package.json create mode 100644 packages/tasq/src/LinkedList.test.ts create mode 100644 packages/tasq/src/LinkedList.ts create mode 100644 packages/tasq/src/TaskExecutor.ts create mode 100644 packages/tasq/src/index.ts create mode 100644 packages/tasq/tsconfig.esm.json create mode 100644 packages/tasq/tsconfig.json diff --git a/package.json b/package.json index 19402a5..76de44f 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,8 @@ "@rondo/comments-server": "file:packages/comments-server", "@rondo/comments-common": "file:packages/comments-common", "@rondo/comments-client": "file:packages/comments-client", - "@rondo/image-upload": "file:packages/image-upload" + "@rondo/image-upload": "file:packages/image-upload", + "@rondo/tasq": "file:packages/tasq" }, "devDependencies": { "@types/bcrypt": "^3.0.0", diff --git a/packages/tasq/README.md b/packages/tasq/README.md new file mode 100644 index 0000000..5f1efab --- /dev/null +++ b/packages/tasq/README.md @@ -0,0 +1,97 @@ +# TaskExecutor (table polling) + +```sql + create table job (id, status...); + create table executing_job(id, jobId UNIQUE, workerId); + + " TRY + insert into executing_job(jobId, workerId) values (?, ?); + " CATCH + " other worker has this job + return + + update job set status = 'executing' where id = ?; + + " TRY + executeJob(jobId); + " CATCH + update job set status = 'error' where id = ?; + return + " FINALLY + delete from executing_job where jobId = ?, workerId = ?; + + update job set status = 'success' where id = ?; +``` + + The +TaskExecutor regularly checks the jobs table for a new job to run. + +Pros: No need to fill the tasks after a restart as the jobs table can always be +queried. + +Cons: slow to react on new requests - we have to wait for the executor to query +the database every N minutes. + +# TaskExecutor (push method) + +Every server instance can have a TaskExecutor(N) with a queue of jobs. The +server pushes a new job to the tasks queue for every hook received. + +The TaskExecutor can execute N async jobs simultaneously. After a job is +completed, the taskexecutor checks if there is another job available in the +queue, if not it waits until a new job is pushed to the queue. + +On boot, the server queries the jobs table with any jobs that have not been +executed yet. The queue is filled with jobs from the table. + +If there are multiple nodes, the `executing_job` table will prevent the same +job to be executed twice at the same time. + +Pros: No need to poll a table manually, the jobs are simply pushed to the queue + +Cons: The `jobs` table could become too big... + +## Alternative + +The TaskExecutor could query for next available (old) job manually after a +period of inactivity (or after every Nth job handled). This seems like the best +solution. + +``` + ------ + |-----------------------------| / \ + |-------| -----------> | instance1 ---> TaskExecutor | -------> | | + incoming request | | |-----------------------------| | | table: job + ------------------> | proxy | | DB | + | | |-----------------------------| | | table executing_job + |-------| -----------> | instance2 ---> TaskExecutor | -------> | | + |-----------------------------| \ / + ------ +``` + +As the incoming request is received, the instanceN writes the request +information into the job table, then passes the Job, as well as the user +context to TaskExecutor. + +To isolate TaskExecutor from the rest of the server-side code (to make it +easier to be a part of a separate microservice altogether in the future), it +can notify the server instance as soon as it is idle to query for old jobs. + +For example: + +```typescript +interface TaskExecutor { + constructor(n: number) + + // EventEmitter events: + // - "idle", () => void + // - "success", (jobId: number) => void + // - "fail", (jobId: number) => void + on(event: string, listener: () => void) + removeListener(event: string, listener: () => void) + + post(job: Job): void + start(): void + stop(): void +} +``` diff --git a/packages/tasq/jest.config.js b/packages/tasq/jest.config.js new file mode 100644 index 0000000..3a4457a --- /dev/null +++ b/packages/tasq/jest.config.js @@ -0,0 +1,18 @@ +module.exports = { + roots: [ + '/src' + ], + transform: { + '^.+\\.tsx?$': 'ts-jest' + }, + testRegex: '(/__tests__/.*|\\.(test|spec))\\.tsx?$', + moduleFileExtensions: [ + 'ts', + 'tsx', + 'js', + 'jsx' + ], + setupFiles: ['/jest.setup.js'], + maxConcurrency: 1, + verbose: false +} diff --git a/packages/tasq/jest.setup.js b/packages/tasq/jest.setup.js new file mode 100644 index 0000000..e69de29 diff --git a/packages/tasq/package.json b/packages/tasq/package.json new file mode 100644 index 0000000..c1b95b1 --- /dev/null +++ b/packages/tasq/package.json @@ -0,0 +1,14 @@ +{ + "name": "@rondo/tasq", + "private": true, + "scripts": { + "test": "jest", + "lint": "tslint --project .", + "compile": "tsc", + "clean": "rm -rf lib/" + }, + "dependencies": {}, + "types": "lib/index.d.ts", + "devDependencies": {}, + "module": "lib/index.js" +} diff --git a/packages/tasq/src/LinkedList.test.ts b/packages/tasq/src/LinkedList.test.ts new file mode 100644 index 0000000..b2b3ac5 --- /dev/null +++ b/packages/tasq/src/LinkedList.test.ts @@ -0,0 +1,25 @@ +import {LinkedList} from './LinkedList' + +describe('LinkedList', () => { + + describe('static fromArray', () => { + it('creates a LinkedList from array', () => { + const array = [0, 1, 2, 3, 4, 5] + const ll = LinkedList.fromArray(array) + expect(ll.toArray()).toEqual(array) + }) + }) + + describe('push', () => { + + }) + + describe('peek', () => { + + }) + + describe('pop', () => { + + }) + +}) diff --git a/packages/tasq/src/LinkedList.ts b/packages/tasq/src/LinkedList.ts new file mode 100644 index 0000000..067e161 --- /dev/null +++ b/packages/tasq/src/LinkedList.ts @@ -0,0 +1,66 @@ +export interface IQueue { + push(...t: T[]): void + pop(): T | undefined + peek(): T | undefined + toArray(): T[] +} + +interface INode { + value: T + next?: INode +} + +export class LinkedList implements IQueue { + length = 0 + + protected head: INode | undefined + protected tail: INode | undefined + + push(...t: T[]) { + t.forEach(value => { + const node: INode = {value} + if (!this.length) { + this.head = this.tail = node + this.length = 1 + return + } + this.tail!.next = node + this.tail = node + this.length++ + }) + } + + peek(): T | undefined { + return this.head && this.head.value + } + + pop(): T | undefined { + if (!this.length) { + return undefined + } + if (this.length === 1) { + const value = this.head!.value + this.head = this.tail = undefined + this.length = 0 + return value + } + const head = this.head! + this.head = head.next + this.length-- + return head.value + } + + toArray(): T[] { + const array: T[] = [] + for (let h = this.head; h !== undefined; h = h.next) { + array.push(h.value) + } + return array + } + + static fromArray(t: T[]): LinkedList { + const list = new LinkedList() + list.push(...t) + return list + } +} diff --git a/packages/tasq/src/TaskExecutor.ts b/packages/tasq/src/TaskExecutor.ts new file mode 100644 index 0000000..fcce84c --- /dev/null +++ b/packages/tasq/src/TaskExecutor.ts @@ -0,0 +1,25 @@ +export interface ITaskExecutor { + post(task: T) + start() + stop() +} + +export class Queue { + pop() +} + +export class TaskExecutor implements ITaskExecutor { + protected queue: T[] = [] + + post(task: T) { + this.queue.push(task) + } + + start() { + + } + + stop() { + + } +} diff --git a/packages/tasq/src/index.ts b/packages/tasq/src/index.ts new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/packages/tasq/src/index.ts @@ -0,0 +1 @@ + diff --git a/packages/tasq/tsconfig.esm.json b/packages/tasq/tsconfig.esm.json new file mode 100644 index 0000000..915284d --- /dev/null +++ b/packages/tasq/tsconfig.esm.json @@ -0,0 +1,7 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "esm" + }, + "references": [] +} diff --git a/packages/tasq/tsconfig.json b/packages/tasq/tsconfig.json new file mode 100644 index 0000000..94e864b --- /dev/null +++ b/packages/tasq/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../tsconfig.common.json", + "compilerOptions": { + "outDir": "lib", + "rootDir": "src" + }, + "references": [ + ] +}