TL;DR;

Overview

In ReactiveX, the two main concepts are the Observer and the Observable.
An Observable is a bounded or unbounded stream of data. An Observer subscribes to an Observable. Then, it reacts to whatever item the Observable emits. There are operators to create or compose Observables.
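
To make the two roles concrete, here is a minimal sketch in plain Go. It does not use the RxGo API: `observable` and `observe` are hypothetical helpers, assumed here only to show a bounded stream being emitted on a channel and an Observer reacting to each item.

```go
package main

import "fmt"

// observable emits a bounded stream of ints on a channel and closes the
// channel once every item has been sent.
// (Hypothetical helper for illustration, not an RxGo function.)
func observable(items ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, i := range items {
			ch <- i
		}
	}()
	return ch
}

// observe plays the Observer role: it calls onNext for each emitted item
// and onDone once the stream completes.
func observe(ch <-chan int, onNext func(int), onDone func()) {
	for i := range ch {
		onNext(i)
	}
	onDone()
}

func main() {
	observe(observable(1, 2, 3),
		func(i int) { fmt.Println("received", i) },
		func() { fmt.Println("done") })
}
```

An unbounded stream would look the same, except that the producing goroutine would never close the channel on its own.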

The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels. Each stage is a group of goroutines running the same function.
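
The pipeline idea can be sketched with the standard library alone. The stage names below (`generate`, `mapStage`, `filterStage`, `collect`) are assumptions made for this illustration and are not RxGo functions. For brevity, each stage here runs a single goroutine.

```go
package main

import "fmt"

// generate is the first stage: it emits the given values, then closes
// its output channel.
func generate(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// mapStage applies fn to every item received from the previous stage.
func mapStage(in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// filterStage forwards only the items for which keep returns true.
func filterStage(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// collect drains the last channel of the pipeline into a slice.
func collect(in <-chan int) []int {
	var result []int
	for v := range in {
		result = append(result, v)
	}
	return result
}

func main() {
	squared := mapStage(generate(1, 2, 3, 4), func(i int) int { return i * i })
	even := filterStage(squared, func(i int) bool { return i%2 == 0 })
	fmt.Println(collect(even)) // prints [4 16]
}
```

Closing each output channel when the input is exhausted is what lets completion propagate from the first stage to the consumer.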

Let’s see a concrete example:

Example

Here, each box is an operator:

In this example, the final items are sent to a channel, available to a consumer.
There are other ways to consume items. Publishing the results in a channel is only one option.

By default, the operators are sequential. Yet, we can easily leverage parallelism by defining multiple instances of an operator. Each operator instance is a goroutine, connected to a common channel.
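
As a rough sketch of what "multiple instances of an operator" means, the hypothetical `parallelMap` function below starts several goroutines that all read from one input channel and write to one output channel. This is plain Go written for illustration, not the RxGo implementation.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// parallelMap starts `workers` goroutines. They all read from the same
// input channel and write to the same output channel, so the order in
// which results arrive is not guaranteed.
func parallelMap(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	// Close the output only after every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()

	var results []int
	for v := range parallelMap(in, 3, func(i int) int { return i * 10 }) {
		results = append(results, v)
	}
	sort.Ints(results)   // arrival order is not deterministic
	fmt.Println(results) // prints [10 20 30 40 50]
}
```

The trade-off is visible in the `sort.Ints` call: with several instances running, the items no longer come out in the order they went in.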

The main philosophy of RxGo is to implement the ReactiveX concepts and to leverage Go concurrency primitives. Therefore, the integration between both worlds is made as smooth as possible.

Real-World Example with v2

First, we have to install RxGo v2:

go get -u github.com/reactivex/rxgo/v2

Then, let’s implement a stream based on the following Customer structure:

We create a producer that will emit Customers to a given chan rxgo.Item channel. We create an rxgo.Item using either rxgo.Of(interface{}) or rxgo.Error(error).
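
A producer along these lines can be sketched without the library. In the block below, the `Customer` fields are assumed for illustration, and `Item`, `Of` and `Error` are local stand-ins that only mimic the idea of an item carrying either a value or an error. They are not the RxGo definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// Customer is an illustrative structure; its fields are assumptions.
type Customer struct {
	ID        int
	Name      string
	Age       int
	TaxNumber string
}

// Item is a local stand-in for a stream element: it carries either a
// value (V) or an error (E).
type Item struct {
	V interface{}
	E error
}

// Of wraps a value into an Item.
func Of(v interface{}) Item { return Item{V: v} }

// Error wraps an error into an Item.
func Error(err error) Item { return Item{E: err} }

// producer sends one Item per customer to ch and closes ch when done.
// A customer with a non-positive ID is reported as an error item.
func producer(ch chan<- Item, customers []Customer) {
	defer close(ch)
	for _, c := range customers {
		if c.ID <= 0 {
			ch <- Error(errors.New("invalid customer ID"))
			continue
		}
		ch <- Of(c)
	}
}

func main() {
	ch := make(chan Item)
	go producer(ch, []Customer{
		{ID: 1, Name: "Ann", Age: 30},
		{ID: 0},
		{ID: 2, Name: "Bob", Age: 17},
	})
	for item := range ch {
		if item.E != nil {
			fmt.Println("error:", item.E)
			continue
		}
		fmt.Println("customer:", item.V.(Customer).ID)
	}
}
```

Carrying errors inside the stream means the consumer handles values and failures in the same loop.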

Then, we need to perform the following two operations:

As the enriching step is IO-bound, it might be interesting to parallelize it. We will use the rxgo.WithPool option for this purpose.

Nonetheless, in our example, all the Customer items must be produced sequentially based on their ID. We will use the rxgo.Serialize option, which acts as a resequencer.
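
To illustrate what a resequencer does, here is a minimal sketch in plain Go. The `resequence` function and the reduced `Customer` type are hypothetical and written for this example only. The sketch is not how RxGo implements the option. It assumes IDs are consecutive integers starting at a known value.

```go
package main

import "fmt"

// Customer is reduced to the fields this example needs.
type Customer struct {
	ID   int
	Name string
}

// resequence re-emits customers in increasing ID order, starting at
// firstID. A customer that arrives early is buffered until every ID
// before it has been emitted. If an ID never arrives, the customers
// buffered behind it are dropped when the input closes.
func resequence(in <-chan Customer, firstID int) <-chan Customer {
	out := make(chan Customer)
	go func() {
		defer close(out)
		pending := make(map[int]Customer)
		next := firstID
		for c := range in {
			pending[c.ID] = c
			// Flush every customer that is now in sequence.
			for {
				ready, ok := pending[next]
				if !ok {
					break
				}
				delete(pending, next)
				out <- ready
				next++
			}
		}
	}()
	return out
}

func main() {
	in := make(chan Customer)
	go func() {
		defer close(in)
		// Simulate the out-of-order output of a pool of workers.
		for _, id := range []int{2, 0, 1, 4, 3} {
			in <- Customer{ID: id}
		}
	}()
	for c := range resequence(in, 0) {
		fmt.Println(c.ID) // prints 0, 1, 2, 3, 4 on separate lines
	}
}
```

The buffer is the cost of restoring order: the further out of sequence the items arrive, the more of them have to be held back.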

Finally, we consume the items using Observe(), which returns a <-chan Item:

What Else?

As part of the new features of RxGo v2, it is now possible to:

Last but not least, we put a lot of effort into our documentation.

Statistics

On the v2 development:

Release note: https://github.com/ReactiveX/RxGo/releases/tag/v2.0.0

Every contribution is very welcome!
Also, thanks to @jochasinga for having created RxGo.

Follow me on Twitter @teivah
