What is Subject?
Subject is a special type of Observable that multicasts each emitted value to all of its subscribers.
Based on the implementation, you can see that Subject defines methods like:
next: emits a value to all subscribers
error: emits an error to all subscribers and unsubscribes them
complete: closes the source and unsubscribes all subscribers
export class Subject<T> extends Observable<T> implements SubscriptionLike {
  next(value: T) {
    /** ... */
    for (const observer of this.currentObservers) {
      observer.next(value);
    }
    /** ... */
  }

  error(err: any) {
    /** ... */
  }

  complete() {
    /** ... */
  }
}
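To make the multicasting behavior concrete, here is a minimal sketch with two subscribers on one Subject. The names `subject$$` and `log` are illustrative and not taken from the original code. It shows that every subscriber gets every value, that `complete` notifies all of them, and that a `next` call after `complete` is ignored.

```typescript
import { Subject } from "rxjs";

const subject$$ = new Subject<number>();
const log: string[] = [];

// Two independent subscribers attached to the same Subject.
subject$$.subscribe({
  next: (value) => log.push(`A:${value}`),
  complete: () => log.push("A:complete"),
});
subject$$.subscribe({
  next: (value) => log.push(`B:${value}`),
  complete: () => log.push("B:complete"),
});

subject$$.next(1);
subject$$.next(2);
subject$$.complete();
subject$$.next(3); // ignored: the Subject is already stopped

console.log(log);
// -> ["A:1", "B:1", "A:2", "B:2", "A:complete", "B:complete"]
```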
When to use Subject?
Subject is useful if you want to emit events (e.g. a click event, a page changed event, a modal closed event) and you are not interested in previous events.
import { Component, OnDestroy } from "@angular/core";
import { Subject } from "rxjs";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
})
export class ModalComponent implements OnDestroy {
  private close$$ = new Subject<void>();
  modalClosed$ = this.close$$.asObservable();

  ngOnDestroy() {
    this.close$$.next();
  }
}
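The "not interested in previous events" part can be illustrated outside Angular. In the sketch below, `pageChanged$$`, `early` and `late` are hypothetical names chosen for this example. A subscriber that arrives late only sees events emitted after it subscribed.

```typescript
import { Subject } from "rxjs";

const pageChanged$$ = new Subject<number>();
const early: number[] = [];
const late: number[] = [];

// Subscribed before any event is emitted.
pageChanged$$.subscribe((page) => early.push(page));
pageChanged$$.next(1);
pageChanged$$.next(2);

// Subscribed after two events: a plain Subject does not replay them.
pageChanged$$.subscribe((page) => late.push(page));
pageChanged$$.next(3);

console.log(early); // -> [1, 2, 3]
console.log(late); // -> [3]
```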
What is BehaviorSubject?
BehaviorSubject is a special type of Subject that multicasts values to all subscribers (like Subject) and stores the last emitted value. BehaviorSubject requires an initial value.
BehaviorSubject is useful if you know the initial state; otherwise you should try to use ReplaySubject.
When a new subscriber arrives, BehaviorSubject emits the stored value to it (look at the _subscribe method).
export class BehaviorSubject<T> extends Subject<T> {
  constructor(private _value: T) {
    /** ... */
  }

  get value(): T {
    return this.getValue();
  }

  /** @internal */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    const subscription = super._subscribe(subscriber);
    !subscription.closed && subscriber.next(this._value);
    return subscription;
  }

  getValue(): T {
    /** ... */
    return this._value;
  }

  next(value: T): void {
    this._value = value;
    super.next(value);
  }
}
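As a small illustration of the `_subscribe` behavior, the sketch below subscribes twice at different moments. The names `counter$$`, `first` and `second` are made up for the example. Each new subscriber immediately receives whatever value is stored at that time.

```typescript
import { BehaviorSubject } from "rxjs";

const counter$$ = new BehaviorSubject<number>(0);
const first: number[] = [];
const second: number[] = [];

// Receives the initial value (0) synchronously on subscribe.
counter$$.subscribe((value) => first.push(value));
counter$$.next(1);
counter$$.next(2);

// Receives the currently stored value (2) synchronously on subscribe.
counter$$.subscribe((value) => second.push(value));
counter$$.next(3);

console.log(first); // -> [0, 1, 2, 3]
console.log(second); // -> [2, 3]
```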
A new subscriber will not receive a value if BehaviorSubject is completed.
import { BehaviorSubject } from "rxjs";

const behavior$$ = new BehaviorSubject<number>(1);
behavior$$.complete();

// there will be no logs
behavior$$.subscribe((event) => console.log("Message: ", event));
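A late subscriber to a completed BehaviorSubject gets no value, but it is still told that the source has completed. The sketch below registers a full observer to show this; `done$$` and `events` are illustrative names.

```typescript
import { BehaviorSubject } from "rxjs";

const done$$ = new BehaviorSubject<number>(1);
done$$.complete();

const events: string[] = [];

// Subscribing after complete: next is skipped, complete is delivered.
done$$.subscribe({
  next: (value) => events.push(`next:${value}`),
  complete: () => events.push("complete"),
});

console.log(events); // -> ["complete"]
```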
Getting the stored value
You can get the current state without calling the subscribe method. To do so, use getValue().
import { BehaviorSubject } from "rxjs";
const state$$ = new BehaviorSubject<AppState>({ state: "PRISTINE" });
console.log(state$$.getValue()); // -> { state: 'PRISTINE' }
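The `value` getter shown in the implementation excerpt returns the same result as `getValue()`. The sketch below reads the state before and after an emission. The `AppState` type here is an assumption made for this example, since the original does not show its definition.

```typescript
import { BehaviorSubject } from "rxjs";

// Assumed shape of AppState, for illustration only.
type AppState = { state: "PRISTINE" | "LOADING" | "LOADED" };

const state$$ = new BehaviorSubject<AppState>({ state: "PRISTINE" });

// Synchronous read of the stored value, no subscription needed.
const before = state$$.getValue().state;

state$$.next({ state: "LOADING" });

// The `value` getter delegates to getValue().
const after = state$$.value.state;

console.log(before); // -> "PRISTINE"
console.log(after); // -> "LOADING"
```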
When to use BehaviorSubject?
BehaviorSubject is useful if you want to store the current state of the application and you know the initial state, or you can emulate it by marking the state as pristine.
import { Component } from "@angular/core";
import { BehaviorSubject } from "rxjs";

// ProjectService and AppState are assumed to be defined elsewhere in the application.
@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
})
export class AppComponent {
  state$$ = new BehaviorSubject<AppState>({ state: "PRISTINE" });

  constructor(private projectService: ProjectService) {}

  load(): void {
    this.state$$.next({ state: "LOADING" });

    this.projectService.getProjects().subscribe((projects) =>
      this.state$$.next({
        state: "LOADED",
        data: projects,
      }),
    );
  }
}
Passing null as an initial state is not a good idea if null does not represent a real state.
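One possible alternative to a null initial value is an explicit "pristine" state modeled as a discriminated union. This is only a sketch: the `ProjectsState` type, the `describeState` helper and the sample data are hypothetical and do not come from the original application.

```typescript
import { BehaviorSubject } from "rxjs";

// Hypothetical state model: every variant is a real, named state.
type ProjectsState =
  | { state: "PRISTINE" }
  | { state: "LOADING" }
  | { state: "LOADED"; data: string[] };

const projectsState$$ = new BehaviorSubject<ProjectsState>({ state: "PRISTINE" });

// Consumers switch on the tag instead of checking for null.
function describeState(current: ProjectsState): string {
  switch (current.state) {
    case "PRISTINE":
      return "nothing requested yet";
    case "LOADING":
      return "loading";
    case "LOADED":
      return `${current.data.length} project(s)`;
  }
}

const messages: string[] = [];
projectsState$$.subscribe((current) => messages.push(describeState(current)));

projectsState$$.next({ state: "LOADING" });
projectsState$$.next({ state: "LOADED", data: ["alpha", "beta"] });

console.log(messages);
// -> ["nothing requested yet", "loading", "2 project(s)"]
```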
What is ReplaySubject?
ReplaySubject is a special type of Subject that multicasts values to all subscribers (like Subject) and stores emitted values based on time and/or buffer size.
ReplaySubject does not require an initial value, so it is useful if the initial value is unknown (in contrast to BehaviorSubject).
When a new subscriber arrives, ReplaySubject emits all values from the buffer (look at the _subscribe method).
If the buffer is empty, no events will be emitted.
export class ReplaySubject<T> extends Subject<T> {
  constructor(
    private _bufferSize = Infinity,
    private _windowTime = Infinity,
    private _timestampProvider: TimestampProvider = dateTimestampProvider,
  ) {
    /**... */
  }

  next(value: T): void {
    /** ... */
    _buffer.push(value);
    /** ... */
    this._trimBuffer();
    super.next(value);
  }

  /** @internal */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    /** ... */
    this._trimBuffer();
    /** ... */
    const copy = _buffer.slice();
    for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
      subscriber.next(copy[i] as T);
    }
    /** ... */
  }

  private _trimBuffer() {
    /** ... */
  }
}
Thanks to the _bufferSize and _windowTime params, you can define how many values you want to store and for how long.
ReplaySubject does not have a method to get the current buffer size, so checking whether the buffer is empty or full is not possible.
The buffer is trimmed only when a new subscriber is added or a new value is emitted. Expired values may stay in memory until then, but because trimming also runs on subscription, they are never replayed to a new subscriber.
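The window-time trimming can be observed deterministically by passing a fake clock as the third constructor argument. This sketch assumes the RxJS 7 constructor shown in the excerpt above, where that argument is a timestamp provider with a `now()` method. The `fakeNow` variable and `clock` object are hypothetical helpers for this example.

```typescript
import { ReplaySubject } from "rxjs";

// Hypothetical controllable clock, used in place of the real Date-based provider.
let fakeNow = 0;
const clock = { now: () => fakeNow };

// Unlimited buffer size, but each value is replayable for only 100 time units.
const recent$$ = new ReplaySubject<string>(Infinity, 100, clock);

fakeNow = 0;
recent$$.next("a"); // replayable until t = 100
fakeNow = 50;
recent$$.next("b"); // replayable until t = 150

// At t = 120 the buffer is trimmed on subscribe: "a" has expired, "b" has not.
fakeNow = 120;
const atT120: string[] = [];
recent$$.subscribe((value) => atT120.push(value));

// At t = 200 both values have expired, so nothing is replayed.
fakeNow = 200;
const atT200: string[] = [];
recent$$.subscribe((value) => atT200.push(value));

console.log(atT120); // -> ["b"]
console.log(atT200); // -> []
```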
By default, the buffer size is set to Infinity.
If you do not need historical values, do not forget to set the buffer size to 1.
const token$ = new ReplaySubject<string>(1);
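The effect of the buffer size on a late subscriber can be sketched as follows. The names `lastTwo$$`, `replayed`, `empty$$` and `nothing` are illustrative. The first subject keeps only the two most recent values, and the second shows that an empty buffer replays nothing.

```typescript
import { ReplaySubject } from "rxjs";

// Keep only the two most recent values.
const lastTwo$$ = new ReplaySubject<number>(2);
lastTwo$$.next(1);
lastTwo$$.next(2);
lastTwo$$.next(3);

const replayed: number[] = [];
lastTwo$$.subscribe((value) => replayed.push(value));

// Nothing was emitted yet, so there is nothing to replay.
const empty$$ = new ReplaySubject<number>(1);
const nothing: number[] = [];
empty$$.subscribe((value) => nothing.push(value));

console.log(replayed); // -> [2, 3]
console.log(nothing); // -> []
```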
If you subscribe after calling the complete method, you will still receive the buffered values (unlike BehaviorSubject).
const replay$$ = new ReplaySubject<number>();
replay$$.next(1);
replay$$.next(2);
replay$$.next(3);
replay$$.complete();
replay$$.subscribe((e) => console.log(e)); // -> 1, 2, 3
When to use ReplaySubject?
ReplaySubject is useful if you want to store the state of part of an application but you do not know the initial state.
import { Component, OnInit } from "@angular/core";
import { ActivatedRoute, Router } from "@angular/router";
import { ReplaySubject } from "rxjs";
import { map, switchMap, take } from "rxjs/operators";

// ProjectService and Project are assumed to be defined elsewhere in the application.
@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
})
export class AppComponent implements OnInit {
  project$$ = new ReplaySubject<Project>(1);

  constructor(
    private projectService: ProjectService,
    private router: Router,
    private activatedRoute: ActivatedRoute,
  ) {}

  ngOnInit(): void {
    this.activatedRoute.paramMap
      .pipe(
        map((params) => params.get("projectId")),
        switchMap((id) => this.projectService.getProject(id)),
      )
      .subscribe((project) => this.project$$.next(project));
  }

  openProjectDetails() {
    this.project$$
      .pipe(
        take(1),
        switchMap((project) => this.router.navigate(["/", "projects", project.id])),
      )
      .subscribe();
  }
}
What is AsyncSubject?
AsyncSubject is a special type of Subject that multicasts to all subscribers (like Subject), but emits only the latest value, and only on complete.
export class AsyncSubject<T> extends Subject<T> {
  private _value: T | null = null;
  private _hasValue = false;

  next(value: T): void {
    this._value = value;
    this._hasValue = true;
  }

  complete(): void {
    /** ... */
    this._hasValue && super.next(this._value!);
    /** ... */
  }
}
Note that AsyncSubject will only emit a value when the complete function is called, and it will only emit the last value that was passed to the next function.
If the complete function is not called, AsyncSubject will not emit any values.
If you subscribe after calling the complete method, you will also receive the latest value.
const asyncSubject$$ = new AsyncSubject<number>();
// you will receive only the last value after complete
asyncSubject$$.subscribe((e) => console.log(e)); // -> 3

asyncSubject$$.next(1);
asyncSubject$$.next(2);
asyncSubject$$.next(3);
asyncSubject$$.complete();
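The two remaining cases, nothing emitted before complete and a subscriber arriving after complete, can be checked with a short sketch. The names `pending$$`, `received`, `beforeComplete` and `late` are illustrative.

```typescript
import { AsyncSubject } from "rxjs";

const pending$$ = new AsyncSubject<number>();
const received: number[] = [];

pending$$.subscribe((value) => received.push(value));
pending$$.next(1);
pending$$.next(2);

// Snapshot taken before complete: nothing has been delivered yet.
const beforeComplete = received.slice();

pending$$.complete(); // now the last value (2) is delivered

// Subscribing after complete still yields the last value.
const late: number[] = [];
pending$$.subscribe((value) => late.push(value));

console.log(beforeComplete); // -> []
console.log(received); // -> [2]
console.log(late); // -> [2]
```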
When to use AsyncSubject?
AsyncSubject is useful if you want to cache a value that never changes.
import { Component, OnInit } from "@angular/core";
import { AsyncSubject } from "rxjs";

// ProjectService is assumed to be defined elsewhere in the application.
@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
})
export class AppComponent implements OnInit {
  token$$ = new AsyncSubject<string>();

  constructor(private projectService: ProjectService) {}

  ngOnInit() {
    this.projectService.getTemporaryToken().subscribe((token) => {
      this.token$$.next(token);
      this.token$$.complete();
    });
  }
}