deno.land / x / replicache@v10.0.0-beta.0 / db / write.ts
import type {LogContext} from '@rocicorp/logger';
import type * as dag from '../dag/mod';
import type {ReadonlyJSONValue} from '../json';
import {
  Commit,
  Meta as CommitMeta,
  IndexDefinition,
  IndexRecord,
  newIndexChange as commitNewIndexChange,
  newLocal as commitNewLocal,
  newSnapshot as commitNewSnapshot,
} from './commit';
import {
  Read,
  readCommitForBTreeWrite,
  readIndexesForRead,
  Whence,
} from './read';
import {IndexWrite, IndexOperation, indexValue, IndexRead} from './index';
import {BTreeRead, BTreeWrite} from '../btree/mod';
import {asyncIterableToArray} from '../async-iterable-to-array';
import {lazy} from '../lazy';
import {emptyHash, Hash} from '../hash';
/**
 * Tag identifying which kind of commit a `Write` produces. The member order
 * fixes the numeric values (IndexChange = 0, Local = 1, Snapshot = 2).
 */
const enum MetaType {
  IndexChange,
  Local,
  Snapshot,
}

/** Pending-commit metadata for a snapshot write. */
type SnapshotMeta = {
  type: MetaType.Snapshot;
  lastMutationID: number;
  cookie: ReadonlyJSONValue;
};

/**
 * Pending-commit metadata for a local mutation. A non-null `originalHash`
 * marks the write as a rebase (see `Write.isRebase`).
 */
type LocalMeta = {
  type: MetaType.Local;
  mutatorName: string;
  mutatorArgs: ReadonlyJSONValue;
  mutationID: number;
  originalHash: Hash | null;
  timestamp: number;
};

/** Pending-commit metadata for a write that only adds or drops indexes. */
type IndexChangeMeta = {
  type: MetaType.IndexChange;
  lastMutationID: number;
};

/** Union of all pending-commit metadata shapes, discriminated by `type`. */
type Meta = IndexChangeMeta | LocalMeta | SnapshotMeta;
/**
 * A write transaction over the key/value B-tree and its indexes.
 *
 * A `Write` wraps a `dag.Write`, a mutable `BTreeWrite` and a map of
 * `IndexWrite`s, together with the metadata (`Meta`) describing which kind of
 * commit will be produced when `commit` / `commitWithChangedKeys` is called.
 *
 * Which operations are allowed depends on the metadata type:
 * - `put`, `del` and `clear` throw for index-change writes.
 * - `createIndex` and `dropIndex` throw for local-mutation writes.
 */
export class Write extends Read {
  private readonly _dagWrite: dag.Write;
  // The commit this write builds on; `undefined` when there is no basis
  // (as in `initDB`).
  private readonly _basis: Commit<CommitMeta> | undefined;
  private readonly _meta: Meta;

  shouldDeepClone = true;

  declare map: BTreeWrite;

  declare readonly indexes: Map<string, IndexWrite>;

  constructor(
    dagWrite: dag.Write,
    map: BTreeWrite,
    basis: Commit<CommitMeta> | undefined,
    meta: Meta,
    indexes: Map<string, IndexWrite>,
  ) {
    // TypeScript has trouble
    super(dagWrite, map, indexes);
    this._dagWrite = dagWrite;
    this._basis = basis;
    this._meta = meta;
  }

  /**
   * Creates a write that will commit a local mutation on top of the commit
   * resolved from `whence`. The mutation ID is taken from the basis commit's
   * `nextMutationID` and the indexes are loaded from the basis.
   *
   * @param originalHash Non-null marks the write as a rebase (see `isRebase`).
   */
  static async newLocal(
    whence: Whence,
    mutatorName: string,
    mutatorArgs: ReadonlyJSONValue,
    originalHash: Hash | null,
    dagWrite: dag.Write,
    timestamp: number,
  ): Promise<Write> {
    const [, basis, bTreeWrite] = await readCommitForBTreeWrite(
      whence,
      dagWrite,
    );
    const mutationID = basis.nextMutationID;
    const indexes = readIndexesForWrite(basis);
    return new Write(
      dagWrite,
      bTreeWrite,
      basis,
      {
        type: MetaType.Local,
        mutatorName,
        mutatorArgs,
        mutationID,
        originalHash,
        timestamp,
      },
      indexes,
    );
  }

