How to create custom pipe operators in RxJS

RxJS is a popular library for reactive programming in JavaScript, and creating custom pipe operators can help simplify complex data transformations. In this article, we'll explore how to create a custom pipe operator in RxJS.

February 18, 2023 rxjs

Creating custom pipe operators by mixing existing operators

RxJS provides a powerful set of built-in operators that can be used to transform, filter, and combine streams of data.
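As a minimal sketch of the idea, the standalone pipe function exported by RxJS can glue existing operators into one reusable operator. The evenSquares name and the sample numbers below are made up for illustration.

```typescript
import { of, pipe, OperatorFunction } from "rxjs";
import { filter, map } from "rxjs/operators";

// Hypothetical operator: keeps even numbers and squares them.
function evenSquares(): OperatorFunction<number, number> {
  return pipe(
    filter((n: number) => n % 2 === 0),
    map((n) => n * n),
  );
}

const squares: number[] = [];

// of() emits synchronously, so the array is filled as soon as subscribe() returns.
of(1, 2, 3, 4)
  .pipe(evenSquares())
  .subscribe((n) => squares.push(n));

console.log(squares); // [4, 16]
```

The caller sees a single operator, while the implementation stays a plain list of built-in ones.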

Filter null and undefined values – filter in practice

In RxJS, filter is an operator that emits only those items from the source sequence that satisfy a given predicate function.

import { filter, of, OperatorFunction } from "rxjs";

function isDefined<T>(): OperatorFunction<T, Exclude<T, null | undefined>> {
  return filter((value): value is Exclude<T, null | undefined> => value !== null && value !== undefined);
}

Thanks to Exclude<T, null | undefined>, null and undefined are removed from the emitted type.

of(false, null, true, undefined).subscribe((value) => console.log(value));
//                                            ^? value: boolean | null | undefined

of(false, null, true, undefined)
  .pipe(isDefined())
  .subscribe((value) => console.log(value));
//              ^? value: boolean

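P.S.: For a union type like this one, T & {} gives the same result as Exclude<T, null | undefined>. Since TypeScript 4.8, the built-in NonNullable<T> is defined as T & {}.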
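The equivalence can be checked at compile time for the union used in this example. The sketch below relies on a hypothetical Equal helper type (it is not part of TypeScript or RxJS) and assumes TypeScript 4.8 or later; it only covers this particular union, not every possible T.

```typescript
// Hypothetical helper: resolves to true only when A and B are the same type.
type Equal<A, B> =
  (<X>() => X extends A ? 1 : 2) extends (<X>() => X extends B ? 1 : 2) ? true : false;

type Input = boolean | null | undefined;

type ViaExclude = Exclude<Input, null | undefined>;
type ViaIntersection = Input & {};

// This assignment compiles only if both aliases resolve to the same type.
const sameForThisUnion: Equal<ViaExclude, ViaIntersection> = true;

console.log(sameForThisUnion);
```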

Filter passed values as an argument – advanced usage of filter operator

Instead of hardcoding null and undefined inside the function, you can pass the values to exclude as arguments.

import { filter, of, OperatorFunction } from "rxjs";

function isNot<T, G extends unknown[]>(...forbiddenValues: G): OperatorFunction<T, Exclude<T, G[number]>> {
  return filter((value): value is Exclude<T, G[number]> => {
    for (const forbiddenValue of forbiddenValues) {
      if (value === forbiddenValue) {
        return false;
      }
    }

    return true;
  });
}

Thanks to Exclude<T, G[number]>, TypeScript removes the types of the forbidden values from the emitted type.

of(false, null, true, undefined).subscribe((value) => console.log(value));
//                                           ^? value: boolean | null | undefined

of(false, null, true, undefined)
  .pipe(isNot(null, false as const, undefined))
  .subscribe((value) => console.log(value));
//            ^? value: true

Operator for auto-complete – combine three operators into one

Auto-complete, or word completion, is a feature in which an application predicts the rest of a word a user is typing. It is important to limit the number of requests sent to the backing service so that it is not overloaded by redundant calls.

See the example of how to create a custom auto-complete operator in RxJS to protect your service from redundant calls.

import { MonoTypeOperatorFunction, pipe, fromEvent } from "rxjs";
import { auditTime, distinctUntilChanged, filter, map } from "rxjs/operators";

function limitEvents<T extends string>(): MonoTypeOperatorFunction<T> {
  return pipe(
    auditTime(500),
    distinctUntilChanged(),
    filter((value) => value.length > 3),
  );
}

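// "input" fires on every keystroke; "change" fires only when the value is committed.
fromEvent(document.querySelector("input")!, "input")
  .pipe(
    // event.target is typed as EventTarget | null, so narrow it to read the value.
    map((event) => (event.target as HTMLInputElement).value),
    limitEvents(),
  )
  .subscribe();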

auditTime ignores source values for the given number of milliseconds, then emits the most recent value from the source. As a result, at most one value is emitted every 500 milliseconds.

distinctUntilChanged passes a value to the next operator only when it differs from the previous one.

filter emits the value only if it has more than 3 characters.
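The two time-independent steps can be tried out with a synchronous source. This is a small sketch with made-up search terms; auditTime is left out because it depends on real timers.

```typescript
import { of } from "rxjs";
import { distinctUntilChanged, filter } from "rxjs/operators";

const passed: string[] = [];

// Made-up search terms standing in for what a user might type.
of("rxjs", "rxjs", "rx", "rxjs operators")
  .pipe(
    distinctUntilChanged(),              // drops the repeated "rxjs"
    filter((value) => value.length > 3), // drops "rx" (too short)
  )
  .subscribe((value) => passed.push(value));

console.log(passed); // ["rxjs", "rxjs operators"]
```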

Logger operator – single operator with predefined values

The purpose of a logger is to help developers understand the behavior of a system, diagnose issues, and troubleshoot problems. See the example of how to create a custom logger operator in RxJS to log each event to the console.

import { interval, MonoTypeOperatorFunction } from "rxjs";
import { tap } from "rxjs/operators";

function logger<T>(tag: string = "DEFAULT"): MonoTypeOperatorFunction<T> {
  return tap({
    next: (value) => console.log(`%c[${tag}] %cNEXT`, "color: #bada55", "color: #FFF", value),
    error: (error) => console.error(`%c[${tag}] %cERROR`, "color: #bada55", "color: #F00", error),
    complete: () => console.log(`%c[${tag}] %cCOMPLETED`, "color: #bada55", "color: #0F0"),
    // The subscribe and unsubscribe callbacks require RxJS 7.3 or later.
    subscribe: () => console.log(`%c[${tag}] %cSUBSCRIBE`, "color: #bada55", "color: #FF0"),
    unsubscribe: () => console.log(`%c[${tag}] %cUNSUBSCRIBE`, "color: #bada55", "color: #FF0"),
  });
}

interval(500).pipe(logger("INTERVAL")).subscribe();

Thanks to the %c color directives, each event type is easy to tell apart in the console.
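To see when each tap callback fires, here is a sketch of a hypothetical recorder operator that stores event names in an array instead of printing them. It assumes RxJS 7.3 or later, where tap accepts the subscribe and unsubscribe callbacks.

```typescript
import { MonoTypeOperatorFunction, of, Subject } from "rxjs";
import { tap } from "rxjs/operators";

// Hypothetical variant of the logger: records event names in the given array.
function recorder<T>(events: string[]): MonoTypeOperatorFunction<T> {
  return tap({
    next: (value) => events.push(`next:${value}`),
    error: () => events.push("error"),
    complete: () => events.push("complete"),
    subscribe: () => events.push("subscribe"),
    unsubscribe: () => events.push("unsubscribe"),
  });
}

// A source that completes on its own: no "unsubscribe" event is recorded.
const completed: string[] = [];
of(1, 2).pipe(recorder(completed)).subscribe();
console.log(completed); // ["subscribe", "next:1", "next:2", "complete"]

// A source that is unsubscribed before it completes.
const cancelled: string[] = [];
const subject = new Subject<number>();
const subscription = subject.pipe(recorder(cancelled)).subscribe();
subject.next(1);
subscription.unsubscribe();
console.log(cancelled); // ["subscribe", "next:1", "unsubscribe"]
```

Note that the unsubscribe callback runs only on an explicit unsubscribe, not after complete or error.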

Creating a custom pipe operator based on a custom implementation

Writing an operator from scratch should be the last step in solving your problem. First try to solve it with the available operators, and implement a custom operator only if that is not possible.

To create a custom pipe operator, we first need to define a function that takes an observable as input and returns a new observable. This function should contain the logic for the transformation we want to perform.

import { Observable } from "rxjs";

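function isDefined<T>(source: Observable<T>): Observable<T & {}> {
  return new Observable((subscriber) => {
    const subscription = source.subscribe({
      next(value) {
        // Forward only values that are neither null nor undefined.
        if (value !== undefined && value !== null) {
          subscriber.next(value);
        }
      },
      // Forward errors and completion; otherwise the result would never error or complete.
      error(error) {
        subscriber.error(error);
      },
      complete() {
        subscriber.complete();
      },
    });

    return subscription;
  });
}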

You should always tear down the subscriptions you create inside a new Observable. To do so, return the created subscription, or return a custom callback that will be called when the resulting Observable is unsubscribed.

import { Observable } from "rxjs";

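function interval(period: number): Observable<number> {
  return new Observable((subscriber) => {
    let i = 0;
    // Named timerId so it does not shadow the enclosing interval function.
    const timerId = setInterval(() => subscriber.next(i++), period);

    // Teardown callback: stops the timer on unsubscribe.
    return () => clearInterval(timerId);
  });
}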
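The effect of returning the inner subscription can be observed with a small sketch. The passThrough operator and the endless source are hypothetical names introduced only for this illustration.

```typescript
import { Observable } from "rxjs";

const log: string[] = [];

// A source that never completes and records when its teardown runs.
const endless = new Observable<number>((subscriber) => {
  subscriber.next(1);
  return () => {
    log.push("source torn down");
  };
});

// Hypothetical hand-written operator that forwards everything unchanged.
function passThrough<T>(source: Observable<T>): Observable<T> {
  return new Observable<T>((subscriber) => {
    const subscription = source.subscribe({
      next: (value) => subscriber.next(value),
      error: (err) => subscriber.error(err),
      complete: () => subscriber.complete(),
    });

    // Returning the inner subscription ties it to the outer one.
    return subscription;
  });
}

const subscription = endless
  .pipe(passThrough)
  .subscribe((value) => log.push(`next:${value}`));

log.push("unsubscribing");
subscription.unsubscribe();

console.log(log); // ["next:1", "unsubscribing", "source torn down"]
```

Without the return statement, unsubscribing from the result would leave the source subscription alive.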

In the isDefined operator above, thanks to T & {}, null and undefined are removed from the emitted type.

of(false, null, true, undefined).subscribe((value) => console.log(value));
//                                           ^? value: boolean | null | undefined

of(false, null, true, undefined)
  .pipe(isDefined)
  .subscribe((value) => console.log(value));
//             ^? value: boolean

P.S.: For this union, T & {} yields the same type as Exclude<T, null | undefined>.

Custom pipe operator with arguments

To create a custom pipe operator that takes arguments, we need a factory function: a function that accepts the arguments and returns the operator.

import { Observable, of, OperatorFunction } from "rxjs";

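function isNot<T, G extends unknown[]>(...forbiddenValues: G): OperatorFunction<T, Exclude<T, G[number]>> {
  return function (source) {
    return new Observable((subscriber) => {
      const subscription = source.subscribe({
        next(value) {
          // Skip the value if it matches any forbidden value.
          for (const forbiddenValue of forbiddenValues) {
            if (value === forbiddenValue) {
              return;
            }
          }

          subscriber.next(value as Exclude<T, G[number]>);
        },
        // Forward errors and completion; otherwise the result would never error or complete.
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });

      return subscription;
    });
  };
}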

Do not forget to tear down your subscription!

Thanks to Exclude<T, G[number]>, TypeScript removes the types of the forbidden values from the emitted type.

of(false, null, true, undefined).subscribe((value) => console.log(value));
//                                           ^? value: boolean | null | undefined

of(false, null, true, undefined)
  .pipe(isNot(null, false as const, undefined))
  .subscribe((value) => console.log(value));
//             ^? value: true

Conclusion

Creating a custom pipe operator in RxJS can help simplify complex data transformations and make your reactive programming code more readable and reusable, but remember, creating your own pipe operator from scratch should be your last choice.
