Add @rondo/tasq

This commit is contained in:
Jerko Steiner 2019-07-19 19:48:03 +08:00
parent 98bceca746
commit a15a24a2c1
11 changed files with 264 additions and 1 deletions

View File

@ -6,7 +6,8 @@
"@rondo/comments-server": "file:packages/comments-server",
"@rondo/comments-common": "file:packages/comments-common",
"@rondo/comments-client": "file:packages/comments-client",
"@rondo/image-upload": "file:packages/image-upload"
"@rondo/image-upload": "file:packages/image-upload",
"@rondo/tasq": "file:packages/tasq"
},
"devDependencies": {
"@types/bcrypt": "^3.0.0",

97
packages/tasq/README.md Normal file
View File

@ -0,0 +1,97 @@
# TaskExecutor (table polling)
```sql
create table job (id, status...);
create table executing_job(id, jobId UNIQUE, workerId);
" TRY
insert into executing_job(jobId, workerId) values (?, ?);
" CATCH
" other worker has this job
return
update job set status = 'executing' where id = ?;
" TRY
executeJob(jobId);
" CATCH
update job set status = 'error' where id = ?;
return
" FINALLY
delete from executing_job where jobId = ?, workerId = ?;
update job set status = 'success' where id = ?;
```
The
TaskExecutor regularly checks the jobs table for a new job to run.
Pros: No need to fill the tasks after a restart as the jobs table can always be
queried.
Cons: slow to react on new requests - we have to wait for the executor to query
the database every N minutes.
# TaskExecutor (push method)
Every server instance can have a TaskExecutor(N) with a queue of jobs. The
server pushes a new job to the tasks queue for every hook received.
The TaskExecutor can execute N async jobs simultaneously. After a job is
completed, the taskexecutor checks if there is another job available in the
queue, if not it waits until a new job is pushed to the queue.
On boot, the server queries the jobs table with any jobs that have not been
executed yet. The queue is filled with jobs from the table.
If there are multiple nodes, the `executing_job` table will prevent the same
job to be executed twice at the same time.
Pros: No need to poll a table manually, the jobs are simply pushed to the queue
Cons: The `jobs` table could become too big...
## Alternative
The TaskExecutor could query for next available (old) job manually after a
period of inactivity (or after every Nth job handled). This seems like the best
solution.
```
------
|-----------------------------| / \
|-------| -----------> | instance1 ---> TaskExecutor | -------> | |
incoming request | | |-----------------------------| | | table: job
------------------> | proxy | | DB |
| | |-----------------------------| | | table executing_job
|-------| -----------> | instance2 ---> TaskExecutor | -------> | |
|-----------------------------| \ /
------
```
As the incoming request is received, the instanceN writes the request
information into the job table, then passes the Job, as well as the user
context to TaskExecutor.
To isolate TaskExecutor from the rest of the server-side code (to make it
easier to be a part of a separate microservice altogether in the future), it
can notify the server instance as soon as it is idle to query for old jobs.
For example:
```typescript
interface TaskExecutor {
constructor(n: number)
// EventEmitter events:
// - "idle", () => void
// - "success", (jobId: number) => void
// - "fail", (jobId: number) => void
on(event: string, listener: () => void)
removeListener(event: string, listener: () => void)
post(job: Job): void
start(): void
stop(): void
}
```

View File

@ -0,0 +1,18 @@
module.exports = {
roots: [
'<rootDir>/src'
],
transform: {
'^.+\\.tsx?$': 'ts-jest'
},
testRegex: '(/__tests__/.*|\\.(test|spec))\\.tsx?$',
moduleFileExtensions: [
'ts',
'tsx',
'js',
'jsx'
],
setupFiles: ['<rootDir>/jest.setup.js'],
maxConcurrency: 1,
verbose: false
}

View File

View File

@ -0,0 +1,14 @@
{
"name": "@rondo/tasq",
"private": true,
"scripts": {
"test": "jest",
"lint": "tslint --project .",
"compile": "tsc",
"clean": "rm -rf lib/"
},
"dependencies": {},
"types": "lib/index.d.ts",
"devDependencies": {},
"module": "lib/index.js"
}

View File

@ -0,0 +1,25 @@
import {LinkedList} from './LinkedList'
describe('LinkedList', () => {
describe('static fromArray', () => {
it('creates a LinkedList from array', () => {
const array = [0, 1, 2, 3, 4, 5]
const ll = LinkedList.fromArray(array)
expect(ll.toArray()).toEqual(array)
})
})
describe('push', () => {
})
describe('peek', () => {
})
describe('pop', () => {
})
})

View File

@ -0,0 +1,66 @@
export interface IQueue<T> {
push(...t: T[]): void
pop(): T | undefined
peek(): T | undefined
toArray(): T[]
}
interface INode<T> {
value: T
next?: INode<T>
}
export class LinkedList<T> implements IQueue<T> {
length = 0
protected head: INode<T> | undefined
protected tail: INode<T> | undefined
push(...t: T[]) {
t.forEach(value => {
const node: INode<T> = {value}
if (!this.length) {
this.head = this.tail = node
this.length = 1
return
}
this.tail!.next = node
this.tail = node
this.length++
})
}
peek(): T | undefined {
return this.head && this.head.value
}
pop(): T | undefined {
if (!this.length) {
return undefined
}
if (this.length === 1) {
const value = this.head!.value
this.head = this.tail = undefined
this.length = 0
return value
}
const head = this.head!
this.head = head.next
this.length--
return head.value
}
toArray(): T[] {
const array: T[] = []
for (let h = this.head; h !== undefined; h = h.next) {
array.push(h.value)
}
return array
}
static fromArray<T>(t: T[]): LinkedList<T> {
const list = new LinkedList<T>()
list.push(...t)
return list
}
}

View File

@ -0,0 +1,25 @@
export interface ITaskExecutor<T> {
post(task: T)
start()
stop()
}
export class Queue<T> {
pop()
}
export class TaskExecutor<T> implements ITaskExecutor<T> {
protected queue: T[] = []
post(task: T) {
this.queue.push(task)
}
start() {
}
stop() {
}
}

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,7 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "esm"
},
"references": []
}

View File

@ -0,0 +1,9 @@
{
"extends": "../tsconfig.common.json",
"compilerOptions": {
"outDir": "lib",
"rootDir": "src"
},
"references": [
]
}