  /**
   * Creates a write that will commit a snapshot on top of the commit resolved
   * from `whence`. Unlike `newLocal`, the indexes are supplied by the caller
   * rather than read from the basis.
   */
  static async newSnapshot(
    whence: Whence,
    mutationID: number,
    cookie: ReadonlyJSONValue,
    dagWrite: dag.Write,
    indexes: Map<string, IndexWrite>,
  ): Promise<Write> {
    const [, basis, bTreeWrite] = await readCommitForBTreeWrite(
      whence,
      dagWrite,
    );
    return new Write(
      dagWrite,
      bTreeWrite,
      basis,
      {type: MetaType.Snapshot, lastMutationID: mutationID, cookie},
      indexes,
    );
  }

  /**
   * Creates a write that will commit an index change on top of the commit
   * resolved from `whence`. The basis commit's `mutationID` is carried over
   * as `lastMutationID`.
   */
  static async newIndexChange(
    whence: Whence,
    dagWrite: dag.Write,
  ): Promise<Write> {
    const [, basis, bTreeWrite] = await readCommitForBTreeWrite(
      whence,
      dagWrite,
    );
    const lastMutationID = basis.mutationID;
    const indexes = readIndexesForWrite(basis);
    return new Write(
      dagWrite,
      bTreeWrite,
      basis,
      {type: MetaType.IndexChange, lastMutationID},
      indexes,
    );
  }

  /** True when this is a local-mutation write with a non-null `originalHash`. */
  isRebase(): boolean {
    return (
      this._meta.type === MetaType.Local && this._meta.originalHash !== null
    );
  }

  /**
   * Stores `val` under `key`, first updating every index whose key prefix
   * matches `key`.
   *
   * @throws Error('Not allowed') for index-change writes.
   */
  async put(
    lc: LogContext,
    key: string,
    val: ReadonlyJSONValue,
  ): Promise<void> {
    if (this._meta.type === MetaType.IndexChange) {
      throw new Error('Not allowed');
    }
    // The old value is only read if at least one index needs it.
    const oldVal = lazy(() => this.map.get(key));
    await updateIndexes(lc, this.indexes, this._dagWrite, key, oldVal, val);

    await this.map.put(key, val);
  }

  /**
   * Removes `key`, first removing its entries from every matching index.
   *
   * @returns The result of `BTreeWrite.del` (presumably whether the key
   *   existed; confirm against the btree module).
   * @throws Error('Not allowed') for index-change writes.
   */
  async del(lc: LogContext, key: string): Promise<boolean> {
    if (this._meta.type === MetaType.IndexChange) {
      throw new Error('Not allowed');
    }

    // TODO(arv): This does the binary search twice. We can do better.
    const oldVal = lazy(() => this.map.get(key));
    // `oldVal` is a getter function, so it can never be `undefined` itself.
    // `updateIndexes` already skips the removal when the resolved old value
    // is `undefined`, so no guard is needed here.
    await updateIndexes(
      lc,
      this.indexes,
      this._dagWrite,
      key,
      oldVal,
      undefined,
    );
    return this.map.del(key);
  }

  /**
   * Clears the value map and then every index.
   *
   * @throws Error('Not allowed') for index-change writes.
   */
  async clear(): Promise<void> {
    if (this._meta.type === MetaType.IndexChange) {
      throw new Error('Not allowed');
    }

    await this.map.clear();
    const ps = [];
    for (const idx of this.indexes.values()) {
      ps.push(idx.clear());
    }
    await Promise.all(ps);
  }

  /**
   * Creates an index named `name` over all entries whose key starts with
   * `keyPrefix`, indexing the value found at `jsonPointer`.
   *
   * Re-creating an index with an identical definition is a no-op.
   *
   * @throws Error('Not allowed') for local-mutation writes.
   * @throws Error('Index exists with different definition') when `name` is
   *   already used by an index with a different prefix or pointer.
   */
  async createIndex(
    lc: LogContext,
    name: string,
    keyPrefix: string,
    jsonPointer: string,
  ): Promise<void> {
    if (this._meta.type === MetaType.Local) {
      throw new Error('Not allowed');
    }

    const definition: IndexDefinition = {
      name,
      keyPrefix,
      jsonPointer,
    };

    // Check to see if the index already exists.
    const index = this.indexes.get(name);
    if (index) {
      const oldDefinition = index.meta.definition;
      if (
        oldDefinition.name === name &&
        oldDefinition.keyPrefix === keyPrefix &&
        oldDefinition.jsonPointer === jsonPointer
      ) {
        return;
      } else {
        throw new Error('Index exists with different definition');
      }
    }

    // Build the index from the entries currently in the value map.
    const indexMap = new BTreeWrite(this._dagWrite);
    for await (const entry of this.map.scan(keyPrefix)) {
      await indexValue(
        lc,
        indexMap,
        IndexOperation.Add,
        entry[0],
        entry[1],
        jsonPointer,
      );
    }

    // The real value hash is produced when the index is flushed at commit.
    this.indexes.set(
      name,
      new IndexWrite(
        {
          definition,
          valueHash: emptyHash,
        },
        indexMap,
      ),
    );
  }

