Custom onDestroy / teardown rxjs pipe
April 02, 2024
In this post I want to give you a short introduction to writing and using your own custom rxjs pipe. To demonstrate this, we will write an onDestroy / teardown pipe. With this pipe, you can attach a callback to an observable that gets called when the observable completes, errors, or is unsubscribed.
You can find a StackBlitz example here
The code
import { Observable, Subscriber, timer } from 'rxjs';

function onDestroy(callback: () => void) {
  // Return the actual pipe function: it takes the source Observable and returns a new one
  return function <T>(source: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber: Subscriber<T>) => {
      // Forward every notification from the source unchanged
      const subscription = source.subscribe({
        next(value: T) {
          subscriber.next(value);
        },
        error(error: unknown) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });

      // Teardown: runs on unsubscribe, and also after the source completes or errors
      return () => {
        callback();
        subscription.unsubscribe();
      };
    });
  };
}
How does a pipe work
A custom pipe may sound intimidating, but it is really just a function that receives an Observable as input and returns an Observable. A very minimal version looks like this:
function firstCustomPipe<T>(input: Observable<T>): Observable<T> {
  return input;
}
The .pipe() method loops over all provided pipe functions and invokes each one with the result of the previous one.
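To make the looping idea concrete, here is a minimal sketch. It is not the RxJS implementation: pipeSketch and UnaryFn are hypothetical names made up for this example, and for simplicity every function keeps the same type, while the real .pipe() also allows the type to change from one step to the next.

```typescript
// Hypothetical helper, not part of RxJS: a function that maps a value to a value of the same type.
type UnaryFn<T> = (input: T) => T;

// Start with the source and hand each function the result of the previous one.
function pipeSketch<T>(source: T, ...fns: UnaryFn<T>[]): T {
  return fns.reduce((previous, fn) => fn(previous), source);
}

const result = pipeSketch<number>(
  1,
  (n) => n + 1, // receives 1, returns 2
  (n) => n * 10, // receives 2, returns 20
);

console.log(result); // 20
```

With Observables the principle is the same: each pipe function receives the Observable returned by the previous one.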
In terms of logic, we can now extend the basic example from before and create a new Observable inside the function, which gives us full control over it.
Last but not least, we need to understand what unsubscribing from an Observable does. It does not unsubscribe from every pipe in the chain, only from the last one. Each pipe is responsible for unsubscribing from its parent Observable. To do that, the function passed to the new Observable needs to return a function that gets called upon unsubscribing or completing, e.g. return () => subscription.unsubscribe();
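The following sketch shows this chain in action. The names passThrough and log are made up for this example. The source never emits and only records when its own teardown runs, so the order of the log entries shows how the unsubscribe call travels from the last pipe up to the parent.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// A source that never emits. Its teardown only records that it ran.
const source = new Observable<number>(() => {
  return () => {
    log.push('source teardown');
  };
});

// A pass-through pipe (hypothetical name) that forwards everything
// and unsubscribes from its parent inside its own teardown.
function passThrough<T>(input: Observable<T>): Observable<T> {
  return new Observable<T>((subscriber) => {
    const subscription = input.subscribe({
      next: (value) => subscriber.next(value),
      error: (error) => subscriber.error(error),
      complete: () => subscriber.complete(),
    });
    return () => {
      log.push('passThrough teardown');
      subscription.unsubscribe();
    };
  });
}

const outer = source.pipe(passThrough).subscribe();
outer.unsubscribe();

console.log(log); // ['passThrough teardown', 'source teardown']
```

If passThrough did not call subscription.unsubscribe(), the source teardown would never run and the source would keep its subscriber.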
Our teardown pipe
Let's quickly look at the code from above. We create a new function, function onDestroy(callback: () => void) {}, and the callback parameter is the one we want to call as soon as the observable is unsubscribed.
To match the expected input for the pipe interface, we return a function, return function <T>(source: Observable<T>): Observable<T> {}, that gets an Observable as input and returns an Observable (in our case of the same type).
Inside our pipe function we create a new Observable object. Inside it, we forward everything from the parent source to the new Observable, so the data in the observable is not modified.
And finally, to achieve what we set out to do in the first place, we return the previously mentioned function that gets called on unsubscribing, but we extend it with our provided callback parameter.
Using this pipe looks just like using any other pipe, e.g.
import { map } from 'rxjs/operators';

timer(0, 1000)
  .pipe(
    map(() => {}), // just an example, you can put any operator before or afterwards
    onDestroy(() => {
      console.log('My custom on destroy logic');
    }),
  )
  .subscribe();
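To see when the callback actually fires, here is a small sketch that runs synchronously, so the order of events is easy to follow. It contains a condensed copy of the onDestroy pipe from above. The events array and the two scenarios are made up for this example: one source completes by itself, the other one is unsubscribed manually.

```typescript
import { Observable, Subject, of } from 'rxjs';

// Condensed copy of the onDestroy pipe from above.
function onDestroy(callback: () => void) {
  return function <T>(source: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
      const subscription = source.subscribe({
        next: (value) => subscriber.next(value),
        error: (error) => subscriber.error(error),
        complete: () => subscriber.complete(),
      });
      return () => {
        callback();
        subscription.unsubscribe();
      };
    });
  };
}

const events: string[] = [];

// Case 1: the source completes by itself.
of(1, 2)
  .pipe(onDestroy(() => events.push('destroyed after complete')))
  .subscribe({
    next: (value) => events.push(`value ${value}`),
    complete: () => events.push('complete'),
  });

// Case 2: the subscriber unsubscribes manually.
const subject = new Subject<number>();
const manual = subject
  .pipe(onDestroy(() => events.push('destroyed after unsubscribe')))
  .subscribe((value) => events.push(`value ${value}`));

subject.next(3);
manual.unsubscribe();
subject.next(4); // no longer delivered

console.log(events);
// ['value 1', 'value 2', 'complete', 'destroyed after complete',
//  'value 3', 'destroyed after unsubscribe']
```

In both cases the callback runs exactly once, and in the first case it runs after the complete notification has been delivered.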
When is this useful
Honestly, only in very specific cases. Most of the time, if you manually subscribe to and unsubscribe from Observables, you can just add your teardown logic in the same place.
However, this gets really handy if you create Observables and pass them on to different parts of the code, e.g. services or view components. With this approach you can still control what needs to happen on unsubscribing, for example clearing a storage.
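As a rough sketch of that scenario: the DraftService class, its method names and the Map-based storage below are all hypothetical and only serve as an illustration. The service hands out an Observable, and the cleanup travels with it, so the consumer does not need to know about the storage at all.

```typescript
import { Observable, Subject } from 'rxjs';

// Condensed copy of the onDestroy pipe from this post.
function onDestroy(callback: () => void) {
  return function <T>(source: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
      const subscription = source.subscribe({
        next: (value) => subscriber.next(value),
        error: (error) => subscriber.error(error),
        complete: () => subscriber.complete(),
      });
      return () => {
        callback();
        subscription.unsubscribe();
      };
    });
  };
}

// Hypothetical service: names and storage are made up for this example.
class DraftService {
  readonly storage = new Map<string, string>();
  private readonly changes = new Subject<string>();

  saveDraft(text: string): void {
    this.storage.set('draft', text);
    this.changes.next(text);
  }

  // The consumer only receives an Observable. The cleanup is attached here.
  watchDrafts(): Observable<string> {
    return this.changes.pipe(onDestroy(() => this.storage.clear()));
  }
}

// Somewhere else in the code, e.g. in a view component:
const service = new DraftService();
const received: string[] = [];
const draftSubscription = service.watchDrafts().subscribe((text) => received.push(text));

service.saveDraft('Hello');
draftSubscription.unsubscribe(); // triggers the callback, which clears the storage

console.log(received, service.storage.size); // ['Hello'] 0
```

Note that in this sketch every subscriber clears the shared storage when it unsubscribes, which may or may not be what you want with multiple consumers.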
Personal Blog written by Nicolas Gehlert, software developer from Freiburg im Breisgau. Developer & Papa. Github | Twitter