Mind is Software

Ying’s thoughts about software and business

RxJS introduction

The notes is based on the book RxJS in Action, the official RxJS document and the Learn RxJS gitbook .

1 A New Async Paradigm

The existing sync loop, conditional statements and exception handling strategies are not async aware. They are oblivious of wait time or latency between operations. Nesting calls are not only hard to understand but also bring clousures with them. Cancel long-running operation is hard, if not impossible. Throtting is missing. Composing of nested asyn flows is difficult.

RxJS is an API for asyn programming with observable streams. A stream is a sequence of event over time. Everything is a stream in RxJS. RxJS uses an observer design pattern that involves an object (the subject), which maitains a list of subscribers (each is an observer). RxJS adds features such as completion signal, lazy initialization, cancellation, resource management and dsiposal.

In RxJS, data is not stored in variables. It flows through the streams. RxJS follows a declarative design inspired by functional programming. Operations can be chained. Streams can be composed. RxJS combines ideas from the observer pattern, the iterator pattern and the functional programming.

Rx is good for

  • composing multiple events together
  • adding delay
  • clientside rate limiting
  • coordinating asyn tasks like drag and drop
  • when cancellation is required

Don’t use Rx for simple tasks such as button click. Angular uses RxJS in its HttpClient, AsyncPipe, reactive forms and routers.

2 Introduction

2.1 Basic Concepts

RxJS abstract over time under the same programming model regardless of source. It has the following components:

  • Producer: a source of data. It’s also called an observable that is in charge of pushing notification – a bahvior called fire-and-forget.
  • Consumer: an observer to process data. A stream of data travels from the producer to the consumer.
  • Data pipeline: using operators to process the data when it passes from the producer to the subscriber.
  • Time: there is always a time factor in data stream. Time passes when data flows from the producer to the subscriber with a pipeline between them.

RxJS introduces the following concepts:

  • Observable: is a core type that represents a collection of future values or events. It takes a function as the only constructor argument – the function takes an observer and optionally returns a subscription. The function is called in the observable’s subscribe method. The Observable has some static methods that generates data from different data sources such as promise or event soruces. Essentially an observable connects a proudcer (a data source) to a consumer (an observer). The observer is provided to the subscribe method and only then the observable starts to push data to the observer.
  • Observer: is an interface for a collecton of callbacks (next, error, complete) that handles values pushed by an observable instance.
  • Subscription: represents the execuation of an Observable, is primarily useful for cancelling the execution. It has a teardown function named unsubscribe. Cancelling a subscription will not call complete.
  • Operators: are pure functions that transforms an observable to another observable.
  • Subject: is both an observable and a observer. It can multicast a value to multiple observers.
  • Schedulers: are dispatchers to control concurrency.

In implementation, an Observable is a class that represents a set of values over any amount of time. It is constructed (initialized) with an observable function that uses an observer to process a set of inputs. The class has a subscribe method that executes the observable function. The method may return an object that has an unsubscribe() mehtod used to clean the data stream. An Observable instance behaviors like a funciton that can push values synchronously or aynchronously.

An observer is registered to an observable. An observer has three methods: next(val: any): void, error(error: any): void, and complete(): void.

If there is an error, the observable is gone. To isolate errors, use switchMap() and use catchError() inside the pipe method of the inner observable. Any error only cancels the inner obserable.

Subscriptions can be put together by using add(child) method to add a child subscription to a parent. When the parent subscription is unsubscribed, all its chiildren are unsubscribed. unsubscribe() is called automatically when observable is completed (complete() is called, take(n), or takeUntil(notifier)) or there is an error. You should call the unsubscribe() method when an observable is no longer used. If a component subscribes to an observable that lives longer than the component, you should call the unsubscribe() method when the component is destroyed.

There are four types of data sources and four different ways to create an Observable instance.

  • Single-value, synchronous: use simple sync operation to process the data. An observalbe wrapper is only used when they combine with other streams. Use Rx.Observable.of(value) to wrap it.
  • Multi-value, synchronous: is better processed by pull-based iterator. Use Rx.Observable.from(values) to wrap an array, or array-like data source.
  • Single-value, asynchronous: it’s often wrapped in a promise. A promise is execuated eagerly and asynchrounously. Use Rx.Observalbe.fromPromise(promise) to wrap it.
  • Multi-value, asynchronous: the typical solution is an EventEmitter. Use Rx.Observalbe.fromEvent() to wrap it. RxJS uses push-based notifications.

2.2 Imports

To import types, creation methods, schedulers and utilities, use import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent } from 'rxjs'. To import pipeable operators, use import { map, filter, scan } fro m 'rxjs/operators'. To import specific tools: use import { webSocket } from 'rxjs/webSocket' for web socket subject, import { ajax } from 'rxjs/ajax' for ajax utitlity and import { TestScheduler } from 'rxjs/testing' for testing utilities.

