MCPcopy Index your code
hub / github.com/ReactiveX/RxJava

github.com/ReactiveX/RxJava @v3.1.12 sqlite

repository ↗ · DeepWiki ↗ · release v3.1.12 ↗
28,315 symbols 202,141 edges 1,884 files 2,415 documented · 9% 6 cross-repo links
README

RxJava: Reactive Extensions for the JVM

codecov.io Maven Central Contribute with Gitpod OpenSSF Scorecard

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Version 3.x (Javadoc)

  • Single dependency: Reactive-Streams.
  • Java 8+ or Android API 21+ required.
  • Java 8 lambda-friendly API.
  • Android desugar friendly.
  • Fixed API mistakes and many limits of RxJava 2.
  • Intended to be a replacement for RxJava 2 with relatively few binary incompatible changes.
  • Non-opinionated about the source of concurrency (threads, pools, event loops, fibers, actors, etc.).
  • Async or synchronous execution.
  • Virtual time and schedulers for parameterized concurrency.
  • Test and diagnostic support via test schedulers, test consumers and plugin hooks.
  • Interop with newer JDK versions via 3rd party libraries, such as
  • Java 9 Flow API
  • Java 21 Virtual Threads

Learn more about RxJava in general on the Wiki Home.

:information_source: Please read the What's different in 3.0 for details on the changes and migration information when upgrading from 2.x.

Version 2.x

The 2.x version is end-of-life as of February 28, 2021. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 2.2.21, will remain accessible.

Version 1.x

The 1.x version is end-of-life as of March 31, 2018. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 1.3.8, will remain accessible.

Getting started

Setting up the dependency

The first step is to include RxJava 3 into your project, for example, as a Gradle compile dependency:

implementation "io.reactivex.rxjava3:rxjava:3.x.y"

(Please replace x and y with the latest version numbers: Maven Central )

Hello World

The second is to write the Hello World program:

package rxjava.examples;

import io.reactivex.rxjava3.core.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

Note that RxJava 3 components now live under io.reactivex.rxjava3 and the base classes and interfaces live under io.reactivex.rxjava3.core.

Base classes

RxJava 3 features several base classes you can discover operators on:

Some terminology

Upstream, downstream

The dataflows in RxJava consist of a source, zero or more intermediate steps followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

Here, if we imagine ourselves on operator2, looking to the left towards the source is called the upstream. Looking to the right towards the subscriber/consumer is called the downstream. This is often more apparent when each element is written on a separate line:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

Objects in motion

In RxJava's documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow.

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated to the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.

Assembly time

The preparation of dataflows by applying various intermediate operators happens in the so-called assembly time:

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

At this point, the data is not flowing yet and no side-effects are happening.

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println)
````

This is when the **subscription side-effects** are triggered (see `doOnSubscribe`). Some sources block or start emitting items right away in this state.

#### Runtime

This is the state when the flows are actively emitting items, errors or completion signals:

```java

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.

Simple background computation

One of the common use cases for RxJava is to run some computation, network request on a background thread and show the results (or error) on the UI thread:

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Typically, you can move computations or blocking IO to some other thread via subscribeOn. Once the data is ready, you can make sure they get processed on the foreground or GUI thread via observeOn.

Schedulers

RxJava operators don't work with Threads or ExecutorServices directly but with so-called Schedulers that abstract away sources of concurrency behind a uniform API. RxJava 3 features several standard schedulers accessible via Schedulers utility class.

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.

These are available on all JVM platforms but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXScheduler.platform().

In addition, there is an option to wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).

The Thread.sleep(2000); at the end is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in this example situations lets you see the output of the flow on the console with time to spare.

Concurrency within a flow

Flows in RxJava are sequential in nature split into processing stages that may run concurrently with each other:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the "main" thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.

Parallel processing

Processing the numbers 1 to 10 in parallel is a bit more involved:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, runs them and merges the computed squares.

Note, however, that flatMap doesn't guarantee any order and the items from the inner flows may end up interleaved. There are alternative operators:

  • concatMap that maps and runs one inner flow at a time and
  • concatMapEager which runs all inner flows "at once" but the output flow will be in the order those inner flows were created.

Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various o

Extension points exported contracts — how you extend this code

ScalarSupplier (Interface)
A marker interface indicating that a scalar, constant value is held by the implementing reactive type which can be safel [118 …
src/main/java/io/reactivex/rxjava3/operators/ScalarSupplier.java
ConditionalSubscriber (Interface)
A FlowableSubscriber with an additional #tryOnNext(Object) method that tells the caller the specified va [25 implementers]
src/main/java/io/reactivex/rxjava3/operators/ConditionalSubscriber.java
QueueFuseable (Interface)
Represents a SimpleQueue plus the means and constants for requesting a fusion mode. @param the value type re [62 implementers]
src/main/java/io/reactivex/rxjava3/operators/QueueFuseable.java
SimplePlainQueue (Interface)
Override of the SimpleQueue interface with no throws Throwable on poll(). @param the value [70 implementers]
src/main/java/io/reactivex/rxjava3/operators/SimplePlainQueue.java
SimpleQueue (Interface)
A simplified interface for offering, polling and clearing a queue. This interface does not define most of the {@link [18 …
src/main/java/io/reactivex/rxjava3/operators/SimpleQueue.java

Core symbols most depended-on inside this repo

onNext
called by 7397
src/main/java/io/reactivex/rxjava3/core/Emitter.java
subscribe
called by 4567
src/main/java/io/reactivex/rxjava3/core/MaybeSource.java
test
called by 4515
src/main/java/io/reactivex/rxjava3/functions/Predicate.java
get
called by 3861
src/main/java/io/reactivex/rxjava3/functions/Supplier.java
onComplete
called by 3597
src/main/java/io/reactivex/rxjava3/core/Emitter.java
onError
called by 3262
src/main/java/io/reactivex/rxjava3/core/Emitter.java
just
called by 1817
src/main/java/io/reactivex/rxjava3/core/Flowable.java
onSubscribe
called by 1780
src/main/java/io/reactivex/rxjava3/core/Observer.java

Shape

Method 25,269
Class 2,910
Interface 101
Enum 35

Languages

Java100%

Modules by API surface

src/test/java/io/reactivex/rxjava3/completable/CompletableTest.java293 symbols
src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java241 symbols
src/main/java/io/reactivex/rxjava3/core/Flowable.java222 symbols
src/main/java/io/reactivex/rxjava3/core/Observable.java214 symbols
src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java145 symbols
src/main/java/io/reactivex/rxjava3/internal/functions/Functions.java136 symbols
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTest.java134 symbols
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java126 symbols
src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java126 symbols
src/main/java/io/reactivex/rxjava3/core/Maybe.java120 symbols
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java119 symbols
src/main/java/io/reactivex/rxjava3/core/Single.java112 symbols

For agents

$ claude mcp add RxJava \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact