297 lines
8.5 KiB
JavaScript
297 lines
8.5 KiB
JavaScript
import QuickLRU from 'quick-lru';
|
|
import { BaseSource } from './basesource.js';
|
|
import { AbortError, AggregateError, wait, zip } from '../utils.js';
|
|
|
|
/**
 * One contiguous chunk of the underlying source, identified by its byte
 * offset and length, optionally carrying the bytes themselves.
 */
class Block {
  /**
   * @param {number} offset byte offset at which the block starts
   * @param {number} length number of bytes covered by the block
   * @param {ArrayBuffer} [data] the block's bytes; `null` when not supplied
   */
  constructor(offset, length, data = null) {
    this.offset = offset;
    this.length = length;
    this.data = data;
  }

  /**
   * @returns {number} the top byte border, i.e. `offset + length`
   *   (the first byte position after the block)
   */
  get top() {
    return this.offset + this.length;
  }
}
|
|
|
|
/**
 * A byte range together with the ids of the blocks it covers.
 */
class BlockGroup {
  /**
   * @param {number} offset byte offset at which the group starts
   * @param {number} length number of bytes covered by the group
   * @param {number[]} blockIds ids of the blocks contained in the group
   *   (stored by reference, not copied)
   */
  constructor(offset, length, blockIds) {
    this.offset = offset;
    this.length = length;
    this.blockIds = blockIds;
  }
}
|
|
|
|
/**
 * A source that reads from another source in fixed-size blocks and keeps
 * those blocks in an LRU cache, so that overlapping or repeated slice
 * requests can be answered from already loaded blocks.
 */
export class BlockedSource extends BaseSource {
  /**
   *
   * @param {BaseSource} source The underlying source that shall be blocked and cached
   * @param {object} options
   * @param {number} [options.blockSize] size of a single block in bytes (default 65536)
   * @param {number} [options.cacheSize] `maxSize` passed to the LRU block cache (default 100)
   */
  constructor(source, { blockSize = 65536, cacheSize = 100 } = {}) {
    super();
    this.source = source;
    this.blockSize = blockSize;

    this.blockCache = new QuickLRU({
      maxSize: cacheSize,
      // Keep blocks the cache drops so that `fetch` can still read them
      // (it falls back to `evictedBlocks` when a block left the cache).
      onEviction: (blockId, block) => {
        this.evictedBlocks.set(blockId, block);
      },
    });

    /** @type {Map<number, Block>} */
    this.evictedBlocks = new Map();

    // mapping blockId -> promise of the in-flight request for that block
    // (the promise settles once the block was stored or the request failed)
    this.blockRequests = new Map();

    // set of blockIds missing for the current requests
    this.blockIdsToFetch = new Set();

    // blockIds whose most recent request ended with an AbortError
    this.abortedBlockIds = new Set();
  }

  /**
   * @returns {number|null} the `fileSize` reported by the wrapped source;
   *   the callers in this class treat `null` as "unknown"
   */
  get fileSize() {
    return this.source.fileSize;
  }