2.3 Multicasted Observables

A multicasted Observable uses a Subject to support multiple Observers. It uses multicase(subject) to setup a subject and use connect() to start pushing data.

Use multicase(subject).refCount() to track the number of Observers and start and stop/unsubscribed automatically.

A BehaviorSubject stores the latest value emitted to its consumers. When a new Observer subscribes, it gets the latest value immediately.

A ReplaySubject stores multiple recent values by counter or window time and emit them to the new subscribers.

An AsyncSubject only push the last value when it completes.

3 Operators

RxJS avoids premature allocation of data in two ways: using of a lazy subscription and, by default, pushing data as soon as the event is emitted without holding it in memory.

An operator is a pure, higher-order, lazily-evaluated function that is injected into an observable’s pipeline. Operators subscribe to a source observable and return a result observable.

3.1 Core Operators

The map operator transforms data from one type to another.

The filter removes unwanted items from a stream via a selector function, also called the predicate.

The reduce(accumulatorFunction, [initialValue]) operator turns a stream into a single value observable. The accumulatorFunction takes two parameters: the current result and the new steam element.

The scan() applies an accumulator function over an observable sequence but returns each intermediate results.

The take(count) operator returns a specified amount of contiguous elements. The first and last return the first or the last element, repsectively.

The min and max operator turens the minimum or maximum value of a finite stream.

The tap utitlity operator invokes an action for each element to perform some type of side effect, mostly for debugging or tracing purposes. It can be plugged into any step in the pipeline.

The pluck(propertyName) gets the value for a named-property from each element.

If a pipeline is side effect-free, it is said to be self-contained. It’s called operator chaining or fluent programming.

An observable must always produce the same results given the same events passing throught it. It’s a qulaity known in FP as referential transparency.

3.2 An Operator Example

An operator creates a brand-new observable, transforming the data from its source and delegating result to the next subscriber in the chain.

function exclude(predicate) {
    return Rx.Observable.create(subscriber => {
        let source = this
        return source.subscribe(
            value => {
                try {
                    if (!predicate(value)) {
                        subscriber.next(value)
                    }
                }
                catch(err) {
                    subscriber.error(err)
                }
            },
            err => subscriber.error(err),
            () => subscriber.complete()
        })
    })
}

Rx.Observable.prototype.exclude = exclude

Oberserables are lightweight and inexpensive to create. They have built-in capabilitys for disposal and cancellation via the unsubscribe() method. Use Rx.Observable.create to return a function that is called when the unsubscribe() method of the observable is called.

3.3 Flat Combination

The merge() method merges one observable with others. The elements are in the order of their original sources.

The concat() method appends all element of a source to another one. It begins emitting data from a second observable only when the first one completes.

The switch() is an instance method that subscribes to an observable that emits obserables. Eech time it sees a new emitted observable, it unsubscribes from the previously-emitted observable. As described in its operator document.

convert an Observable that emits Observables into a single. Observable that emits the items emitted by the. most-recently-emitted of those Observables.

3.4 Nested Observables

Observables manage and control the data that flows through them via data containerizing. Therefore, there are cases of observables whose values are observables. This software pattern is the FP paradigm called monad. A monad exposes an interface with three methods: a unit function to lift values into the monadic context (of()), a mapping function (map()), and a map-with-flatten function (mergeMap()).

Use switchMap if you want to cancel inner subscription when the source emits, allowing only one active inner subscription.

Use mergeMap if you want to keep all inner subscriptions and the ordering doesn’t matter. There are multiple active inner subscriptions. To avoid memory leak, use take or takeUntil or limit the number of active innner subscriptions at a time with the concurrent parameter.

Use concatMap if you want to keep all inner subscritions and process them serially. Their order are kepts.

Use exhaustMap if you want to ignore other values until the current inner subscription completes. It’s an opposite of switchMap.

3.5 Coordinating Observalbes

The startWith() emits a value before other observalbe values emitting.

The using(resourceFactory, observableFactory) calls resource.unsubscribe() when the observable is completed.

The combineLatest() emits an array of the latest values of multiple independent observalbes.

The forkJoin() emits only the last value of each forked stream.

Use zip() to combine streams that happen synchronously.

4 Scheduler

A Scheduler defines the execution context in which an Observable pushes data to its Observer. It may have a data structure to store and queue tasks based on priority or other criteria. I may have a virtual clock that denotes time for task sheduling. There are three types of schdulers: queue, asap (next tick), and async (setInterval or setTimeout). If not passing any scheduler, data are pushed synchronously and recursively.

