MCPcopy Index your code
hub / github.com/nodejs/node / newReadableStreamFromStreamBase

Function newReadableStreamFromStreamBase

lib/internal/webstreams/adapters.js:1035–1101  ·  view source on GitHub ↗

* @param {StreamBase} streamBase * @param {QueuingStrategy} strategy * @returns {ReadableStream}

(streamBase, strategy, options = kEmptyObject)

Source from the content-addressed store, hash-verified

1033 * @returns {ReadableStream}
1034 */
1035function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyObject) {
1036 validateObject(streamBase, 'streamBase');
1037 validateObject(options, 'options');
1038
1039 const {
1040 ondone = () => {},
1041 } = options;
1042
1043 if (typeof streamBase.onread === 'function')
1044 throw new ERR_INVALID_STATE('StreamBase already has a consumer');
1045
1046 validateFunction(ondone, 'options.ondone');
1047
1048 let controller;
1049
1050 streamBase.onread = (arrayBuffer) => {
1051 const nread = streamBaseState[kReadBytesOrError];
1052
1053 if (nread === 0)
1054 return;
1055
1056 try {
1057 if (nread === UV_EOF) {
1058 controller.close();
1059 streamBase.readStop();
1060 try {
1061 ondone();
1062 } catch (error) {
1063 controller.error(error);
1064 }
1065 return;
1066 }
1067
1068 controller.enqueue(arrayBuffer);
1069
1070 if (controller.desiredSize <= 0)
1071 streamBase.readStop();
1072 } catch (error) {
1073 controller.error(error);
1074 streamBase.readStop();
1075 }
1076 };
1077
1078 return new ReadableStream({
1079 start(c) { controller = c; },
1080
1081 pull() {
1082 streamBase.readStart();
1083 },
1084
1085 cancel() {
1086 const promise = PromiseWithResolvers();
1087 try {
1088 ondone();
1089 } catch (error) {
1090 promise.reject(error);
1091 return promise.promise;
1092 }

Calls 4

closeMethod · 0.65
readStopMethod · 0.45
errorMethod · 0.45
enqueueMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…