* Return a stream handler * * @param {object} callbacks A group of callbacks to call when needed * * @returns {Promise } The stream manager *
(callbacks)
| 250 | * |
| 251 | */ |
| 252 | async get(callbacks) { |
| 253 | let self = this; |
| 254 | |
| 255 | if (this.streamHandler) { |
| 256 | return this.streamHandler; |
| 257 | } |
| 258 | |
| 259 | callbacks.connecting(); |
| 260 | |
| 261 | let streamPaused = false, |
| 262 | currentReceived = 0, |
| 263 | currentUnpacked = 0; |
| 264 | |
| 265 | const shouldPause = () => { |
| 266 | return currentReceived > currentUnpacked; |
| 267 | }; |
| 268 | |
| 269 | try { |
| 270 | let conn = await this.dial.dial({ |
| 271 | inbound(data) { |
| 272 | currentReceived += data.size; |
| 273 | |
| 274 | callbacks.traffic(data.size, 0); |
| 275 | }, |
| 276 | inboundUnpacked(data) { |
| 277 | currentUnpacked += data.length; |
| 278 | |
| 279 | if (currentUnpacked >= currentReceived) { |
| 280 | currentUnpacked = 0; |
| 281 | currentReceived = 0; |
| 282 | } |
| 283 | |
| 284 | if (self.streamHandler !== null) { |
| 285 | if (streamPaused && !shouldPause()) { |
| 286 | streamPaused = false; |
| 287 | self.streamHandler.resume(); |
| 288 | |
| 289 | return; |
| 290 | } else if (!streamPaused && shouldPause()) { |
| 291 | streamPaused = true; |
| 292 | self.streamHandler.pause(); |
| 293 | |
| 294 | return; |
| 295 | } |
| 296 | } |
| 297 | }, |
| 298 | outbound(data) { |
| 299 | callbacks.traffic(0, data.length); |
| 300 | } |
| 301 | }); |
| 302 | |
| 303 | let streamHandler = new streams.Streams(conn.reader, conn.sender, { |
| 304 | echoInterval: self.echoInterval, |
| 305 | echoUpdater(delay) { |
| 306 | return callbacks.echo(delay); |
| 307 | }, |
| 308 | cleared(e) { |
| 309 | if (self.streamHandler === null) { |