Announcing RxGo v2

Teiva Harsanyi
4 min readMar 24, 2020

TL;DR;

  • ReactiveX is an API for asynchronous programming based on the observer pattern.
  • RxGo is the official Go implementation of ReactiveX (a cousin of RxJS, RxJava, etc.).
  • There are +50 new operators and multiple new features (hot vs cold observable, connectable observable, backpressure, etc.)
  • Yes, Go has already great concurrency primitives and yes, there is still no generics. Nonetheless, RxGo may still be worth a look.

Overview

In ReactiveX, the two main concepts are the Observer and the Observable.
An Observable is a bounded or unbounded stream of data. An Observer subscribes to an Observable. Then, it reacts to whatever item the Observable emits. There are operators to create or compose Observables.

The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels. Each stage is a group of goroutines running the same function.

Let’s see a concrete example:

Example

Here, each box is an operator:

  • We create a static Observable based on a fixed list of items using Just operator.
  • We define a transformation function (circle into a square) using Map operator.
  • We filter each yellow square using Filter operator.

In this example, the final items are sent to a channel, available to a consumer.
There are other ways to consume items. Publishing the results in a channel is only one option.

By default, the operators are sequential. Yet, we can easily leverage parallelism by defining multiple instances of an operator. Each operator instance is a goroutine, connected to a common channel.

The main philosophy of RxGo is to implement the ReactiveX concepts and to leverage Go concurrency primitives. Therefore, the integration between both worlds is made as smooth as possible.

Real-World Example with v2

First, we have to install RxGo v2:

go get -u github.com/reactivex/rxgo/v2

Then, let’s implement a stream based on the following Customer structure:

We create a producer that will emit Customers to a given chan rxgo.Item channel. We create an rxgo.Item using either rxgo.Of(interface{}) or rxgo.Error(error).

Then, we need to perform the two following operations:

  • Filter the customers whose age is below 18.
  • Enrich each customer with a tax number. Retrieving a tax number is performed in our example by a function calling a REST service.

As the enriching step is IO-bound, it might be interesting to parallelize it. We will use rxgo.WithPool option for this purpose.

Nonetheless, in our example, all the Customer items must be produced sequentially based on their ID. We wil l use rxgo.Serialize option that will act as a resequencer.

In the end, we consume using Observe() that returns a <-chan Item:

What Else?

As part of the new features of RxGo v2, it is now possible to:

Last but not least, we put a lot of effort into our documentation.

Statistics

On the v2 development:

  • From 30th November 2018 to 24th March 2020 (480 days).
  • 90 issues.
  • 17 participants.
  • 468 commits.

Release note: https://github.com/ReactiveX/RxGo/releases/tag/v2.0.0

Every contribution is very welcome!
Also, thanks to @jochasinga for having created RxGo.

Follow me on Twitter @teivah

--

--

Teiva Harsanyi

Software Engineer @Google | 📖 100 Go Mistakes author | 改善