
Package io.vertx.rxjava.core

Package io.vertx.rxjava.core Description

= Vert.x RxJava
:toc: left

== Vert.x API for RxJava

https://github.com/ReactiveX/RxJava[RxJava] is a popular library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Vert.x integrates naturally with RxJava, allowing you to use observables wherever you can use streams or asynchronous results.

=== Using Vert.x API for RxJava1

To use Vert.x API for RxJava1, add the following dependency to the _dependencies_ section of your build descriptor:

* Maven (in your `pom.xml`):

[source,xml,subs="+attributes"]
----
<dependency>
  <groupId>${maven.groupId}</groupId>
  <artifactId>${maven.artifactId}</artifactId>
  <version>${maven.version}</version>
</dependency>
----

* Gradle (in your `build.gradle` file):

[source,groovy,subs="+attributes"]
----
compile '${maven.groupId}:${maven.artifactId}:${maven.version}'
----

There are two ways to use the RxJava API with Vert.x:

- via the original Vert.x API with the RxHelper helper class, which provides static methods for converting objects between the Vert.x core API and the RxJava API.
- via the _Rxified_ Vert.x API, which enhances the core Vert.x API.

=== Read stream support

RxJava `Observable` is a perfect match for the Vert.x `ReadStream` class: both provide a flow of items.

The RxHelper.toObservable(io.vertx.core.streams.ReadStream) static methods convert a Vert.x read stream to an `rx.Observable`:

[source,java]
----
examples.NativeExamples#toObservable
----

The _Rxified_ Vert.x API provides an io.vertx.rxjava.core.streams.ReadStream#toObservable() method on io.vertx.rxjava.core.streams.ReadStream:

[source,java]
----
examples.RxifiedExamples#toObservable
----

Such observables are *hot* observables, i.e. they will produce notifications regardless of subscriptions, because a `ReadStream` can potentially emit items spontaneously or not, depending on the implementation.

At subscription time, the adapter calls ReadStream.handler(io.vertx.core.Handler) to set its own handler.
Some `ReadStream` implementations start to emit events after this call, others emit events whether a handler is set or not:

- `AsyncFile` produces buffer events after the handler is set
- `HttpServerRequest` produces events independently of the handler (i.e. buffers may be lost if no handler is set)

In both cases, subscribing to the `Observable` in the same call is safe because the event loop or the worker verticles cannot be called concurrently, so the subscription always happens before the handler starts emitting data.

When you need to delay the subscription, you need to `pause` the `ReadStream` and then `resume` it, which is what you would do with a `ReadStream`.

[source,java]
----
examples.RxifiedExamples#delayToObservable
----

Likewise, it is possible to turn an existing `Observable` into a Vert.x `ReadStream`.

The RxHelper.toReadStream(rx.Observable) static methods convert an `rx.Observable` to a Vert.x read stream:

[source,java]
----
examples.NativeExamples#toReadStream
----

=== Handler support

The RxHelper can create an ObservableHandler: an `Observable` with an ObservableHandler.toHandler() method returning a `Handler` implementation:

[source,java]
----
examples.NativeExamples#observableHandler(io.vertx.core.Vertx)
----

The _Rxified_ Vert.x API does not provide a specific API for handlers.
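As an illustration of the idea behind `ObservableHandler`, the following plain-Java sketch models an object that is both a source of events for subscribers and a provider of a handler callback. It is a simplified model written with `java.util.function.Consumer`, not the Vert.x or RxJava implementation; `HandlerBridge`, `fireTwice` and `demo` are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a handler/observable bridge (hypothetical, not Vert.x code). */
final class HandlerBridge<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    /** Registers a subscriber that receives every event delivered afterwards. */
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Returns the callback to hand to an API that expects a handler. */
    Consumer<T> toHandler() {
        return event -> {
            // Iterate over a copy so subscribers may register others while being notified.
            for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
                subscriber.accept(event);
            }
        };
    }

    /** Hypothetical stand-in for an API that takes a handler and calls it twice. */
    static void fireTwice(Consumer<Long> handler) {
        handler.accept(1L);
        handler.accept(2L);
    }

    /** Subscribes first, then passes the bridge's handler to the callback-based API. */
    static List<Long> demo() {
        HandlerBridge<Long> bridge = new HandlerBridge<>();
        List<Long> received = new ArrayList<>();
        bridge.subscribe(received::add);
        fireTwice(bridge.toHandler());
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Here `fireTwice` stands in for any API that accepts a handler, such as a timer. Every value passed to the handler is forwarded to the subscriber, so `demo()` returns a list containing `1` and `2`.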
=== Async result support

You can create an RxJava `Subscriber` from an existing Vert.x `Handler<AsyncResult<T>>` and subscribe it to an `Observable` or a `Single`:

[source,java]
----
examples.NativeExamples#handlerToSubscriber
----

The Vert.x `Handler<AsyncResult<T>>` construct occurring as the last parameter of an asynchronous method can be mapped to an observable of a single element:

- when the callback is a success, the observer `onNext` method is called with the item and the `onCompleted` method is invoked immediately after
- when the callback is a failure, the observer `onError` method is called

The RxHelper.observableFuture() method creates an ObservableFuture: an `Observable` with an ObservableFuture.toHandler() method returning a `Handler<AsyncResult<T>>` implementation:

[source,java]
----
examples.NativeExamples#observableFuture(io.vertx.core.Vertx)
----

The `ObservableFuture` will get a single `HttpServer` object; if the `listen` operation fails, the subscriber will be notified with the failure.

The RxHelper.toHandler(rx.Observer) method adapts an existing `Observer` into a handler:

[source,java]
----
examples.NativeExamples#observableToHandler()
----

It also works with just actions:

[source,java]
----
examples.NativeExamples#actionsToHandler()
----

For each such method, the _Rxified_ Vert.x API adds a variant with the `rx` prefix that returns an RxJava `Single`:

[source,java]
----
examples.RxifiedExamples#single(io.vertx.rxjava.core.Vertx)
----

Such singles are *cold* singles, and the corresponding API method is called on subscribe.

NOTE: the `rx*` methods replace the `*Observable` methods of the previous _Rxified_ versions, with a semantic change to be more in line with RxJava.

=== Scheduler support

The reactive extension sometimes needs to schedule actions; for instance, `Observable#timer` creates and returns a timer that emits periodic events. By default, scheduled actions are managed by RxJava, which means that the timer threads are not Vert.x threads and therefore do not execute in a Vert.x event loop.
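The threading point can be illustrated without Vert.x or RxJava: a timer callback runs on whichever thread the scheduling executor owns. The sketch below is a plain-Java analogy only. The executors named `event-loop` and `library-timer`, and the class `TimerThreadSketch`, are hypothetical stand-ins for a Vert.x event loop and an RxJava-managed timer thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Plain-Java analogy: the scheduler you pass decides which thread runs the timer callback. */
final class TimerThreadSketch {

    /** Schedules a 10 ms timer on the given executor and returns the callback's thread name. */
    static String callbackThread(ScheduledExecutorService timer) throws Exception {
        CompletableFuture<String> name = new CompletableFuture<>();
        timer.schedule(() -> {
            name.complete(Thread.currentThread().getName());
        }, 10, TimeUnit.MILLISECONDS);
        return name.get(5, TimeUnit.SECONDS);
    }

    /** Returns the thread names observed for the library timer and for the event loop. */
    static String[] demo() {
        // Stand-in for the application's event loop: a single named thread.
        ScheduledExecutorService eventLoop =
            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "event-loop"));
        // Stand-in for a timer pool owned by a library.
        ScheduledExecutorService libraryTimer =
            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "library-timer"));
        try {
            return new String[] { callbackThread(libraryTimer), callbackThread(eventLoop) };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            libraryTimer.shutdown();
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("default timer ran on: " + threads[0]);
        System.out.println("event-loop timer ran on: " + threads[1]);
    }
}
```

In this analogy the first callback runs on the `library-timer` thread and the second on the `event-loop` thread. Supplying a scheduler backed by Vert.x plays the same role as choosing the second executor.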
RxJava methods that deal with a scheduler have an overload accepting an extra `rx.Scheduler`; the RxHelper.scheduler(io.vertx.core.Vertx) method returns a scheduler that can be used in such places.

[source,java]
----
examples.NativeExamples#scheduler(io.vertx.core.Vertx)
----

For blocking scheduled actions, a scheduler can be created with the RxHelper.blockingScheduler(io.vertx.core.Vertx) method:

[source,java]
----
examples.NativeExamples#blockingScheduler
----

RxJava can also be reconfigured to use the Vert.x scheduler, thanks to the scheduler hook created with RxHelper.schedulerHook(io.vertx.core.Vertx). The returned scheduler hook uses a blocking scheduler for IO actions:

[source,java]
----
examples.NativeExamples#schedulerHook(io.vertx.core.Vertx)
----

The _Rxified_ Vert.x API also provides similar methods on the RxHelper class:

[source,java]
----
examples.RxifiedExamples#scheduler(io.vertx.rxjava.core.Vertx)
----

[source,java]
----
examples.RxifiedExamples#schedulerHook(io.vertx.rxjava.core.Vertx)
----

It is also possible to create a scheduler backed by a named worker pool. This can be useful if you want to re-use a specific thread pool for scheduling blocking actions:

[source,java]
----
examples.RxifiedExamples#scheduler(io.vertx.rxjava.core.WorkerExecutor)
----

=== Json unmarshalling

The RxHelper.unmarshaller(java.lang.Class) creates an `rx.Observable.Operator` that transforms an `Observable<Buffer>` in JSON format into an object observable:

[source,java]
----
examples.NativeExamples#unmarshaller(io.vertx.core.file.FileSystem)
----

The same can be done with the _Rxified_ helper:

[source,java]
----
examples.RxifiedExamples#unmarshaller(io.vertx.rxjava.core.file.FileSystem)
----

=== Deploying a Verticle

The Rxified API cannot deploy an existing Verticle instance; the helper RxHelper.observableFuture() method provides a solution to that.
The io.vertx.rxjava.core.RxHelper#deployVerticle(io.vertx.rxjava.core.Vertx, io.vertx.core.Verticle) does it automatically for you: it deploys a `Verticle` and returns an `Observable<String>` of the deployment ID.

[source,java]
----
examples.RxifiedExamples#deployVerticle
----

=== HttpClient GET on subscription

The io.vertx.rxjava.core.RxHelper#get(io.vertx.rxjava.core.http.HttpClient, int, java.lang.String, java.lang.String) is a convenient helper method that performs an HTTP GET upon subscription:

[source,java]
----
examples.RxifiedExamples#get
----

WARNING: this API is different from the HttpClient API, which performs the GET request when the method is called and returns a one-shot `Observable`.

== Rxified API

The _Rxified_ API is a code-generated version of the Vert.x API, just like the _JavaScript_ or _Groovy_ languages. The API uses the `io.vertx.rxjava` prefix; for instance, the `io.vertx.core.Vertx` class is translated to the io.vertx.rxjava.core.Vertx class.

=== Embedding Rxified Vert.x

Just use the io.vertx.rxjava.core.Vertx#vertx() methods:

[source,java]
----
examples.RxifiedExamples#embedded()
----

=== As a Verticle

Extend the AbstractVerticle class; it will wrap it for you:

[source,java]
----
examples.RxifiedExamples#verticle()
----

Deploying an RxJava verticle is still performed by the Java deployer and does not need a specific deployer.

== API examples

Let's now study a few examples of using Vert.x with RxJava.

=== EventBus message stream

The event bus io.vertx.rxjava.core.eventbus.MessageConsumer naturally provides an `Observable<Message<T>>`:

[source,java]
----
examples.RxifiedExamples#eventBusMessages(io.vertx.rxjava.core.Vertx)
----

The io.vertx.rxjava.core.eventbus.MessageConsumer provides a stream of io.vertx.rxjava.core.eventbus.Message.
The io.vertx.rxjava.core.eventbus.Message#body() gives access to a new stream of message bodies if needed:

[source,java]
----
examples.RxifiedExamples#eventBusBodies(io.vertx.rxjava.core.Vertx)
----

RxJava map/reduce composition style can then be used:

[source,java]
----
examples.RxifiedExamples#eventBusMapReduce(io.vertx.rxjava.core.Vertx)
----

=== Timers

Timer tasks can be created with io.vertx.rxjava.core.Vertx#timerStream(long):

[source,java]
----
examples.RxifiedExamples#timer(io.vertx.rxjava.core.Vertx)
----

Periodic tasks can be created with io.vertx.rxjava.core.Vertx#periodicStream(long):

[source,java]
----
examples.RxifiedExamples#periodic(io.vertx.rxjava.core.Vertx)
----

The observable can be cancelled by unsubscribing:

[source,java]
----
examples.RxifiedExamples#periodicUnsubscribe(io.vertx.rxjava.core.Vertx)
----

=== Http client requests

io.vertx.rxjava.core.http.HttpClientRequest#toObservable() provides a one-shot callback with the HttpClientResponse object. The observable reports a request failure.

[source,java]
----
examples.RxifiedExamples#httpClientRequest(io.vertx.rxjava.core.Vertx)
----

The response can be processed as an `Observable<Buffer>` with the io.vertx.rxjava.core.http.HttpClientResponse#toObservable() method:

[source,java]
----
examples.RxifiedExamples#httpClientResponse(io.vertx.rxjava.core.http.HttpClientRequest)
----

The same flow can be achieved with the `flatMap` operation:

[source,java]
----
examples.RxifiedExamples#httpClientResponseFlatMap(io.vertx.rxjava.core.http.HttpClientRequest)
----

We can also unmarshall the `Observable<Buffer>` into an object using the RxHelper.unmarshaller(java.lang.Class) static method.
This method creates an `rx.Observable.Operator` unmarshalling buffers to an object:

[source,java]
----
examples.RxifiedExamples#httpClientResponseFlatMapUnmarshall(io.vertx.rxjava.core.http.HttpClientRequest)
----

=== Http server requests

The io.vertx.rxjava.core.http.HttpServer#requestStream() provides a callback for each incoming request:

[source,java]
----
examples.RxifiedExamples#httpServerRequest
----

The HttpServerRequest can then be adapted to an `Observable<Buffer>`:

[source,java]
----
examples.RxifiedExamples#httpServerRequestObservable(io.vertx.rxjava.core.http.HttpServer)
----

The RxHelper.unmarshaller(java.lang.Class) can be used to parse and map a JSON request to an object:

[source,java]
----
examples.RxifiedExamples#httpServerRequestObservableUnmarshall(io.vertx.rxjava.core.http.HttpServer)
----

=== Websocket client

The io.vertx.rxjava.core.http.HttpClient#websocketStream provides a single callback when the websocket connects, otherwise a failure:

[source,java]
----
examples.RxifiedExamples#websocketClient(io.vertx.rxjava.core.Vertx)
----

The io.vertx.rxjava.core.http.WebSocket can then easily be turned into an `Observable<Buffer>`:

[source,java]
----
examples.RxifiedExamples#websocketClientBuffer(rx.Observable)
----

=== Websocket server

The io.vertx.rxjava.core.http.HttpServer#websocketStream() provides a callback for each incoming connection:

[source,java]
----
examples.RxifiedExamples#websocketServer(io.vertx.rxjava.core.http.HttpServer)
----

The ServerWebSocket can easily be turned into an `Observable<Buffer>`:

[source,java]
----
examples.RxifiedExamples#websocketServerBuffer(rx.Observable)
----

Copyright © 2018 Eclipse. All rights reserved.