(options: {
key: string
context: StorageContext
contentType: string
})
| 259 | * buffered and written via a single {@link uploadFile} on `complete`. |
| 260 | */ |
| 261 | export async function createMultipartUpload(options: { |
| 262 | key: string |
| 263 | context: StorageContext |
| 264 | contentType: string |
| 265 | }): Promise<MultipartUploadHandle> { |
| 266 | const { key, context, contentType } = options |
| 267 | const config = getStorageConfig(context) |
| 268 | const cloud = hasCloudStorage() |
| 269 | |
| 270 | let backend: MultipartBackend | null = null |
| 271 | // Accumulate writes as references, not a growing buffer — concatenating only when a part fills |
| 272 | // (or on complete) keeps total copying ~O(bytes) instead of O(bytes × writes). |
| 273 | let pendingChunks: Buffer[] = [] |
| 274 | let pendingBytes = 0 |
| 275 | let totalBytes = 0 |
| 276 | let partNumber = 0 |
| 277 | let aborted = false |
| 278 | let firstError: unknown |
| 279 | const inflight = new Set<Promise<void>>() |
| 280 | |
| 281 | /** Merge the accumulated chunks into one ArrayBuffer-backed buffer (which `uploadFile` expects). */ |
| 282 | const drainPending = (): Buffer<ArrayBuffer> => Buffer.concat(pendingChunks, pendingBytes) |
| 283 | |
| 284 | const ensureBackend = async (): Promise<MultipartBackend> => { |
| 285 | if (!backend) { |
| 286 | backend = USE_BLOB_STORAGE |
| 287 | ? await createBlobBackend(key, createBlobConfig(config), contentType) |
| 288 | : await createS3Backend(key, createS3Config(config), contentType, context) |
| 289 | } |
| 290 | return backend |
| 291 | } |
| 292 | |
| 293 | const dispatchPart = async (body: Buffer): Promise<void> => { |
| 294 | // Bound concurrency: wait for a free slot before starting another part. |
| 295 | while (inflight.size >= MULTIPART_MAX_INFLIGHT) await Promise.race(inflight) |
| 296 | if (firstError) throw firstError |
| 297 | const be = await ensureBackend() |
| 298 | const partNo = ++partNumber |
| 299 | const p = be |
| 300 | .uploadPart(partNo, body) |
| 301 | .catch((err) => { |
| 302 | firstError ??= err |
| 303 | }) |
| 304 | .finally(() => { |
| 305 | inflight.delete(p) |
| 306 | }) |
| 307 | inflight.add(p) |
| 308 | } |
| 309 | |
| 310 | const abort = async (): Promise<void> => { |
| 311 | aborted = true |
| 312 | await Promise.allSettled(inflight) |
| 313 | if (backend) await backend.abort().catch(() => {}) |
| 314 | } |
| 315 | |
| 316 | return { |
| 317 | async write(chunk) { |
| 318 | if (aborted) throw new Error('Multipart upload already aborted') |
no test coverage detected