Creating custom pipe operators by mixing existing operators
RxJS provides a powerful set of built-in operators that can be used to transform, filter, and combine streams of data.
Filter null and undefined values – filter in practice
In RxJS, filter is an operator that emits only those items from the source sequence that satisfy a given predicate function.
import { filter, of, OperatorFunction } from "rxjs";

function isDefined<T>(): OperatorFunction<T, Exclude<T, null | undefined>> {
  return filter((value): value is Exclude<T, null | undefined> => value !== null && value !== undefined);
}
Thanks to Exclude<T, null | undefined>, the null and undefined types are removed from the type of the emitted values.
of(false, null, true, undefined).subscribe((value) => console.log(value));
// ^? value: boolean | null | undefined
of(false, null, true, undefined)
  .pipe(isDefined())
  .subscribe((value) => console.log(value));
// ^? value: boolean
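P.S.: For a concrete type such as boolean | null | undefined, T & {} resolves to the same type as Exclude<T, null | undefined>. Since TypeScript 4.8, the built-in NonNullable<T> is defined as T & {}.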
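To make the equivalence concrete, here is a minimal sketch in plain TypeScript, without RxJS. The isPresent type guard and the Input, ViaIntersection and ViaExclude aliases are hypothetical names introduced only for this illustration, and the sketch assumes TypeScript 4.8 or newer with strictNullChecks enabled.

```typescript
// Hypothetical type guard: performs the same runtime check as the operator,
// but narrows the type with T & {} instead of Exclude<T, null | undefined>.
function isPresent<T>(value: T): value is T & {} {
  return value !== null && value !== undefined;
}

type Input = boolean | null | undefined;

// For this concrete union, both aliases describe the same set of values.
type ViaIntersection = Input & {};
type ViaExclude = Exclude<Input, null | undefined>;

const input: Input[] = [false, null, true, undefined];

// Array.prototype.filter accepts a type guard and narrows the element type.
const fromIntersection: ViaIntersection[] = input.filter(isPresent);

// The assignment compiles because the two types are mutually assignable.
const fromExclude: ViaExclude[] = fromIntersection;

console.log(fromExclude);
```

The falsy value false survives the filter, because only null and undefined are rejected.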
Filter out values passed as arguments – advanced usage of the filter operator
Instead of hardcoding null and undefined inside the function, you can pass the values to filter out as arguments.
import { filter, of, OperatorFunction } from "rxjs";

function isNot<T, G extends unknown[]>(...forbiddenValues: G): OperatorFunction<T, Exclude<T, G[number]>> {
  return filter((value): value is Exclude<T, G[number]> => {
    // Reject the value as soon as it strictly equals one of the forbidden values.
    for (const forbiddenValue of forbiddenValues) {
      if (value === forbiddenValue) {
        return false;
      }
    }
    return true;
  });
}
Thanks to Exclude<T, G[number]>, TypeScript removes the types of the forbidden values from the type of the emitted values.
of(false, null, true, undefined).subscribe((value) => console.log(value));
// ^? value: boolean | null | undefined
of(false, null, true, undefined)
  .pipe(isNot(null, false as const, undefined))
  .subscribe((value) => console.log(value));
// ^? value: true
Operator for auto-complete – combine three operators into one
Auto-complete, or word completion, is a feature in which an application predicts the rest of a word the user is typing. Limiting the number of requests sent to the backing service is important: it prevents excessive usage and protects the service from being overloaded.
See the example of how to create a custom auto-complete operator in RxJS to protect your service from redundant calls.
import { auditTime, distinctUntilChanged, filter, fromEvent, map, MonoTypeOperatorFunction, pipe } from "rxjs";

function limitEvents<T extends string>(): MonoTypeOperatorFunction<T> {
  return pipe(
    auditTime(500),
    distinctUntilChanged(),
    filter((value) => value.length > 3),
  );
}

// "input" fires on every keystroke; "change" fires only when the field commits its value (for example on blur).
fromEvent(document.querySelector("input")!, "input")
  .pipe(
    // event.target is typed as EventTarget | null, so narrow it before reading value.
    map((event) => (event.target as HTMLInputElement).value),
    limitEvents(),
  )
  .subscribe();
auditTime ignores source values for the given time in milliseconds, then emits the most recent value from the source. As a result, a burst of changes emits at most one value per 500 milliseconds.
distinctUntilChanged passes a value to the next operator only when it differs from the previous one.
filter emits a value only if the input has more than 3 characters.
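The interplay of the last two operators can be checked with a synchronous source. The sketch below deliberately leaves auditTime out, because it depends on timers; the sample strings and the emitted array are made up for this illustration.

```typescript
import { distinctUntilChanged, filter, of, toArray } from "rxjs";

// Collects the single array produced by toArray once the source completes.
const emitted: string[][] = [];

of("rx", "rxjs", "rxjs", "rxjs operators", "rxjs")
  .pipe(
    // Drops a value only when it equals the value directly before it.
    distinctUntilChanged(),
    // Drops values with 3 characters or fewer.
    filter((value) => value.length > 3),
    toArray(),
  )
  .subscribe((values) => emitted.push(values));

console.log(emitted[0]); // ["rxjs", "rxjs operators", "rxjs"]
```

Note that the final "rxjs" is emitted again: distinctUntilChanged compares each value only with its immediate predecessor, not with everything seen so far.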
Logger operator – single operator with predefined values
The purpose of a logger is to help developers understand the behavior of a system, diagnose issues, and troubleshoot problems. See the example of how to create a custom logger operator in RxJS to log each event to the console.
import { interval, MonoTypeOperatorFunction, tap } from "rxjs";

function logger<T>(tag: string = "DEFAULT"): MonoTypeOperatorFunction<T> {
  return tap({
    next: (value) => console.log(`%c[${tag}] %cNEXT`, "color: #bada55", "color: #FFF", value),
    error: (error) => console.error(`%c[${tag}] %cERROR`, "color: #bada55", "color: #F00", error),
    complete: () => console.log(`%c[${tag}] %cCOMPLETED`, "color: #bada55", "color: #0F0"),
    subscribe: () => console.log(`%c[${tag}] %cSUBSCRIBE`, "color: #bada55", "color: #FF0"),
    unsubscribe: () => console.log(`%c[${tag}] %cUNSUBSCRIBE`, "color: #bada55", "color: #FF0"),
  });
}

interval(500).pipe(logger("INTERVAL")).subscribe();
Thanks to the defined colors in the log console, the output will look colorful and intuitive.
Creating a custom pipe operator based on a custom implementation
Creating a custom pipe operator from scratch should be the last step in solving your problem. First try to solve it with the available operators; only if that is not possible, implement a custom operator.
To create a custom pipe operator, we first need to define a function that takes an observable as input and returns a new observable. This function should contain the logic for the transformation we want to perform.
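import { Observable } from "rxjs";

function isDefined<T>(source: Observable<T>): Observable<T & {}> {
  return new Observable((subscriber) => {
    const subscription = source.subscribe({
      next(value) {
        if (value !== undefined && value !== null) {
          subscriber.next(value);
        }
      },
      // Forward error and complete, otherwise the resulting observable never terminates.
      error: (error) => subscriber.error(error),
      complete: () => subscriber.complete(),
    });
    // Returning the subscription unsubscribes from the source on teardown.
    return subscription;
  });
}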
You should always tear down the subscriptions you create inside a new Observable. To do so, return the created subscription, or return a custom callback that is called when the resulting observable is unsubscribed.
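import { Observable } from "rxjs";

function interval(period: number): Observable<number> {
  return new Observable((subscriber) => {
    let i = 0;
    // Named timerId so that it does not shadow the enclosing interval function.
    const timerId = setInterval(() => subscriber.next(i++), period);
    // Teardown: stop the timer when the subscriber unsubscribes.
    return () => clearInterval(timerId);
  });
}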
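To see when the teardown callback actually runs, here is a small synchronous sketch. The events array and the "next:" and "teardown" labels are hypothetical, added only to record the order of calls.

```typescript
import { Observable } from "rxjs";

// Records what happened, in order.
const events: string[] = [];

const source = new Observable<number>((subscriber) => {
  subscriber.next(1);
  // Teardown callback: invoked when the subscription is closed.
  return () => {
    events.push("teardown");
  };
});

const subscription = source.subscribe((value) => {
  events.push(`next:${value}`);
});

// The source has not completed, so the teardown has not run yet.
const eventsBeforeUnsubscribe = [...events];

subscription.unsubscribe();

console.log(eventsBeforeUnsubscribe); // ["next:1"]
console.log(events); // ["next:1", "teardown"]
```

If the callback were missing, any resource opened inside the Observable (a timer, an inner subscription) would keep running after the consumer has unsubscribed.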
Thanks to T & {}, the null and undefined types are removed from the type of the emitted values.
of(false, null, true, undefined).subscribe((value) => console.log(value));
// ^? value: boolean | null | undefined
of(false, null, true, undefined)
  .pipe(isDefined)
  .subscribe((value) => console.log(value));
// ^? value: boolean
P.S.: For a concrete type, T & {} resolves to the same type as Exclude<T, null | undefined>.
Custom pipe operator with arguments
To create a custom pipe operator that accepts arguments, we need a factory function: a function that takes the arguments and returns the operator.
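import { Observable, of, OperatorFunction } from "rxjs";

function isNot<T, G extends unknown[]>(...forbiddenValues: G): OperatorFunction<T, Exclude<T, G[number]>> {
  return function (source) {
    return new Observable((subscriber) => {
      const subscription = source.subscribe({
        next(value) {
          // Skip the value as soon as it strictly equals one of the forbidden values.
          for (const forbiddenValue of forbiddenValues) {
            if (value === forbiddenValue) {
              return;
            }
          }
          subscriber.next(value as Exclude<T, G[number]>);
        },
        // Forward error and complete, otherwise the resulting observable never terminates.
        error: (error) => subscriber.error(error),
        complete: () => subscriber.complete(),
      });
      // Returning the subscription unsubscribes from the source on teardown.
      return subscription;
    });
  };
}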
Do not forget to tear down your subscription!
Thanks to Exclude<T, G[number]>, TypeScript removes the types of the forbidden values from the type of the emitted values.
of(false, null, true, undefined).subscribe((value) => console.log(value));
// ^? value: boolean | null | undefined
of(false, null, true, undefined)
  .pipe(isNot(null, false as const, undefined))
  .subscribe((value) => console.log(value));
// ^? value: true
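A hand-written operator is also responsible for passing error and complete notifications downstream. The sketch below uses a simplified, hypothetical skipValues operator (not the isNot from this article, and without the type narrowing) to show why that matters: toArray emits only once the source completes.

```typescript
import { MonoTypeOperatorFunction, Observable, of, toArray } from "rxjs";

// Hypothetical operator for this sketch: skips every value found in `forbidden`.
function skipValues<T>(forbidden: readonly unknown[]): MonoTypeOperatorFunction<T> {
  return (source) =>
    new Observable<T>((subscriber) => {
      const subscription = source.subscribe({
        next(value) {
          // Strict equality, matching the comparison used by isNot.
          if (!forbidden.some((item) => item === value)) {
            subscriber.next(value);
          }
        },
        // Without these two lines, toArray below would never emit.
        error: (error) => subscriber.error(error),
        complete: () => subscriber.complete(),
      });
      // Tear down the inner subscription when the consumer unsubscribes.
      return subscription;
    });
}

const collected: number[][] = [];

of(1, 0, 2, 0, 3)
  .pipe(skipValues([0]), toArray())
  .subscribe((values) => collected.push(values));

console.log(collected[0]); // [1, 2, 3]
```

This is one more reason to prefer composing built-in operators such as filter: they already handle error, completion and teardown.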
Conclusion
Creating a custom pipe operator in RxJS can help simplify complex data transformations and make your reactive programming code more readable and reusable, but remember, creating your own pipe operator from scratch should be your last choice.