  /**
   * Read the given slices, going through the block cache.
   *
   * Every slice is expanded to the blocks it touches. Blocks that are neither
   * cached nor already requested are queued and requested from the wrapped
   * source; then all relevant requests are awaited and the slice data is
   * assembled from the blocks.
   *
   * @param {import("./basesource").Slice[]} slices
   * @param {AbortSignal} [signal] forwarded to the wrapped source's `fetch`
   * @returns {Promise<ArrayBuffer[]>} one buffer per slice, in slice order
   * @throws {AbortError} when `signal` is aborted once the requests settled
   * @throws {AggregateError} when at least one required block is unavailable
   */
  async fetch(slices, signal) {
    const blockRequests = [];
    const missingBlockIds = [];
    const allBlockIds = [];
    // NOTE(review): `evictedBlocks` is shared between overlapping `fetch`
    // calls, so this also drops blocks evicted on behalf of another call
    // that is still pending - confirm this is intended.
    this.evictedBlocks.clear();

    for (const { offset, length } of slices) {
      let top = offset + length;

      // never request bytes beyond the end of the file, if it is known
      const { fileSize } = this;
      if (fileSize !== null) {
        top = Math.min(top, fileSize);
      }

      const firstBlockOffset = Math.floor(offset / this.blockSize) * this.blockSize;

      for (let current = firstBlockOffset; current < top; current += this.blockSize) {
        const blockId = Math.floor(current / this.blockSize);
        if (!this.blockCache.has(blockId) && !this.blockRequests.has(blockId)) {
          // neither cached nor in flight: queue it for the next `fetchBlocks`
          this.blockIdsToFetch.add(blockId);
          missingBlockIds.push(blockId);
        }
        if (this.blockRequests.has(blockId)) {
          // already requested by someone else: just wait for that request
          blockRequests.push(this.blockRequests.get(blockId));
        }
        allBlockIds.push(blockId);
      }
    }

    // allow additional block requests to accumulate
    await wait();
    this.fetchBlocks(signal);

    // Gather all of the new requests that this fetch call is contributing to `fetch`.
    const missingRequests = [];
    for (const blockId of missingBlockIds) {
      // The requested missing block could already be in the cache
      // instead of having its request still be outstanding.
      if (this.blockRequests.has(blockId)) {
        missingRequests.push(this.blockRequests.get(blockId));
      }
    }

    // Actually await all pending requests that are needed for this `fetch`.
    // `allSettled` is used so a failed request does not reject here; failures
    // are detected below by checking which blocks are actually available.
    await Promise.allSettled(blockRequests);
    await Promise.allSettled(missingRequests);

    // Perform retries if a block was interrupted by a previous signal
    const abortedBlockRequests = [];
    const abortedBlockIds = allBlockIds
      .filter((id) => this.abortedBlockIds.has(id) || !this.blockCache.has(id));
    abortedBlockIds.forEach((id) => this.blockIdsToFetch.add(id));
    // start the retry of some blocks if required
    // NOTE(review): the retry only happens when a signal was passed; a call
    // without a signal whose shared request got aborted ends in the
    // AggregateError below - confirm this is intended.
    if (abortedBlockIds.length > 0 && signal && !signal.aborted) {
      // the retry runs without a signal
      this.fetchBlocks(null);
      for (const blockId of abortedBlockIds) {
        const block = this.blockRequests.get(blockId);
        if (!block) {
          throw new Error(`Block ${blockId} is not in the block requests`);
        }
        abortedBlockRequests.push(block);
      }
      await Promise.allSettled(abortedBlockRequests);
    }

    // throw an abort error
    if (signal && signal.aborted) {
      throw new AbortError('Request was aborted');
    }

    // a block may have left the LRU cache in the meantime, so also look at
    // the blocks recorded by the cache's eviction callback
    const blocks = allBlockIds.map((id) => this.blockCache.get(id) || this.evictedBlocks.get(id));
    const failedBlocks = blocks.filter((i) => !i);
    if (failedBlocks.length) {
      throw new AggregateError(failedBlocks, 'Request failed');
    }

    // create a final Map, with all required blocks for this request to satisfy
    const requiredBlocks = new Map(zip(allBlockIds, blocks));

    // assemble the bytes of every slice from the required blocks
    return this.readSliceData(slices, requiredBlocks);
  }

  /**
   * Request all currently queued block ids (`blockIdsToFetch`) from the
   * wrapped source and register one pending promise per block in
   * `blockRequests`. Does nothing when no block id is queued.
   *
   * @param {AbortSignal} signal forwarded to the wrapped source's `fetch`
   */
  fetchBlocks(signal) {
    // check if we still need to
    if (this.blockIdsToFetch.size > 0) {
      const groups = this.groupBlocks(this.blockIdsToFetch);

      // start requesting slices of data
      const groupRequests = this.source.fetch(groups, signal);

      for (let groupIndex = 0; groupIndex < groups.length; ++groupIndex) {
        const group = groups[groupIndex];

        for (const blockId of group.blockIds) {
          // make an async IIFE for each block
          this.blockRequests.set(blockId, (async () => {
            try {
              const response = (await groupRequests)[groupIndex];
              // cut this block's bytes out of the group response
              const blockOffset = blockId * this.blockSize;
              const o = blockOffset - response.offset;
              const t = Math.min(o + this.blockSize, response.data.byteLength);
              const data = response.data.slice(o, t);
              const block = new Block(
                blockOffset,
                data.byteLength,
                data,
              );
              this.blockCache.set(blockId, block);
              this.abortedBlockIds.delete(blockId);
            } catch (err) {
              if (err.name === 'AbortError') {
                // store the signal here, we need it to determine later if an
                // error was caused by this signal
                err.signal = signal;
                this.blockCache.delete(blockId);
                this.abortedBlockIds.add(blockId);
              } else {
                throw err;
              }
            } finally {
              // settled either way: the request is no longer in flight
              this.blockRequests.delete(blockId);
            }
          })());
        }
      }
      this.blockIdsToFetch.clear();
    }
  }