  /**
   * Removes the index named `name`.
   *
   * @throws Error('Not allowed') for local-mutation writes.
   * @throws Error when no index with that name exists.
   */
  async dropIndex(name: string): Promise<void> {
    if (this._meta.type === MetaType.Local) {
      throw new Error('Not allowed');
    }

    if (!this.indexes.delete(name)) {
      throw new Error(`No such index: ${name}`);
    }
  }

  // Return value is the hash of the new commit.
  async commit(headName: string): Promise<Hash> {
    const [hash] = await this.commitWithChangedKeys(headName, false);
    return hash;
  }

  /**
   * Flushes the value map and all indexes, writes a commit chunk matching
   * this write's metadata, points `headName` at it and commits the
   * underlying dag write.
   *
   * @param generateChangedKeys When true (and a basis exists), changed keys
   *   are computed by diffing against the basis commit.
   * @returns The new commit's hash and a map of changed keys: `''` holds the
   *   value-map keys, other entries are keyed by index name. Empty key lists
   *   are omitted.
   * @throws Error for index-change writes whose mutation ID or value hash
   *   differs from the basis.
   *
   * NOTE(review): when `generateChangedKeys` is false (or there is no basis),
   * every non-empty index still has all of its keys enumerated and reported
   * as changed. `commit` discards that result, so this looks like avoidable
   * work; confirm what callers rely on before changing it.
   */
  async commitWithChangedKeys(
    headName: string,
    generateChangedKeys: boolean,
  ): Promise<[Hash, ChangedKeysMap]> {
    const valueHash = await this.map.flush();
    let valueChangedKeys: string[] = [];
    if (generateChangedKeys && this._basis) {
      const basisMap = new BTreeRead(this._dagWrite, this._basis.valueHash);
      valueChangedKeys = await asyncIterableToArray(
        this.map.diffKeys(basisMap),
      );
    }
    const indexRecords: IndexRecord[] = [];
    const keyChanges = new Map();
    if (valueChangedKeys.length > 0) {
      keyChanges.set('', valueChangedKeys);
    }

    let basisIndexes: Map<string, IndexRead>;
    if (generateChangedKeys && this._basis) {
      basisIndexes = readIndexesForRead(this._basis);
    } else {
      basisIndexes = new Map();
    }

    for (const [name, index] of this.indexes) {
      // Named distinctly so it does not shadow the value map's `valueHash`.
      const indexValueHash = await index.flush();
      const basisIndex = basisIndexes.get(name);
      const indexChangedKeys = await index.withMap(
        this._dagWrite,
        async map => {
          if (basisIndex) {
            return basisIndex.withMap(this._dagWrite, basisMap =>
              asyncIterableToArray(map.diffKeys(basisMap)),
            );
          }
          // Without a basis index to diff against, every key is reported.
          return asyncIterableToArray(map.keys());
        },
      );

      if (indexChangedKeys.length > 0) {
        keyChanges.set(name, indexChangedKeys);
      }

      const indexRecord: IndexRecord = {
        definition: index.meta.definition,
        valueHash: indexValueHash,
      };
      indexRecords.push(indexRecord);
    }
    const basisHash = this._basis ? this._basis.chunk.hash : null;
    let commit;
    const meta = this._meta;
    switch (meta.type) {
      case MetaType.Local: {
        const {mutationID, mutatorName, mutatorArgs, originalHash, timestamp} =
          meta;
        commit = commitNewLocal(
          this._dagWrite.createChunk,
          basisHash,
          mutationID,
          mutatorName,
          mutatorArgs,
          originalHash,
          valueHash,
          indexRecords,
          timestamp,
        );
        break;
      }
      case MetaType.Snapshot: {
        const {lastMutationID, cookie} = meta;
        commit = commitNewSnapshot(
          this._dagWrite.createChunk,
          basisHash,
          lastMutationID,
          cookie,
          valueHash,
          indexRecords,
        );
        break;
      }
      case MetaType.IndexChange: {
        const {lastMutationID} = meta;
        // An index change may only alter indexes, never the data or the
        // mutation ID inherited from the basis.
        if (this._basis !== undefined) {
          if (this._basis.mutationID !== lastMutationID) {
            throw new Error('Index change must not change mutationID');
          }
          if (this._basis.valueHash !== valueHash) {
            throw new Error('Index change must not change valueHash');
          }
        }

        commit = commitNewIndexChange(
          this._dagWrite.createChunk,
          basisHash,
          lastMutationID,
          valueHash,
          indexRecords,
        );
        break;
      }
    }

    await Promise.all([
      this._dagWrite.putChunk(commit.chunk),
      this._dagWrite.setHead(headName, commit.chunk.hash),
    ]);

    await this._dagWrite.commit();

    return [commit.chunk.hash, keyChanges];
  }

