deno.land/x/replicache@v10.0.0-beta.0/sync/pull.ts

import type {LogContext} from '@rocicorp/logger';
import type * as dag from '../dag/mod';
import * as db from '../db/mod';
import {deepClone, deepEqual, JSONValue, ReadonlyJSONValue} from '../json';
import {
  assertPullResponse,
  isClientStateNotFoundResponse,
  Puller,
  PullerResult,
  PullError,
  PullResponse,
  PullResponseOK,
} from '../puller';
import {assertHTTPRequestInfo, HTTPRequestInfo} from '../http-request-info';
import {callJSRequest} from './js-request';
import {SYNC_HEAD_NAME} from './sync-head-name';
import * as patch from './patch';
import {toError} from '../to-error';
import * as btree from '../btree/mod';
import {BTreeRead} from '../btree/mod';
import {updateIndexes} from '../db/write';
import {emptyHash, Hash} from '../hash';
import type {Meta} from '../db/commit';
export const PULL_VERSION = 0;

/**
 * The JSON value used as the body when doing a POST to the [pull
 * endpoint](/server-pull).
 */
export type PullRequest = {
  profileID: string;
  clientID: string;
  cookie: ReadonlyJSONValue;
  lastMutationID: number;
  pullVersion: number;

  // schemaVersion can optionally be used by the customer's app
  // to indicate to the data layer what format of Client View the
  // app understands.
  schemaVersion: string;
};
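
// Illustrative sketch (not from the original module): a concrete PullRequest
// as POSTed to the pull endpoint. All values are made up; in practice
// beginPull derives cookie and lastMutationID from the base snapshot.
const examplePullRequest: PullRequest = {
  profileID: 'p94b53eef',
  clientID: 'c2a8f2a9c',
  cookie: {order: 42},
  lastMutationID: 7,
  pullVersion: PULL_VERSION,
  schemaVersion: '1.0',
};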

export type BeginPullRequest = {
  pullURL: string;
  pullAuth: string;
  schemaVersion: string;
  puller: Puller;
};

export type BeginPullResponse = {
  httpRequestInfo: HTTPRequestInfo;
  pullResponse?: PullResponse;
  syncHead: Hash;
};
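
// A minimal Puller sketch (illustrative, not from the original module). It
// assumes the Puller receives a fetch Request already populated by
// callJSRequest and resolves to a PullerResult of the
// {response?, httpRequestInfo} shape asserted by assertResult below; the
// HTTPRequestInfo field names httpStatusCode/errorMessage are assumptions.
const examplePuller: Puller = async request => {
  const res = await fetch(request);
  const httpRequestInfo = {
    httpStatusCode: res.status,
    errorMessage: res.ok ? '' : await res.text(),
  };
  if (!res.ok) {
    // No usable pull response; beginPull surfaces httpRequestInfo only.
    return {httpRequestInfo};
  }
  return {response: await res.json(), httpRequestInfo};
};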

export async function beginPull(
  profileID: string,
  clientID: string,
  beginPullReq: BeginPullRequest,
  puller: Puller,
  requestID: string,
  store: dag.Store,
  lc: LogContext,
  createSyncBranch = true,
): Promise<BeginPullResponse> {
  const {pullURL, pullAuth, schemaVersion} = beginPullReq;

  const baseSnapshot = await store.withRead(async dagRead => {
    const mainHeadHash = await dagRead.getHead(db.DEFAULT_HEAD_NAME);
    if (!mainHeadHash) {
      throw new Error('Internal no main head found');
    }
    return await db.baseSnapshot(mainHeadHash, dagRead);
  });

  const [, baseCookie] = db.snapshotMetaParts(baseSnapshot);

  const pullReq = {
    profileID,
    clientID,
    cookie: baseCookie,
    lastMutationID: baseSnapshot.mutationID,
    pullVersion: PULL_VERSION,
    schemaVersion,
  };

  lc.debug?.('Starting pull...');
  const pullStart = Date.now();
  const {response, httpRequestInfo} = await callPuller(
    puller,
    pullURL,
    pullReq,
    pullAuth,
    requestID,
  );

  lc.debug?.(
    `...Pull ${response ? 'complete' : 'failed'} in `,
    Date.now() - pullStart,
    'ms',
  );

  // If the puller did not get a pull response we still want to return the
  // HTTP request info to the JS SDK.
  if (!response) {
    return {
      httpRequestInfo,
      syncHead: emptyHash,
    };
  }

  if (!createSyncBranch || isClientStateNotFoundResponse(response)) {
    return {
      httpRequestInfo,
      pullResponse: response,
      syncHead: emptyHash,
    };
  }

  const syncHead = await handlePullResponse(lc, store, baseCookie, response);
  if (syncHead === null) {
    throw new Error('Overlapping sync JsLogInfo');
  }

  return {
    httpRequestInfo,
    pullResponse: response,
    syncHead,
  };
}

// Returns the new sync head, or null if the response did not apply due to a
// mismatched cookie.
export async function handlePullResponse(
  lc: LogContext,
  store: dag.Store,
  expectedBaseCookie: ReadonlyJSONValue,
  response: PullResponseOK,
): Promise<Hash | null> {
  // It is possible that another sync completed while we were pulling. Ensure
  // that is not the case by re-checking the base snapshot.
  return await store.withWrite(async dagWrite => {
    const dagRead = dagWrite;
    const mainHead = await dagRead.getHead(db.DEFAULT_HEAD_NAME);

    if (mainHead === undefined) {
      throw new Error('Main head disappeared');
    }
    const baseSnapshot = await db.baseSnapshot(mainHead, dagRead);
    const [baseLastMutationID, baseCookie] =
      db.snapshotMetaParts(baseSnapshot);

    // TODO(MP) Here we are using whether the cookie has changed as a proxy
    // for whether the base snapshot changed, which is the check we used to
    // do. I don't think this is quite right. We need to firm up under what
    // conditions we will/will not accept an update from the server:
    // https://github.com/rocicorp/replicache/issues/713.
    if (!deepEqual(expectedBaseCookie, baseCookie)) {
      return null;
    }

    // If other entities (eg, other clients) are modifying the client view,
    // the client view can change but the lastMutationID stays the same.
    // So be careful here to reject only a lesser lastMutationID.
    if (response.lastMutationID < baseLastMutationID) {
      throw new Error(
        `Received lastMutationID ${response.lastMutationID} is < than last snapshot lastMutationID ${baseLastMutationID}; ignoring client view`,
      );
    }

    // If there is no patch and the lmid and cookie don't change, it's a nop.
    // Otherwise, we will write a new commit, including for the case of just
    // a cookie change.
    if (
      response.patch.length === 0 &&
      response.lastMutationID === baseLastMutationID &&
      (response.cookie ?? null) === baseCookie
    ) {
      return emptyHash;
    }

    // We are going to need to adjust the indexes. Imagine we have just
    // pulled:
    //
    // S1 - M1 - main
    //    \ S2 - sync
    //
    // Let's say S2 says that it contains up to M1. Are we safe at this
    // moment to set main to S2?
    //
    // No, because the Replicache protocol does not require a snapshot
    // containing M1 to have the same data as the client computed for M1!
    //
    // We must diff the main map in M1 against the main map in S2 and see if
    // it contains any changes. Whatever changes it contains must be applied
    // to all indexes.
    //
    // We start with the index definitions in the last commit that was
    // integrated into the new snapshot.
    const chain = await db.commitChain(mainHead, dagRead);
    const lastIntegrated = chain.find(
      c => c.mutationID <= response.lastMutationID,
    );
    if (!lastIntegrated) {
      throw new Error('Internal invalid chain');
    }

    const dbWrite = await db.Write.newSnapshot(
      db.whenceHash(baseSnapshot.chunk.hash),
      response.lastMutationID,
      response.cookie ?? null,
      dagWrite,
      db.readIndexesForWrite(lastIntegrated),
    );

    await patch.apply(lc, dbWrite, response.patch);

    const lastIntegratedMap = new BTreeRead(dagRead, lastIntegrated.valueHash);

    for await (const change of dbWrite.map.diff(lastIntegratedMap)) {
      await updateIndexes(
        lc,
        dbWrite.indexes,
        dagWrite,
        change.key,
        () =>
          Promise.resolve(
            (change as {oldValue: ReadonlyJSONValue | undefined}).oldValue,
          ),
        (change as {newValue: ReadonlyJSONValue | undefined}).newValue,
      );
    }

    return await dbWrite.commit(SYNC_HEAD_NAME);
  });
}
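
// Illustrative PullResponseOK (all values made up): handlePullResponse writes
// a new snapshot whose value map is the base snapshot's map with this patch
// applied, then re-derives index entries for whatever keys the diff touched.
const examplePullResponse: PullResponseOK = {
  cookie: {order: 43},
  lastMutationID: 8,
  patch: [
    {op: 'put', key: 'todo/a1', value: {text: 'buy milk', done: false}},
    {op: 'del', key: 'todo/a0'},
  ],
};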

/**
 * ReplayMutation is used in the EndPull RPC so that we can replay mutations
 * on top of the current state. It is never exposed to the public.
 */
export type ReplayMutation = {
  id: number;
  name: string;
  args: JSONValue;
  original: Hash;
  timestamp: number;
};

// The changed keys in different indexes. The key of the map is the index
// name. "" is used for the primary index.
export type ChangedKeysMap = Map<string, string[]>;
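
// For example (illustrative): a pull that changed 'todo/a1' in the primary
// map and one entry in an index named 'byOwner' would produce a map shaped
// like new Map([['', ['todo/a1']], ['byOwner', [...]]]); index entries use
// an internal key encoding that is not shown here.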

export type MaybeEndPullResult = {
  replayMutations?: ReplayMutation[];
  syncHead: Hash;
  changedKeys: ChangedKeysMap;
};

export async function maybeEndPull(
  store: dag.Store,
  lc: LogContext,
  expectedSyncHead: Hash,
): Promise<MaybeEndPullResult> {
  // Ensure the sync head is what the caller thinks it is.
  return await store.withWrite(async dagWrite => {
    const dagRead = dagWrite;
    const syncHeadHash = await dagRead.getHead(SYNC_HEAD_NAME);
    if (syncHeadHash === undefined) {
      throw new Error('Missing sync head');
    }
    if (syncHeadHash !== expectedSyncHead) {
      throw new Error('Wrong sync head');
    }

    // Ensure another sync has not landed a new snapshot on the main chain.
    const syncSnapshot = await db.baseSnapshot(syncHeadHash, dagRead);
    const mainHeadHash = await dagRead.getHead(db.DEFAULT_HEAD_NAME);
    if (mainHeadHash === undefined) {
      throw new Error('Missing main head');
    }
    const mainSnapshot = await db.baseSnapshot(mainHeadHash, dagRead);

    const {meta} = syncSnapshot;
    const syncSnapshotBasis = meta.basisHash;
    if (syncSnapshotBasis === null) {
      throw new Error('Sync snapshot with no basis');
    }
    if (syncSnapshotBasis !== mainSnapshot.chunk.hash) {
      throw new Error('Overlapping syncs');
    }

    // Collect pending commits from the main chain and determine which of
    // them, if any, need to be replayed.
    let pending = await db.localMutations(mainHeadHash, dagRead);
    const syncHead = await db.commitFromHash(syncHeadHash, dagRead);
    pending = pending.filter(c => c.mutationID > syncHead.mutationID);
    // localMutations() gave us the pending mutations in sync-head-first order
    // whereas the caller wants them in the order to replay (lower mutation
    // IDs first).
    pending.reverse();

    // We return the keys that changed due to this pull. This is used by
    // subscriptions in the JS API when there are no more pending mutations.
    const changedKeys: ChangedKeysMap = new Map();

    // Return replay commits if any.
    if (pending.length > 0) {
      const replayMutations: ReplayMutation[] = [];
      for (const c of pending) {
        let name: string;
        let args: ReadonlyJSONValue;
        let timestamp: number;
        if (c.isLocal()) {
          const lm = c.meta;
          name = lm.mutatorName;
          args = lm.mutatorArgsJSON;
          timestamp = lm.timestamp;
        } else {
          throw new Error('pending mutation is not local');
        }
        replayMutations.push({
          id: c.mutationID,
          name,
          args: deepClone(args),
          original: c.chunk.hash,
          timestamp,
        });
      }
      return {
        syncHead: syncHeadHash,
        replayMutations,
        // The changed keys are not reported when further replays are needed.
        // The changedKeys will be reported at the end, when there are no
        // more mutations to be replayed, and then they will be reported
        // relative to DEFAULT_HEAD_NAME.
        changedKeys,
      };
    }

    // TODO check invariants

    // Compute diffs (changed keys) for the value map and index maps.
    const mainHead = await db.commitFromHash(mainHeadHash, dagRead);
    const mainHeadMap = new BTreeRead(dagRead, mainHead.valueHash);
    const syncHeadMap = new BTreeRead(dagRead, syncHead.valueHash);
    const valueChangedKeys = await btree.changedKeys(mainHeadMap, syncHeadMap);
    if (valueChangedKeys.length > 0) {
      changedKeys.set('', valueChangedKeys);
    }
    await addChangedKeysForIndexes(mainHead, syncHead, dagRead, changedKeys);

    // No mutations to replay, so set the main head to the sync head and the
    // sync is complete!
    await Promise.all([
      dagWrite.setHead(db.DEFAULT_HEAD_NAME, syncHeadHash),
      dagWrite.removeHead(SYNC_HEAD_NAME),
    ]);
    await dagWrite.commit();

    if (lc.debug) {
      const [oldLastMutationID, oldCookie] =
        db.snapshotMetaParts(mainSnapshot);
      const [newLastMutationID, newCookie] =
        db.snapshotMetaParts(syncSnapshot);
      lc.debug(
        'Successfully pulled new snapshot w/last_mutation_id={} (prev. {}), cookie={} (prev. {}), and value_hash={} (prev. {}).',
        newLastMutationID,
        oldLastMutationID,
        newCookie,
        oldCookie,
        syncHead.valueHash,
        mainSnapshot.valueHash,
      );
    }

    return {
      syncHead: syncHeadHash,
      replayMutations: [],
      changedKeys,
    };
  });
}
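
// Sketch of the overall pull flow under stated assumptions: beginPull builds
// the sync branch, then maybeEndPull either hands back pending local
// mutations to re-run against the sync head or fast-forwards main. The
// `replay` callback below is hypothetical (not part of this module): it
// re-runs one mutation on top of the given head and returns the new head.
async function pullFlowSketch(
  store: dag.Store,
  lc: LogContext,
  beginPullResponse: BeginPullResponse,
  replay: (m: ReplayMutation, head: Hash) => Promise<Hash>,
): Promise<void> {
  let head = beginPullResponse.syncHead;
  if (head === emptyHash) {
    // Pull failed, was a nop, or no sync branch was created.
    return;
  }
  for (;;) {
    const {replayMutations} = await maybeEndPull(store, lc, head);
    if (!replayMutations || replayMutations.length === 0) {
      // Main now points at the sync head; changedKeys drive subscriptions.
      return;
    }
    for (const m of replayMutations) {
      head = await replay(m, head);
    }
  }
}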

async function callPuller(
  puller: Puller,
  url: string,
  body: PullRequest,
  auth: string,
  requestID: string,
): Promise<PullerResult> {
  try {
    const res = await callJSRequest(puller, url, body, auth, requestID);
    assertResult(res);
    return res;
  } catch (e) {
    throw new PullError(toError(e));
  }
}

type Result = {
  response?: PullResponse;
  httpRequestInfo: HTTPRequestInfo;
};

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function assertResult(v: any): asserts v is Result {
  if (typeof v !== 'object' || v === null) {
    throw new Error('Expected result to be an object');
  }

  if (v.response !== undefined) {
    assertPullResponse(v.response);
  }

  assertHTTPRequestInfo(v.httpRequestInfo);
}

async function addChangedKeysForIndexes(
  mainCommit: db.Commit<Meta>,
  syncCommit: db.Commit<Meta>,
  read: dag.Read,
  changedKeysMap: ChangedKeysMap,
) {
  async function allKeys(oldMap: BTreeRead): Promise<string[]> {
    const keys: string[] = [];
    for await (const key of oldMap.keys()) {
      keys.push(key);
    }
    return keys;
  }

  const oldIndexes = db.readIndexesForRead(mainCommit);
  const newIndexes = db.readIndexesForRead(syncCommit);

  for (const [oldIndexName, oldIndex] of oldIndexes) {
    await oldIndex.withMap(read, async oldMap => {
      const newIndex = newIndexes.get(oldIndexName);
      if (newIndex !== undefined) {
        const changedKeys = await newIndex.withMap(read, async newMap => {
          return btree.changedKeys(oldMap, newMap);
        });
        newIndexes.delete(oldIndexName);
        if (changedKeys.length > 0) {
          changedKeysMap.set(oldIndexName, changedKeys);
        }
      } else {
        // The old index name is not in the new indexes. All keys changed!
        const changedKeys = await allKeys(oldMap);
        if (changedKeys.length > 0) {
          changedKeysMap.set(oldIndexName, changedKeys);
        }
      }
    });
  }

  for (const [newIndexName, newIndex] of newIndexes) {
    // The new index name is not in the old indexes. All keys changed!
    await newIndex.withMap(read, async newMap => {
      const changedKeys = await allKeys(newMap);
      if (changedKeys.length > 0) {
        changedKeysMap.set(newIndexName, changedKeys);
      }
    });
  }
}