MCPcopy
hub / github.com/simstudioai/sim / createMultipartUpload

Function createMultipartUpload

apps/sim/lib/uploads/core/storage-service.ts:261–361  ·  view source on GitHub ↗
(options: {
  key: string
  context: StorageContext
  contentType: string
})

Source from the content-addressed store, hash-verified

259 * buffered and written via a single {@link uploadFile} on `complete`.
260 */
261export async function createMultipartUpload(options: {
262 key: string
263 context: StorageContext
264 contentType: string
265}): Promise<MultipartUploadHandle> {
266 const { key, context, contentType } = options
267 const config = getStorageConfig(context)
268 const cloud = hasCloudStorage()
269
270 let backend: MultipartBackend | null = null
271 // Accumulate writes as references, not a growing buffer — concatenating only when a part fills
272 // (or on complete) keeps total copying ~O(bytes) instead of O(bytes × writes).
273 let pendingChunks: Buffer[] = []
274 let pendingBytes = 0
275 let totalBytes = 0
276 let partNumber = 0
277 let aborted = false
278 let firstError: unknown
279 const inflight = new Set<Promise<void>>()
280
281 /** Merge the accumulated chunks into one ArrayBuffer-backed buffer (which `uploadFile` expects). */
282 const drainPending = (): Buffer<ArrayBuffer> => Buffer.concat(pendingChunks, pendingBytes)
283
284 const ensureBackend = async (): Promise<MultipartBackend> => {
285 if (!backend) {
286 backend = USE_BLOB_STORAGE
287 ? await createBlobBackend(key, createBlobConfig(config), contentType)
288 : await createS3Backend(key, createS3Config(config), contentType, context)
289 }
290 return backend
291 }
292
293 const dispatchPart = async (body: Buffer): Promise<void> => {
294 // Bound concurrency: wait for a free slot before starting another part.
295 while (inflight.size >= MULTIPART_MAX_INFLIGHT) await Promise.race(inflight)
296 if (firstError) throw firstError
297 const be = await ensureBackend()
298 const partNo = ++partNumber
299 const p = be
300 .uploadPart(partNo, body)
301 .catch((err) => {
302 firstError ??= err
303 })
304 .finally(() => {
305 inflight.delete(p)
306 })
307 inflight.add(p)
308 }
309
310 const abort = async (): Promise<void> => {
311 aborted = true
312 await Promise.allSettled(inflight)
313 if (backend) await backend.abort().catch(() => {})
314 }
315
316 return {
317 async write(chunk) {
318 if (aborted) throw new Error('Multipart upload already aborted')

Callers 3

materializeFunction · 0.90
runTableExportFunction · 0.90

Calls 2

getStorageConfigFunction · 0.90
hasCloudStorageFunction · 0.85

Tested by

no test coverage detected