- ReactiveX is an API for asynchronous programming based on the observer pattern.
- RxGo is the official Go implementation of ReactiveX (a cousin of RxJS, RxJava, etc.).
- There are +50 new operators and multiple new features (hot vs cold observable, connectable observable, backpressure, etc.)
- Yes, Go has already great concurrency primitives and yes, there is still no generics. Nonetheless, RxGo may still be worth a look.
In ReactiveX, the two main concepts are the Observer and the Observable.
An Observable is a bounded or unbounded stream of data. An Observer subscribes to an Observable. Then, it reacts to whatever item the Observable emits. There are operators to create or compose Observables.
The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels. Each stage is a group of goroutines running the same function.
Let’s see a concrete example:
Here, each box is an operator:
- We create a static Observable based on a fixed list of items using
- We define a transformation function (circle into a square) using
- We filter each yellow square using
In this example, the final items are sent to a channel, available to a consumer.
There are other ways to consume items. Publishing the results in a channel is only one option.
By default, the operators are sequential. Yet, we can easily leverage parallelism by defining multiple instances of an operator. Each operator instance is a goroutine, connected to a common channel.
The main philosophy of RxGo is to implement the ReactiveX concepts and to leverage Go concurrency primitives. Therefore, the integration between both worlds is made as smooth as possible.
Real-World Example with v2
First, we have to install RxGo v2:
go get -u github.com/reactivex/rxgo/v2
Then, let’s implement a stream based on the following
We create a producer that will emit
Customers to a given
chan rxgo.Item channel. We create an
rxgo.Item using either
Then, we need to perform the two following operations:
- Filter the customers whose age is below 18.
- Enrich each customer with a tax number. Retrieving a tax number is performed in our example by a function calling a REST service.
As the enriching step is IO-bound, it might be interesting to parallelize it. We will use
rxgo.WithPool option for this purpose.
Nonetheless, in our example, all the
Customer items must be produced sequentially based on their
ID. We wil l use
rxgo.Serialize option that will act as a resequencer.
In the end, we consume using
Observe() that returns a
As part of the new features of RxGo v2, it is now possible to:
- Make a distinction between hot and cold Observables.
- Create Connectable Observables.
- Handle backpressure.
- Configure the way an Observer is created either lazily or eagerly.
- Leverage parallelism with a single option while having guaranteed ordering.
- Pass a context to manage cancellation.
- Configure the strategy upon error.
- Use the ad-hoc RxGo assert API to write unit tests.
- Use any of the +50 new operators (71 in total 🚀).
Last but not least, we put a lot of effort into our documentation.
On the v2 development:
- From 30th November 2018 to 24th March 2020 (480 days).
- 90 issues.
- 17 participants.
- 468 commits.
Release note: https://github.com/ReactiveX/RxGo/releases/tag/v2.0.0
Every contribution is very welcome!
Also, thanks to @jochasinga for having created RxGo.