MCPcopy Index your code
hub / github.com/andywer/threads.js / createObservableForJob

Function createObservableForJob

src/master/invocation-proxy.ts:39–89  ·  view source on GitHub ↗
(worker: WorkerType, jobUID: number)

Source from the content-addressed store, hash-verified

37const isJobStartMessage = (data: any): data is WorkerJobStartMessage => data && data.type === WorkerMessageType.running
38
39function createObservableForJob<ResultType>(worker: WorkerType, jobUID: number): Observable<ResultType> {
40 return new Observable(observer => {
41 let asyncType: "observable" | "promise" | undefined
42
43 const messageHandler = ((event: MessageEvent) => {
44 debugMessages("Message from worker:", event.data)
45 if (!event.data || event.data.uid !== jobUID) return
46
47 if (isJobStartMessage(event.data)) {
48 asyncType = event.data.resultType
49 } else if (isJobResultMessage(event.data)) {
50 if (asyncType === "promise") {
51 if (typeof event.data.payload !== "undefined") {
52 observer.next(deserialize(event.data.payload))
53 }
54 observer.complete()
55 worker.removeEventListener("message", messageHandler)
56 } else {
57 if (event.data.payload) {
58 observer.next(deserialize(event.data.payload))
59 }
60 if (event.data.complete) {
61 observer.complete()
62 worker.removeEventListener("message", messageHandler)
63 }
64 }
65 } else if (isJobErrorMessage(event.data)) {
66 const error = deserialize(event.data.error as any)
67 if (asyncType === "promise" || !asyncType) {
68 observer.error(error)
69 } else {
70 observer.error(error)
71 }
72 worker.removeEventListener("message", messageHandler)
73 }
74 }) as EventListener
75
76 worker.addEventListener("message", messageHandler)
77
78 return () => {
79 if (asyncType === "observable" || !asyncType) {
80 const cancelMessage: MasterJobCancelMessage = {
81 type: MasterMessageType.cancel,
82 uid: jobUID
83 }
84 worker.postMessage(cancelMessage)
85 }
86 worker.removeEventListener("message", messageHandler)
87 }
88 })
89}
90
91function prepareArguments(rawArgs: any[]): { args: any[], transferables: Transferable[] } {
92 if (rawArgs.length === 0) {

Callers 1

createProxyFunctionFunction · 0.85

Calls 10

deserializeFunction · 0.90
isJobStartMessageFunction · 0.85
isJobResultMessageFunction · 0.85
isJobErrorMessageFunction · 0.85
removeEventListenerMethod · 0.65
addEventListenerMethod · 0.65
postMessageMethod · 0.65
nextMethod · 0.45
completeMethod · 0.45
errorMethod · 0.45

Tested by

no test coverage detected