-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathsubscribersMap.ts
More file actions
108 lines (87 loc) · 3.44 KB
/
subscribersMap.ts
File metadata and controls
108 lines (87 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import { createLogger, ILogger } from '../logger';
import { combineDisposers, IDisposable } from '../functions/disposer';
type Unsub = () => void;
export class SubscribersMap implements IDisposable {
/** Unsusbcribers map: key => unsub fn */
private readonly _map = new Map<string, () => void>();
/** Timeouts map: key => timeout handle */
private readonly _timeouts = new Map<string, any>();
private readonly _logger: ILogger = null;
protected _count = 0;
constructor(readonly subscribe: null | ((key: string) => Promise<Unsub[]>), readonly name?: string) {
this._logger = createLogger(`[Observers:${this.name || '?'}]`);
}
public get count() { return this._count; }
public getIsObserving(key: string) {
return this._map.has(key);
}
public getHasObserveTimeout(key: string) {
return this.getIsObserving(key) && this._timeouts.has(key);
}
public async enable(key: string, enable: boolean, clearAfter: number = null, existingUnsubs: Unsub[] = null) {
if (enable === this.getIsObserving(key)) {
this.refreshTimeout(key, enable, clearAfter, true);
return;
}
if (enable) {
this._logger.log('Adding observer for key =', key, clearAfter ? `, clearAfter = ${clearAfter}` : '');
// this marker will help to determine whether unsubscribe was requested while we were in process of subscribing
let disabed = false;
const marker = () => { disabed = true; };
this._map.set(key, marker);
if (!this.subscribe && !existingUnsubs) {
throw new Error('Neither subscribe function nor existingUnsubs has been configured');
}
const unsubs = existingUnsubs || await this.subscribe(key);
const result = combineDisposers(...unsubs);
if (disabed) { // unsubscribe was requested
result();
} else {
this._map.set(key, result);
this.setCount(this._count + 1);
this.refreshTimeout(key, true, clearAfter);
}
} else {
this._logger.log('Removing observer for key =', key);
this.refreshTimeout(key, false);
const unsub = this._map.get(key);
this._map.delete(key);
unsub();
this.setCount(this._count - 1);
}
}
private refreshTimeout(key: string, enable: boolean, timeout?: number, refresh = false) {
const current = this._timeouts.get(key);
if (current) {
clearTimeout(current);
this._timeouts.delete(key);
}
if (enable && refresh && current == null) {
// DO NOT setup new timeout because it's not intended to clear subscribtion if it was previously enabled for long term
return;
}
if (enable && timeout) {
const t = setTimeout(() => this.enable(key, false), timeout);
this._timeouts.set(key, t);
}
}
protected setCount(v: number) {
this._count = v;
}
public clear() {
// Clear timeouts
for (const t of this._timeouts.values()) {
clearTimeout(t);
}
this._timeouts.clear();
// Invoke unsubscribers
for (const u of this._map.values()) {
u();
}
this._map.clear();
this.setCount(0);
}
public dispose() {
this.clear();
}
}