deno.land / x / cockatiel@v3.1.2 / src / BulkheadPolicy.ts

BulkheadPolicy.ts
نووسراو ببینە
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import { neverAbortedSignal } from './common/abort';import { defer } from './common/defer';import { EventEmitter } from './common/Event';import { ExecuteWrapper } from './common/Executor';import { BulkheadRejectedError } from './errors/BulkheadRejectedError';import { TaskCancelledError } from './errors/Errors';import { IDefaultPolicyContext, IPolicy } from './Policy';
/**
 * Record describing one deferred call: the function to invoke, the abort
 * signal supplied with it, and the callbacks that settle the caller's promise.
 *
 * NOTE(review): members are written with method syntax. The class below stores
 * these as `IQueueItem<unknown>`; rewriting them as property-style function
 * types would tighten parameter checking under `strictFunctionTypes` and may
 * stop that assignment from compiling -- verify before changing the form.
 */
interface IQueueItem<T> {
  /** Function to invoke; it receives the policy context for the call. */
  fn(context: IDefaultPolicyContext): Promise<T> | T;

  /** Abort signal that accompanies the call. */
  signal: AbortSignal;

  /** Settles the caller's promise with the function's result. */
  resolve(value: T): void;

  /** Settles the caller's promise with a failure. */
  reject(error: Error): void;
}
/**
 * Policy that caps how many calls run concurrently. Up to `capacity` calls run
 * at once; up to `queueCapacity` further calls wait in a first-in, first-out
 * queue; anything beyond that is rejected with a `BulkheadRejectedError`.
 */
export class BulkheadPolicy implements IPolicy {
  public declare readonly _altReturn: never;

  /** Number of execution slots currently claimed. */
  private active = 0;

  /** Calls waiting for a slot, oldest first. */
  private readonly queue: Array<IQueueItem<unknown>> = [];

  private readonly onRejectEmitter = new EventEmitter<void>();

  // NOTE(review): in this class the executor is only used to expose its
  // onSuccess/onFailure events; `execute` never routes calls through it.
  // Confirm whether those events are expected to fire for bulkhead calls.
  private readonly executor = new ExecuteWrapper();

  /**
   * @inheritdoc
   */
  public readonly onSuccess = this.executor.onSuccess;

  /**
   * @inheritdoc
   */
  public readonly onFailure = this.executor.onFailure;

  /**
   * Emitter that fires when an item is rejected from the bulkhead.
   */
  public readonly onReject = this.onRejectEmitter.addListener;

  /**
   * Returns the number of available execution slots at this point in time.
   */
  public get executionSlots() {
    return this.capacity - this.active;
  }

  /**
   * Returns the number of queue slots at this point in time.
   */
  public get queueSlots() {
    return this.queueCapacity - this.queue.length;
  }

  /**
   * Bulkhead limits concurrent requests made.
   * @param capacity Maximum number of calls allowed to run at the same time
   * @param queueCapacity Maximum number of calls allowed to wait for a slot
   */
  constructor(private readonly capacity: number, private readonly queueCapacity: number) {}

  /**
   * Executes the given function, immediately if an execution slot is free,
   * otherwise once a queued turn comes up.
   * @param fn Function to execute
   * @param signal Abort signal handed to `fn`; an already-aborted signal
   * cancels the call before it runs
   * @returns the value produced by `fn`
   * @throws a {@link TaskCancelledError} if the signal is aborted before the
   * function starts
   * @throws a {@link BulkheadRejectedError} if the bulkhead limits are exceeded
   */
  public async execute<T>(
    fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
    signal = neverAbortedSignal,
  ): Promise<T> {
    if (signal.aborted) {
      throw new TaskCancelledError();
    }

    if (this.active < this.capacity) {
      this.active++;
      return await this.runInSlot(fn, signal);
    }

    if (this.queue.length < this.queueCapacity) {
      const { resolve, reject, promise } = defer<T>();
      this.queue.push({ signal, fn, resolve, reject });
      return promise;
    }

    this.onRejectEmitter.emit();
    throw new BulkheadRejectedError(this.capacity, this.queueCapacity);
  }

  /**
   * Runs `fn` in an execution slot that the caller has already claimed by
   * incrementing `active`. The slot is always released afterwards and the next
   * queued call is started, whether `fn` succeeded, failed, or never ran
   * because its signal was aborted.
   */
  private async runInSlot<T>(
    fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
    signal: AbortSignal,
  ): Promise<T> {
    try {
      // A queued call may have been aborted while it waited. Checking inside
      // the try block guarantees the finally clause still hands the slot on,
      // so calls queued behind a cancelled one are not left stranded.
      if (signal.aborted) {
        throw new TaskCancelledError();
      }

      return await fn({ signal });
    } finally {
      this.active--;
      this.dequeue();
    }
  }

  /**
   * Starts the oldest queued call, if there is one. Only invoked right after a
   * slot has been released.
   */
  private dequeue(): void {
    const item = this.queue.shift();
    if (!item) {
      return;
    }

    // Claim the slot synchronously so that no other caller can take it between
    // now and the deferred start below; otherwise the dequeued call could be
    // pushed to the back of the queue or rejected outright.
    this.active++;

    // Start on a fresh microtask so a run of queued calls that finish
    // synchronously cannot recurse through runInSlot -> dequeue.
    Promise.resolve()
      .then(() => this.runInSlot(item.fn, item.signal))
      .then(item.resolve)
      .catch(item.reject);
  }
}
cockatiel

Version Info

Tagged at
5 months ago