Announcing RxGo v2
TL;DR
- ReactiveX is an API for asynchronous programming based on the observer pattern.
- RxGo is the official Go implementation of ReactiveX (a cousin of RxJS, RxJava, etc.).
- There are 50+ new operators and multiple new features (hot vs. cold Observables, Connectable Observables, backpressure, etc.).
- Yes, Go already has great concurrency primitives and yes, there are still no generics. Nonetheless, RxGo may still be worth a look.
Overview
In ReactiveX, the two main concepts are the Observer and the Observable.
An Observable is a bounded or unbounded stream of data. An Observer subscribes to an Observable. Then, it reacts to whatever item the Observable emits. There are operators to create or compose Observables.
The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels. Each stage is a group of goroutines running the same function.
Let’s see a concrete example:
Here, each box is an operator:
- We create a static Observable based on a fixed list of items using the Just operator.
- We define a transformation function (circle into a square) using the Map operator.
- We filter each yellow square using the Filter operator.
In this example, the final items are sent to a channel, available to a consumer.
There are other ways to consume items. Publishing the results in a channel is only one option.
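To make the pipeline idea concrete, here is a minimal sketch in plain Go, using only the standard library. It is not RxGo's implementation or API: the helpers just, mapStage, filterStage, and pipeline are hypothetical names, and integers stand in for the shapes of the diagram. Each stage is a goroutine that reads from one channel and writes to the next.

```go
package main

import "fmt"

// just emits a fixed list of items and then closes its output channel.
// It is a hypothetical helper that mimics the idea of Just, not RxGo's code.
func just(items ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// mapStage applies fn to every item received from in.
func mapStage(in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for it := range in {
			out <- fn(it)
		}
	}()
	return out
}

// filterStage forwards only the items for which keep returns true.
func filterStage(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for it := range in {
			if keep(it) {
				out <- it
			}
		}
	}()
	return out
}

// pipeline wires the stages together: square each item, keep the even results.
func pipeline(items ...int) <-chan int {
	squared := mapStage(just(items...), func(i int) int { return i * i })
	return filterStage(squared, func(i int) bool { return i%2 == 0 })
}

func main() {
	// The consumer simply ranges over the final channel.
	for it := range pipeline(1, 2, 3, 4) {
		fmt.Println(it) // prints 4, then 16
	}
}
```

Because every stage here runs in a single goroutine, items leave the pipeline in the order they entered it.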
By default, the operators are sequential. Yet, we can easily leverage parallelism by defining multiple instances of an operator. Each operator instance is a goroutine, connected to a common channel.
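The idea of several operator instances sharing a common channel can be sketched in plain Go as follows. This is an illustration under assumptions, not RxGo's code: source, parallelMap, and sortedResults are hypothetical helpers. Note that with more than one worker the arrival order is no longer deterministic, which is why the sketch sorts the results before printing them.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// source emits 1..n and then closes the channel.
func source(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// parallelMap starts `workers` goroutines that all read from the same
// input channel and write to the same output channel.
func parallelMap(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for it := range in {
				out <- fn(it)
			}
		}()
	}
	// Close the output once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sortedResults drains the channel and sorts the items, because the
// arrival order is not deterministic with several workers.
func sortedResults(in <-chan int) []int {
	var got []int
	for it := range in {
		got = append(got, it)
	}
	sort.Ints(got)
	return got
}

func main() {
	out := parallelMap(source(5), 3, func(i int) int { return i * 10 })
	fmt.Println(sortedResults(out)) // prints [10 20 30 40 50]
}
```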
The main philosophy of RxGo is to implement the ReactiveX concepts and to leverage Go concurrency primitives. Therefore, the integration between both worlds is made as smooth as possible.
Real-World Example with v2
First, we have to install RxGo v2:
go get -u github.com/reactivex/rxgo/v2
Then, let’s implement a stream based on the following Customer structure:
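A minimal sketch of such a structure is shown here. The exact fields are an assumption: only an ID, an age, and a tax number are implied by the steps described in this section, and the original definition may contain more.

```go
package main

import "fmt"

// Customer is an assumed minimal shape for the items of the stream.
// The field set is inferred from the text, not taken from RxGo.
type Customer struct {
	ID        int    // used to restore ordering after parallel processing
	Age       int    // used by the filtering step
	TaxNumber string // filled in by the enrichment step
}

func main() {
	c := Customer{ID: 1, Age: 42}
	fmt.Printf("%+v\n", c)
}
```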
We create a producer that will emit Customers to a given chan rxgo.Item channel. We create an rxgo.Item using either rxgo.Of(interface{}) or rxgo.Error(error).
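The producer pattern can be sketched with the standard library alone. In this sketch, item, of, and failure are local stand-ins for the idea behind rxgo.Item, rxgo.Of, and rxgo.Error (a value or an error travelling on the same channel); they are not the library's types. The Customer fields and the "negative age is an error" rule are assumptions made up for illustration.

```go
package main

import "fmt"

// Customer uses an assumed minimal set of fields.
type Customer struct {
	ID        int
	Age       int
	TaxNumber string
}

// item is a local stand-in: it carries either a value or an error.
type item struct {
	V interface{}
	E error
}

func of(v interface{}) item  { return item{V: v} }
func failure(err error) item { return item{E: err} }

// produce sends one item per customer and closes the channel when done.
// A negative age is reported as an error item (an arbitrary rule for this sketch).
func produce(ch chan<- item, customers []Customer) {
	defer close(ch)
	for _, c := range customers {
		if c.Age < 0 {
			ch <- failure(fmt.Errorf("customer %d: invalid age %d", c.ID, c.Age))
			continue
		}
		ch <- of(c)
	}
}

func main() {
	ch := make(chan item)
	go produce(ch, []Customer{{ID: 1, Age: 30}, {ID: 2, Age: -1}, {ID: 3, Age: 17}})
	for it := range ch {
		if it.E != nil {
			fmt.Println("error:", it.E)
			continue
		}
		fmt.Println("customer:", it.V.(Customer).ID)
	}
}
```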
Then, we need to perform the two following operations:
- Filter out the customers whose age is below 18.
- Enrich each customer with a tax number. Retrieving a tax number is performed in our example by a function calling a REST service.
As the enriching step is IO-bound, it might be interesting to parallelize it. We will use the rxgo.WithPool option for this purpose.
Nonetheless, in our example, all the Customer items must be produced sequentially based on their ID. We will use the rxgo.Serialize option, which acts as a resequencer.
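To show what a worker pool followed by a resequencer does, here is a sketch in plain Go. It does not use or reproduce rxgo.WithPool or rxgo.Serialize. The helpers enrich, resequence, and run are hypothetical, fetchTaxNumber replaces the REST call with a local function, and the resequencer assumes that IDs are consecutive and that the first ID is known.

```go
package main

import (
	"fmt"
	"sync"
)

// Customer uses an assumed minimal set of fields.
type Customer struct {
	ID        int
	Age       int
	TaxNumber string
}

// fetchTaxNumber is a hypothetical stand-in for the REST call.
func fetchTaxNumber(c Customer) string {
	return fmt.Sprintf("TAX-%03d", c.ID)
}

// enrich runs `workers` goroutines over a shared input channel.
// With more than one worker, results may arrive out of order.
func enrich(in <-chan Customer, workers int) <-chan Customer {
	out := make(chan Customer)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for c := range in {
				c.TaxNumber = fetchTaxNumber(c)
				out <- c
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// resequence buffers early arrivals and releases customers in ascending ID order.
// Assumption: IDs are consecutive and start at first. With a gap, later items
// stay buffered and are dropped when the input closes.
func resequence(in <-chan Customer, first int) <-chan Customer {
	out := make(chan Customer)
	go func() {
		defer close(out)
		pending := make(map[int]Customer)
		next := first
		for c := range in {
			pending[c.ID] = c
			// Release every customer that is now in sequence.
			for {
				ready, ok := pending[next]
				if !ok {
					break
				}
				delete(pending, next)
				out <- ready
				next++
			}
		}
	}()
	return out
}

// run pushes n customers with IDs 1..n through both stages.
func run(n, workers int) []Customer {
	in := make(chan Customer)
	go func() {
		defer close(in)
		for id := 1; id <= n; id++ {
			in <- Customer{ID: id, Age: 30}
		}
	}()
	var got []Customer
	for c := range resequence(enrich(in, workers), 1) {
		got = append(got, c)
	}
	return got
}

func main() {
	for _, c := range run(5, 3) {
		fmt.Println(c.ID, c.TaxNumber)
	}
}
```

The trade-off of this design is memory: the resequencer holds every item that arrives ahead of its turn, so one slow call delays all the customers behind it.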
In the end, we consume using Observe() that returns a <-chan Item:
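Consuming such a channel usually comes down to ranging over it and checking each item for an error. The sketch below uses a local item type as a stand-in for the channel's element type and a hypothetical consume helper that stops at the first error. That stopping rule is an assumption for illustration, not a statement about RxGo's behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a local stand-in: it carries either a value or an error.
type item struct {
	V interface{}
	E error
}

// errBoom is an arbitrary error used by the example.
var errBoom = errors.New("boom")

// consume collects values until the channel is closed or an error item arrives.
// It returns the values seen so far together with that first error, if any.
func consume(ch <-chan item) ([]interface{}, error) {
	var values []interface{}
	for it := range ch {
		if it.E != nil {
			return values, it.E
		}
		values = append(values, it.V)
	}
	return values, nil
}

func main() {
	// A buffered, pre-filled channel keeps the example free of blocked senders.
	ch := make(chan item, 3)
	ch <- item{V: "first"}
	ch <- item{E: errBoom}
	ch <- item{V: "never read"}
	close(ch)

	values, err := consume(ch)
	fmt.Println(values, err) // prints [first] boom
}
```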
What Else?
As part of the new features of RxGo v2, it is now possible to:
- Make a distinction between hot and cold Observables.
- Create Connectable Observables.
- Handle backpressure.
- Configure whether an Observer is created lazily or eagerly.
- Leverage parallelism with a single option while having guaranteed ordering.
- Pass a context to manage cancellation.
- Configure the strategy upon error.
- Use the ad-hoc RxGo assert API to write unit tests.
- Use any of the 50+ new operators (71 in total 🚀).
Last but not least, we put a lot of effort into our documentation.
Statistics
On the v2 development:
- From 30th November 2018 to 24th March 2020 (480 days).
- 90 issues.
- 17 participants.
- 468 commits.
Release note: https://github.com/ReactiveX/RxGo/releases/tag/v2.0.0
Every contribution is very welcome!
Also, thanks to @jochasinga for having created RxGo.