  /** Closes the underlying dag write. */
  close(): void {
    this._dagWrite.close();
  }
}
/**
 * Brings every index whose `keyPrefix` matches `key` in line with a change
 * of that key's value.
 *
 * For each matching index the previous value is obtained from `oldValGetter`
 * (invoked once per matching index). A `Remove` operation is started when the
 * previous value is defined, and an `Add` operation when `newVal` is defined.
 * The started operations are collected and awaited together at the end.
 *
 * @param oldValGetter Resolves to the value currently stored under `key`, or
 *   `undefined` when there is none.
 * @param newVal The value about to be stored, or `undefined` for a deletion.
 */
export async function updateIndexes(
  lc: LogContext,
  indexes: Map<string, IndexWrite>,
  dagWrite: dag.Write,
  key: string,
  oldValGetter: () => Promise<ReadonlyJSONValue | undefined>,
  newVal: ReadonlyJSONValue | undefined,
): Promise<void> {
  const pending: Promise<void>[] = [];

  for (const index of indexes.values()) {
    const {definition} = index.meta;
    if (!key.startsWith(definition.keyPrefix)) {
      // This index does not cover the key.
      continue;
    }

    const previous = await oldValGetter();
    await index.withMap(dagWrite, async indexMap => {
      if (previous !== undefined) {
        const removal = indexValue(
          lc,
          indexMap,
          IndexOperation.Remove,
          key,
          previous,
          definition.jsonPointer,
        );
        pending.push(removal);
      }
      if (newVal !== undefined) {
        const addition = indexValue(
          lc,
          indexMap,
          IndexOperation.Add,
          key,
          newVal,
          definition.jsonPointer,
        );
        pending.push(addition);
      }
    });
  }

  await Promise.all(pending);
}
/**
 * Changed keys produced by `Write.commitWithChangedKeys`. The entry under the
 * empty string `''` lists changed keys of the main value map; every other
 * entry is keyed by index name and lists that index's changed keys.
 */
type ChangedKeysMap = Map<string, string[]>;
/**
 * Initializes a database by committing an empty snapshot to `headName`.
 *
 * The snapshot has no basis commit, an empty value map, no indexes,
 * `lastMutationID` 0 and a `null` cookie.
 *
 * @returns The hash of the newly created commit.
 */
export async function initDB(
  dagWrite: dag.Write,
  headName: string,
): Promise<Hash> {
  const emptyMap = new BTreeWrite(dagWrite);
  const noIndexes = new Map();
  const initialWrite = new Write(
    dagWrite,
    emptyMap,
    undefined,
    {type: MetaType.Snapshot, lastMutationID: 0, cookie: null},
    noIndexes,
  );
  const headHash = await initialWrite.commit(headName);
  return headHash;
}
/**
 * Builds the writable index map for a commit: one `IndexWrite` per index
 * record in `commit.indexes`, keyed by the index definition's name. Each
 * `IndexWrite` is constructed with `undefined` as its second argument.
 */
export function readIndexesForWrite(
  commit: Commit<CommitMeta>,
): Map<string, IndexWrite> {
  const byName = new Map<string, IndexWrite>();
  for (const record of commit.indexes) {
    const {name} = record.definition;
    byName.set(name, new IndexWrite(record, undefined));
  }
  return byName;
}
Version Info