Subject vs BehaviorSubject vs ReplaySubject vs AsyncSubject – what and when

There are four main types of subjects: Subject, BehaviorSubject, ReplaySubject, and AsyncSubject. In this article, we will explore the differences between these four types and when to use each one.

January 8, 2023 rxjs angular

What is Subject?

Subject is a special type of Observable that allows emitting value between all subscribers. Based on the implementation you can see that Subject defines methods like:

export class Subject<T> extends Observable<T> implements SubscriptionLike {
next(value: T) {
/** ... */
for (const observer of this.currentObservers) {
observer.next(value);
}
/** ... */
}
error(err: any) {
/** ... */
}
complete() {
/** ... */
}
}

When to use Subject?

Subject is useful if you want to emit events (ex: click event, page changed event, modal closed event) and you are not interested in previous events.

import { Component, OnDestroy } from "@angular/core";
import { Subject } from "rxjs";
@Component({
selector: "app-root",
templateUrl: "./app.component.html",
})
export class ModalComponent implements OnDestroy {
private close$$ = new Subject<void>();
modalClosed$ = this.close$$.asObservable();
ngOnDestroy() {
this.close$$.next();
}
}

What is BehaviorSubject?

BehaviorSubject is a special type of Subject that allows emitting value between all subscribers (like Subject) and storing the last emitted value. BehaviorSubject require to define the initial value. BehaviorSubject is useful if you know the initial state otherwise you should try to use ReplaySubject.

When a new subscriber arrives BehaviorSubject emit stored value to him (look at _subscribe method).

export class BehaviorSubject<T> extends Subject<T> {
constructor(private _value: T) {
/** ... */
}
get value(): T {
return this.getValue();
}
/** @internal */
protected _subscribe(subscriber: Subscriber<T>): Subscription {
const subscription = super._subscribe(subscriber);
!subscription.closed && subscriber.next(this._value);
return subscription;
}
getValue(): T {
/** ... */
return this._value;
}
next(value: T): void {
this._value = value;
super.next(value);
}
}

A new subscriber will not receive a value if BehaviorSubject is completed.

import { BehaviorSubject } from "rxjs";
const behavior$$ = new BehaviorSubject<number>(1);
behavior$$.complete();
// there will be no logs
behavior$$.subscribe((event) => console.log("Message: ", event));

Getting stored value

You can get the current state without calling subscribe method. To do it use getValue().

import { BehaviorSubject } from "rxjs";
const state$$ = new BehaviorSubject<AppState>({ state: "PRISTINE" });
console.log(state$$.getValue()); // -> { state: 'PRISTINE' }

When to use BehaviorSubject?

BehaviorSubject is useful if you want to store the current state of the application and you know the initial state or you can emulate it by marking the state as pristine.

@Component({
selector: "app-root",
templateUrl: "./app.component.html",
})
export class AppComponent {
state$$ = new BehaviorSubject<AppState>({ state: "PRISTINE" });
constructor(private projectService: ProjectService) {}
load(): void {
this.state$$.next({ state: "LOADING" });
this.projectService.getProjects().subscribe((projects) =>
this.state$$.next({
state: "LOADED",
data: projects,
}),
);
}
}

Passing null as an initial state is not a good idea if null does not represent a real state.

What is ReplaySubject?

ReplaySubject is a special type of Subject that allows emitting values between all subscribers (like Subject) and storing emitted values based on time and/or buffer size. ReplaySubject does not require to define an initial value, so is useful if the initial value is unknown (in contrast to BehaviorSubject).

When a new subscriber arrives ReplaySubject emits all values from the buffer (look at _subscribe method). If the buffer is empty no events will be emitted.

export class ReplaySubject<T> extends Subject<T> {
constructor(
private _bufferSize = Infinity,
private _windowTime = Infinity,
private _timestampProvider: TimestampProvider = dateTimestampProvider,
) {
/**... */
}
next(value: T): void {
/** ... */
_buffer.push(value);
/** ... */
this._trimBuffer();
super.next(value);
}
/** @internal */
protected _subscribe(subscriber: Subscriber<T>): Subscription {
/** ... */
this._trimBuffer();
/** ... */
const copy = _buffer.slice();
for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
subscriber.next(copy[i] as T);
}
/** ... */
}
private _trimBuffer() {
/** ... */
}
}

Thanks to _bufferSize and _windowTime params you can define how many values and how long you want to store them.

ReplaySubject does not have a method to get the current buffer size so checking if the buffer is empty/full is not possible.

Buffer is trimmed only if a new subscriber has been added or a new value has been emitted. That means ReplaySubject it will not emit trimmed values even after window time has arrived.

By default buffer size is set as Infinity. If you do not need historical values do not forget to set the buffer size as one.

const token$ = new ReplaySubject<string>(1);

If you subscribe after calling complete method you will also receive the latest value (unlike BehaviorSubject).

const replay$$ = new ReplaySubject<number>();
replay$$.next(1);
replay$$.next(2);
replay$$.next(3);
replay$$.complete();
replay$$.subscribe((e) => console.log(e)); // -> 1, 2, 3

When to use ReplaySubject?

ReplaySubject is useful if you want to store the state of part of an application but you do not know the initial state.

@Component({
selector: "app-root",
templateUrl: "./app.component.html",
})
export class AppComponent {
project$$ = new ReplaySubject<Project>(1);
constructor(
private projectService: ProjectService,
private router: Router,
private activatedRoute: ActivatedRoute,
) {}
ngOnInit(): void {
this.activatedRoute.paramMap
.pipe(
map((params) => params.get("projectId")),
switchMap((id) => this.projectService.getProject(id)),
)
.subscribe((project) => this.project$$.next(project));
}
openProjectDetails() {
this.project$$
.pipe(
take(1),
switchMap((project) => this.router.navigate(["/", "projects", project.id])),
)
.subscribe();
}
}

What is AsyncSubject?

AsyncSubject is a special type of Subject that allows emitting value between all subscribers (like Subject) but only the latest value on complete.

export class AsyncSubject<T> extends Subject<T> {
private _value: T | null = null;
private _hasValue = false;
next(value: T): void {
this._value = value;
this._hasValue = true;
}
complete(): void {
/** ... */
this._hasValue && super.next(this._value!);
/** ... */
}
}

Note that the AsyncSubject will only emit a value when the complete function is called, and it will only emit the last value that was passed to the next function.

If the complete function is not called, the AsyncSubject will not emit any values.

If you subscribe after the calling complete method you will also receive the latest value.

const asyncSubject$$ = new AsyncSubject<number>();
// you will receive only the last value after complete
asyncSubject$$.subscribe((e) => console.log(e)); // -> 3
asyncSubject$$.next(1);
asyncSubject$$.next(2);
asyncSubject$$.next(3);
asyncSubject$$.complete();

When to use AsyncSubject?

AsyncSubject is useful if you want to cache a value that never changed.

@Component({
selector: "app-root",
templateUrl: "./app.component.html",
})
export class AppComponent implements OnInit {
token$$ = new AsyncSubject<string>();
constructor(private projectService: ProjectService) {}
ngOnInit() {
this.projectService.getTemporaryToken().subscribe((token) => {
this.token$$.next(token);
this.token$$.complete();
});
}
}

Do you like the content?

Your support helps me continue my work. Please consider making a donation.

Donations are accepted through PayPal or Stripe. You do not need a account to donate. All major credit cards are accepted.

Leave a comment