RxJS in Angular (Observables, Subjects)

rxjs angular subject observable

This article on RxJS in Angular is part of the Learning Angular series.

Angular 6 and RxJS 6

If one is using Angular 6 and RxJS 6, and want to have their older codes work, need to use rxjs-compat
npm install rxjs-compat –save

Analyzing a built-in Angular Observable

A subscriber takes in 3 parameters as follows:

this.route.params.subscribe(
    () => {}, // On Result
    () => {}, // On Error
    () => {} // On Complete
);

Building and Using a First Simple Observable

// Create this app

import { Observable } from 'rxjs/Observable';
import 'rxjs/Rx'; // This provides some utility functions for use.

ngOnInit() {
  const myNumbers = Observable.interval(1000);
  myNumbers.subscribe(
    (number: number) => {
      console.log(number);
    }
  );
}

Building and using a Custom Observable from Scratch

const myObservable = Observable.create((observer: Observer) => {
    setTimeout(() => {
        observer.next('first package');
    }, 2000);
    setTimeout(() => {
        observer.next('second package');
    }, 4000);
    setTimeout(() => {
        observer.error('some error occured');
    }, 5000);
    setTimeout(() => {
        observer.complete();
    }, 6000);
    setTimeout(() => {
        // This won't execute as the complete has been called earlier.
        observer.next('third package');
    }, 7000);
});

Unsubscribe

For the custom Observables created in the previous lectures, if one navigates out of the page holding the Observables, one would notice that the Observables are still active and continue to emit.
To make sure that there is no memory leaks, one should definitely unsubscribe from them. To do that, save the observables to a subscription and unsubscribe.

this.myCustomSubscription = myBumbers.subscribe(() => {
    ...
});

ngOnDestory() {
    this.myCustomSubscription.unsubscribe();
}

While Angular clears up its own subscriptions, its better to clean them manually.

Where to Learn More

reactivex.io/rxjs

Using Subjects to Pass and Listen to Data

Subject is extended from Observable and it implements both the Observer and the Subscriber. This makes it easy to use.
Note: Angular EventEmitter is based upon Subject, but it is preferable to use the Subject instead of EventEmitter to perform cross-component communication.

import { Subject } from 'rxjs/Subject';
export class UsersService {
    userActivated = new Subject();
}

UserComponent.html

<button>Activate</button>

UserComponent.ts

onActivate() {
    // Acts as Observer
    this.usersService.userActivated.next(this.user.id);
}

app.component.ts

ngOnInit() {
    // Acts as Observable
    this.usersService.userActivated.subscribe((id: number) => {
        // Do something
    });
}

Understanding Observable Operators

The operators allow to transform the data we receive to something else and still remain in the Observable world.

const myNumbers =
        Observable
            .interval(1000)
            .map((data: number) => {
                return data * 2;
            });

In this case map is being used. map operator maps the data you pack into a new Observable with any transaformation we want in the function.
As because every operator returns an Observable, so they can be chained as much as required with as many operators as possible.

RxJS 6 without rxjs-compat

To make it work with the recent RxJS 6, one needs to get the Observable, Observer, direcytly from ‘rxjs’ module

import { Observer, Observable, Subject, Subscription, interval ... } from 'rxjs';

Also need to import the operators manually as requried.

import { map, ... } from 'rxjs/operators';

Also the interval needs to be imported form the rxjs module and should be used directly.
Also the operators can’t be sued directly any more. They need to pass through a pipe now.

const myNumbers = 
        Observable
            .interval(1000)
            .map(...);

gets modified to

const myNumbers = 
        interval(1000)
            .pipe(
                map(...)
            );

Q. How to pass multiple operators? Do they need to go through a single pipe or multiple pipes?

Ans – They pass through a single pipe, separated with comma.

Leave a Reply