* @param {StreamBase} streamBase * @param {QueuingStrategy} strategy * @param {{ ondone?: Function }} [options] * @returns {ReadableStream}
(streamBase, strategy, options = kEmptyObject)
| 1033 | * @returns {ReadableStream} |
| 1034 | */ |
| 1035 | function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyObject) { |
| 1036 | validateObject(streamBase, 'streamBase'); |
| 1037 | validateObject(options, 'options'); |
| 1038 | |
| 1039 | const { |
| 1040 | ondone = () => {}, |
| 1041 | } = options; |
| 1042 | |
| 1043 | if (typeof streamBase.onread === 'function') |
| 1044 | throw new ERR_INVALID_STATE('StreamBase already has a consumer'); |
| 1045 | |
| 1046 | validateFunction(ondone, 'options.ondone'); |
| 1047 | |
| 1048 | let controller; |
| 1049 | |
| 1050 | streamBase.onread = (arrayBuffer) => { |
| 1051 | const nread = streamBaseState[kReadBytesOrError]; |
| 1052 | |
| 1053 | if (nread === 0) |
| 1054 | return; |
| 1055 | |
| 1056 | try { |
| 1057 | if (nread === UV_EOF) { |
| 1058 | controller.close(); |
| 1059 | streamBase.readStop(); |
| 1060 | try { |
| 1061 | ondone(); |
| 1062 | } catch (error) { |
| 1063 | controller.error(error); |
| 1064 | } |
| 1065 | return; |
| 1066 | } |
| 1067 | |
| 1068 | controller.enqueue(arrayBuffer); |
| 1069 | |
| 1070 | if (controller.desiredSize <= 0) |
| 1071 | streamBase.readStop(); |
| 1072 | } catch (error) { |
| 1073 | controller.error(error); |
| 1074 | streamBase.readStop(); |
| 1075 | } |
| 1076 | }; |
| 1077 | |
| 1078 | return new ReadableStream({ |
| 1079 | start(c) { controller = c; }, |
| 1080 | |
| 1081 | pull() { |
| 1082 | streamBase.readStart(); |
| 1083 | }, |
| 1084 | |
| 1085 | cancel() { |
| 1086 | const promise = PromiseWithResolvers(); |
| 1087 | try { |
| 1088 | ondone(); |
| 1089 | } catch (error) { |
| 1090 | promise.reject(error); |
| 1091 | return promise.promise; |
| 1092 | } |
no test coverage detected
searching dependent graphs…