RxJS uses a lest concurrency policy. For a finite and small number of data, RxJS uses no Scheduler. For a loarge or finite number of data, queue scheduler is used. For operators that use timers, async is used.

Static creation operators usually take a Scheduler as argument. Use subscribeOn(scheduler) to specify a scheduler. Use observeOn to schedule the context for pushing data. Time-related operators like bufferTime, debounceTime, delay, etc. take a Scheduler as the last argument.

5 Time Management

Functions that deal with time are inherently impure because time is global to the entire applicatonand forever changing. JavaScript functions like Data.now() and Math.random() are impure because their returns are inconsistent. An async event brings two challenges:

  • It may or may not happen in the future.
  • It’s conditional that it depends on the result of a previous task.

Callbacks and event handlers have implicit timing in the process.

5.1 Timing Methods

The Rx.Observable.timer(offset) creates an observable that will emit a single event after a given period of time. Similarly, interval(span) emits a sequnece of natural numbers starting from 0 at the fixed interval. They also accept an additional scheduler parameter that makes testing easy. The timeInterval() instance method gives both a count as well as the time intervals between two events. The delay(offset) time shifts the entire observable sequence.

There are two important characteristics of RxJS timing:

  • Each operator only affects the propagatin of an event, not its creation.
  • Time operators act sequentially.

debounceTime(period) triggers only if a certian period has passed without it being call. The last value will be emitted.

throttleTiem(period) emits at most once every period.

5.2 Buffering

buffer(closingObservable) buffers data until a closing observable emits an event.

bufferCount(number) buffers the number of events.

bufferTime(period) buffers for a specific period.

bufferWhen(selector) buffers when the selector call emits a value.

6 Error Handling

RxJS implements a functional error-handling technique. It abstracts errors and exception handling via several strategies.

6.1 Error Propagation

At the end of the observable stream is a subscriber waiting to pounce on the next event to occur. The subscriber implements the Observer interface that consisting of three methodsd: next(), error(), and complete(). Errors occur at the begining of the stream or in the middle are propagated down to any observer, finally resulting in a call to error(). The first exception that fires will result in the entire stream being cancelled.

6.2 Catching and Reacting to Errors

The catch() operator intercepts any error in the Observable and give you the option to handle by returning a new Obseervable or propogating it downstream.

The catch() operato is passed a function that takes an error argument as well as the soruce observable that was caught. Therefore you can return the source observable to retry from the begining.

RxJS provides retry() operator to reexecuting the source obserbale a number of retries. Be carful when use it with an observalbe creaetd from a promise because a promise alway return a settled value (success or failure). You can control retry strategy using retryWhen().

RxJS provides throw() and finally() operators to throw exception or run cleanup code. The finally() runs when a stream compleetes or when it errors.

7 Hot and Cold

The hot and cold category determines the stream behavior, not just the subscription semantics, but also the entire lifetime of the strea.

7.1 Cold Observables

A cold observable doesn’t begin emittting until an observer subscribes to it. It is typically used to wrap bounded data resource types such as numbers, ranges of numbers, strings, arrays and HTTP requests, as well as unbounded types like generator functions. These resources are known as passive in the sense that their declaration is independent of their execution. They are truly lazy in their creation and execution.

Being cold means that each new subscription is creating a new independent stream with a new starting point for that stream. Each subscriber will always independently receive the exact same set of events. A cold observable can be thought of as a function of an object factory that takes input data and return an output to the caller.

The declaration of a cold observable frequently begins with the static operators such as of(), from(), interval(), and timer().

7.2 Hot Observables

Hot observables produce events regardless of the presence of subscribers. Hot observables are used to model events like clicks, mouse movement, touch, or any other vents exposed via event emitters.

Simialarly, an HTTP request is cold where a Promise is hot – a promise is not reexecutable once it’s been fulfilled.

A hot observable shares the same subscription to all observers that listen to it. It emits ongoing sequence of events from the point of subscription and not from the beginning.

7.3 Change Temperature

The default resubscription behavior of RxJS is code observable subscription: each subscriber gets its own copy of the producer. It is the case for synchronous data sources as well as async data sources wrapped created within an observable context. An implication is that anything subscribing to a cold observable creates an one-to-one unicast comunication between the proudcer and the consumer. Subscribing to a hot observable creates an one-to-many shared/multicast communication between the producer and its consumers.

By moving a hot source producer such as a promise or a websocket creation into an observer context, you make a hot source cold.

By moving a cold source producer out of an observable context and let the observable to subscribe the producer event can make a cold source hot.

The share() operator turns a cold stream to hot by managing the underlying stream’s state thus the stream can be shahred by all subscribers.


Share