  /**
   * Combine block ids into groups of consecutive ids, so that each group can
   * be requested as one contiguous byte range.
   *
   * @param {Set} blockIds
   * @returns {BlockGroup[]} groups ordered by ascending block id; each group
   *   spans `blockIds.length * blockSize` bytes
   */
  groupBlocks(blockIds) {
    const sortedBlockIds = Array.from(blockIds).sort((a, b) => a - b);
    if (sortedBlockIds.length === 0) {
      return [];
    }
    let current = [];
    let lastBlockId = null;
    const groups = [];

    for (const blockId of sortedBlockIds) {
      if (lastBlockId === null || lastBlockId + 1 === blockId) {
        // directly follows the previous id: extend the running group
        current.push(blockId);
        lastBlockId = blockId;
      } else {
        // gap in the ids: close the running group and start a new one
        groups.push(new BlockGroup(
          current[0] * this.blockSize,
          current.length * this.blockSize,
          current,
        ));
        current = [blockId];
        lastBlockId = blockId;
      }
    }

    // close the last running group
    groups.push(new BlockGroup(
      current[0] * this.blockSize,
      current.length * this.blockSize,
      current,
    ));

    return groups;
  }

  /**
   * Copy the bytes of every slice out of the given blocks.
   *
   * Each returned buffer is `slice.length` bytes long; when the slice reaches
   * beyond a known `fileSize`, the bytes past the end of the file stay zero.
   *
   * @param {import("./basesource").Slice[]} slices
   * @param {Map} blocks mapping blockId -> block (needs `offset`, `length`,
   *   `top` and `data`) for every block the slices touch
   * @returns {ArrayBuffer[]} one buffer per slice, in slice order
   */
  readSliceData(slices, blocks) {
    const { blockSize, fileSize } = this;

    return slices.map((slice) => {
      let top = slice.offset + slice.length;
      if (fileSize !== null) {
        top = Math.min(fileSize, top);
      }
      const sliceData = new ArrayBuffer(slice.length);
      const sliceView = new Uint8Array(sliceData);

      // Visit exactly the blocks `fetch` collects for a slice: those starting
      // before `top`. A block starting at `top` holds no byte of the slice
      // and is not guaranteed to be part of `blocks`.
      const blockIdLow = Math.floor(slice.offset / blockSize);
      for (let blockId = blockIdLow; blockId * blockSize < top; ++blockId) {
        const block = blocks.get(blockId);
        const delta = block.offset - slice.offset;
        const topDelta = block.top - top;
        let blockInnerOffset = 0;
        let rangeInnerOffset = 0;
        let usedBlockLength;

        if (delta < 0) {
          // the slice starts inside this block: skip the block's leading bytes
          blockInnerOffset = -delta;
        } else if (delta > 0) {
          // the block starts inside the slice: write further into the output
          rangeInnerOffset = delta;
        }

        if (topDelta < 0) {
          // the block ends before the slice does: use it up to its end
          usedBlockLength = block.length - blockInnerOffset;
        } else {
          // the slice ends inside (or at the end of) this block
          usedBlockLength = top - block.offset - blockInnerOffset;
        }

        const blockView = new Uint8Array(block.data, blockInnerOffset, usedBlockLength);
        sliceView.set(blockView, rangeInnerOffset);
      }

      return sliceData;
    });
  }
}
|