See: Description
Interface | Description |
---|---|
WriteStreamObserver<R> |
A
WriteStream to Observer adapter. |
WriteStreamSubscriber<R> |
A
WriteStream to Subscriber adapter. |
Class | Description |
---|---|
CompletableHelper | |
ContextScheduler | |
FlowableHelper | |
MaybeHelper | |
ObservableHelper | |
RxHelper | |
SingleHelper |
RxHelper
** ObservableHelper
** FlowableHelper
** SingleHelper
** MaybeHelper
** CompletableHelper
* via the _Rxified_ Vert.x API enhancing the core Vert.x API.
=== Read stream support
RxJava `Flowable` is a perfect match for Vert.x `ReadStream` class : both provide a flow of items.
The FlowableHelper.toFlowable(io.vertx.core.streams.ReadStream)
static methods convert
a Vert.x read stream to a `Flowable`:
[source,java]
----
examples.NativeExamples#toFlowable
----
The _Rxified_ Vert.x API provides a io.vertx.reactivex.core.streams.ReadStream#toFlowable()
method on
io.vertx.reactivex.core.streams.ReadStream
:
[source,java]
----
examples.RxifiedExamples#toFlowable
----
Such flowables are *hot* flowables, i.e. they will produce notifications regardless of subscriptions because
a `ReadStream` can potentially emit items spontaneously or not, depending on the implementation:
At subscription time, the adapter calls ReadStream.handler(io.vertx.core.Handler)
to set its own handler.
Some `ReadStream` implementations can start to emit events after this call, others will emit events wether an
handler is set:
- `AsyncFile` produces buffer events after the handler is set
- `HttpServerRequest` produces events independantly of the handler (i.e buffer may be lost if no handler is set)
In both cases, subscribing to the `Flowable` in the same call is safe because the event loop or the worker
verticles cannot be called concurrently, so the subscription will always happens before the handler starts emitting
data.
When you need to delay the subscription, you need to `pause` the `ReadStream` and then `resume` it, which is what
you would do with a `ReadStream`.
[source,java]
----
examples.RxifiedExamples#delayFlowable
----
Likewise it is possible to turn an existing `Flowable` into a Vert.x `ReadStream`.
The FlowableHelper.toReadStream(io.reactivex.Flowable)
static methods convert
a `Flowable` to a Vert.x read stream:
[source,java]
----
examples.NativeExamples#toReadStream
----
=== Async result support
You can create an RxJava `Observer` from an existing Vert.x `Handlerexamples.NativeExamples#handlerToSingleObserver
----
[source,java]
----
examples.NativeExamples#handlerToMaybeObserver
----
[source,java]
----
examples.NativeExamples#handlerToCompletableObserver
----
The _Rxified_ Vert.x API duplicates each such method with the `rx` prefix that returns an RxJava Single
,
Maybe
or Completable
:
[source,java]
----
examples.RxifiedExamples#single(io.vertx.reactivex.core.Vertx)
----
Such single are *cold* singles, and the corresponding API method is called on subscribe.
`Maybe` can produce a result or no result:
[source,java]
----
examples.RxifiedExamples#maybe
----
`Completable` is usually mapped to `Handlerexamples.RxifiedExamples#completable
----
=== Scheduler support
The reactive extension sometimes needs to schedule actions, for instance `Flowable#timer` creates and returns
a timer that emit periodic events. By default, scheduled actions are managed by RxJava, it means that the
timer threads are not Vert.x threads and therefore not executing in a Vert.x event loop nor on a Vert.x worker thread.
When an RxJava method deals with a scheduler, it accepts an overloaded method accepting an extra `io.reactivex.Scheduler`,
the RxHelper.scheduler(io.vertx.core.Vertx)
method will return a scheduler that can be used
in such places.
[source,java]
----
examples.NativeExamples#scheduler(io.vertx.core.Vertx)
----
For blocking scheduled actions, a scheduler can be created with the RxHelper.blockingScheduler(io.vertx.core.Vertx)
method:
[source,java]
----
examples.NativeExamples#blockingScheduler
----
RxJava can also be reconfigured to use the Vert.x scheduler:
[source,java]
----
examples.NativeExamples#schedulerHook(io.vertx.core.Vertx)
----
CAUTION: RxJava uses the words _computation_ for non-blocking tasks and _io_ for blocking tasks
which is the opposite of the Vert.x terminology
The _Rxified_ Vert.x API provides also similar method on the io.vertx.reactivex.core.RxHelper
class:
[source,java]
----
examples.RxifiedExamples#scheduler(io.vertx.reactivex.core.Vertx)
----
[source,java]
----
examples.RxifiedExamples#schedulerHook(io.vertx.reactivex.core.Vertx)
----
It is also possible to create a scheduler backed by a named worker pool. This can be useful if you want to re-use
the specific thread pool for scheduling blocking actions:
[source,java]
----
examples.RxifiedExamples#scheduler(io.vertx.reactivex.core.WorkerExecutor)
----
=== Json unmarshalling
The FlowableHelper.unmarshaller(Class)
creates an `io.reactivex.rxjava2.FlowableOperator` that
transforms an `Flowableexamples.NativeExamples#unmarshaller(io.vertx.core.file.FileSystem)
----
The same can be done with the _Rxified_ helper:
[source,java]
----
examples.RxifiedExamples#unmarshaller(io.vertx.reactivex.core.file.FileSystem)
----
=== Deploying a Verticle
To deploy existing Verticle instances, you can use io.vertx.reactivex.core.RxHelper#deployVerticle(io.vertx.reactivex.core.Vertx, io.vertx.core.Verticle)
, it deploys a `Verticle` and returns an `Singleexamples.RxifiedExamples#deployVerticle
----
= Rxified API
The _Rxified_ API is a code generated version of the Vert.x API, just like the _JavaScript_ or _Groovy_
language. The API uses the `io.vertx.rxjava` prefix, for instance the `io.vertx.core.Vertx` class is
translated to the io.vertx.reactivex.core.Vertx
class.
=== Embedding Rxfified Vert.x
Just use the io.vertx.reactivex.core.Vertx#vertx()
methods:
[source,java]
----
examples.RxifiedExamples#embedded()
----
=== As a Verticle
Extend the io.vertx.reactivex.core.AbstractVerticle
class, it will wrap it for you:
[source,java]
----
examples.RxifiedExamples#verticle()
----
Deploying an RxJava verticle is still performed by the Java deployer and does not need a specified
deployer.
== Api examples
Let's study now a few examples of using Vert.x with RxJava.
=== EventBus message stream
The event bus io.vertx.reactivex.core.eventbus.MessageConsumer
provides naturally an `Observableexamples.RxifiedExamples#eventBusMessages(io.vertx.reactivex.core.Vertx)
----
The io.vertx.reactivex.core.eventbus.MessageConsumer
provides a stream of io.vertx.reactivex.core.eventbus.Message
.
The io.vertx.reactivex.core.eventbus.Message#body()
gives access to a new stream of message bodies if needed:
[source,java]
----
examples.RxifiedExamples#eventBusBodies(io.vertx.reactivex.core.Vertx)
----
RxJava map/reduce composition style can then be used:
[source,java]
----
examples.RxifiedExamples#eventBusMapReduce(io.vertx.reactivex.core.Vertx)
----
=== Timers
Timer task can be created with io.vertx.reactivex.core.Vertx#timerStream(long)
:
[source,java]
----
examples.RxifiedExamples#timer(io.vertx.reactivex.core.Vertx)
----
Periodic task can be created with io.vertx.reactivex.core.Vertx#periodicStream(long)
:
[source,java]
----
examples.RxifiedExamples#periodic(io.vertx.reactivex.core.Vertx)
----
The observable can be cancelled with an unsubscription:
[source,java]
----
examples.RxifiedExamples#periodicUnsubscribe(io.vertx.reactivex.core.Vertx)
----
=== Http client requests
We recommend to use the http://vertx.io/docs/vertx-web-client/java/#_rxjava_api[Vert.x Web Client] with RxJava.
=== Http server requests
The io.vertx.reactivex.core.http.HttpServer#requestStream()
provides a callback for each incoming
request:
[source,java]
----
examples.RxifiedExamples#httpServerRequest
----
The io.vertx.core.http.HttpServerRequest
can then be adapted to an `Observableexamples.RxifiedExamples#httpServerRequestObservable(io.vertx.reactivex.core.http.HttpServer)
----
The ObservableHelper.unmarshaller(Class)
can be used to parse and map
a json request to an object:
[source,java]
----
examples.RxifiedExamples#httpServerRequestObservableUnmarshall(io.vertx.reactivex.core.http.HttpServer)
----
=== Websocket client
The io.vertx.reactivex.core.http.HttpClient#websocketStream
provides a single callback when the websocket
connects, otherwise a failure:
[source,java]
----
examples.RxifiedExamples#websocketClient(io.vertx.reactivex.core.Vertx)
----
The io.vertx.reactivex.core.http.WebSocket
can then be turned into an `Observableexamples.RxifiedExamples#websocketClientBuffer(io.reactivex.Flowable)
----
=== Websocket server
The io.vertx.reactivex.core.http.HttpServer#websocketStream()
provides a callback for each incoming
connection:
[source,java]
----
examples.RxifiedExamples#websocketServer(io.vertx.reactivex.core.http.HttpServer)
----
The io.vertx.core.http.ServerWebSocket
can be turned into an `Observableexamples.RxifiedExamples#websocketServerBuffer(io.reactivex.Flowable)
----Copyright © 2019 Eclipse. All rights reserved.