MCPcopy Index your code
hub / github.com/slackapi/python-slack-sdk / _flush_buffer

Method _flush_buffer

slack_sdk/web/chat_stream.py:215–240  ·  view source on GitHub ↗

Flush the internal buffer with chunks by making appropriate API calls.

(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs)

Source from the content-addressed store, hash-verified

213 return response
214
def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> SlackResponse:
    """Send the buffered text and any extra chunks to the stream, then reset the buffer.

    Non-empty buffered text is wrapped in a ``MarkdownTextChunk`` and placed
    ahead of the caller-supplied ``chunks``. When no stream timestamp has been
    recorded yet, the stream is opened with ``chat_startStream`` and the
    returned ``ts`` is stored; otherwise the chunks are added to the existing
    stream with ``chat_appendStream``.

    Args:
        chunks: Optional additional chunks to send after the buffered text.
        **kwargs: Extra keyword arguments forwarded to the API call.

    Returns:
        The response from whichever API call was made.
    """
    # Buffered text always precedes the explicitly supplied chunks.
    pending: List[Union[Dict, Chunk]] = (
        [MarkdownTextChunk(text=self._buffer)] if len(self._buffer) > 0 else []
    )
    if chunks is not None:
        pending.extend(chunks)

    if self._stream_ts:
        # A stream is already open: append to it.
        response = self._client.chat_appendStream(
            token=self._token,
            channel=self._stream_args["channel"],
            ts=self._stream_ts,
            **kwargs,
            chunks=pending,
        )
    else:
        # First flush: open the stream and remember its timestamp.
        response = self._client.chat_startStream(
            **self._stream_args,
            token=self._token,
            **kwargs,
            chunks=pending,
        )
        self._stream_ts = response.get("ts")
        self._state = "in_progress"

    # Only reached when the API call did not raise.
    self._buffer = ""
    return response

Callers 1

append · Method · 0.95

Calls 5

MarkdownTextChunk · Class · 0.90
append · Method · 0.45
chat_startStream · Method · 0.45
get · Method · 0.45
chat_appendStream · Method · 0.45

Tested by

no test coverage detected