MCPcopy
hub / github.com/Effect-TS/effect / SubscriptionRefImpl

Class SubscriptionRefImpl

packages/effect/src/internal/subscriptionRef.ts:29–81  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

27
28/** @internal */
29class SubscriptionRefImpl<in out A> extends Effectable.Class<A> implements SubscriptionRef.SubscriptionRef<A> {
30 readonly [Readable.TypeId]: Readable.TypeId = Readable.TypeId
31 readonly [Subscribable.TypeId]: Subscribable.TypeId = Subscribable.TypeId
32 readonly [Ref.RefTypeId] = ref_.refVariance
33 readonly [Synchronized.SynchronizedRefTypeId] = circular_.synchronizedVariance
34 readonly [SubscriptionRefTypeId] = subscriptionRefVariance
35 constructor(
36 readonly ref: Ref.Ref<A>,
37 readonly pubsub: PubSub.PubSub<A>,
38 readonly semaphore: Effect.Semaphore
39 ) {
40 super()
41 this.get = Ref.get(this.ref)
42 }
43 commit() {
44 return this.get
45 }
46 readonly get: Effect.Effect<A>
47 get changes(): Stream<A> {
48 return pipe(
49 Ref.get(this.ref),
50 Effect.flatMap((a) =>
51 Effect.map(
52 stream.fromPubSub(this.pubsub, { scoped: true }),
53 (s) =>
54 stream.concat(
55 stream.make(a),
56 s
57 )
58 )
59 ),
60 this.semaphore.withPermits(1),
61 stream.unwrapScoped
62 )
63 }
64 modify<B>(f: (a: A) => readonly [B, A]): Effect.Effect<B> {
65 return this.modifyEffect((a) => Effect.succeed(f(a)))
66 }
67 modifyEffect<B, E, R>(f: (a: A) => Effect.Effect<readonly [B, A], E, R>): Effect.Effect<B, E, R> {
68 return pipe(
69 Ref.get(this.ref),
70 Effect.flatMap(f),
71 Effect.flatMap(([b, a]) =>
72 pipe(
73 Ref.set(this.ref, a),
74 Effect.as(b),
75 Effect.zipLeft(PubSub.publish(this.pubsub, a))
76 )
77 ),
78 this.semaphore.withPermits(1)
79 )
80 }
81}
82
83/** @internal */
84export const get = <A>(self: SubscriptionRef.SubscriptionRef<A>): Effect.Effect<A> => Ref.get(self.ref)

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected