In this post you’ll learn how to use the forkJoin operator in RxJS.
forkJoin is one of the most popular combination operators because it behaves much like Promise.all, but for observables.
forkJoin accepts a variable number of observables and subscribes to them in parallel. When all of the provided observables complete, forkJoin collects the last emitted value from each and emits them as an array (the default output format; more on this later).
🔍 “emit” is the term used to mean when an observable “produces” a value!
Let’s create an observable with forkJoin that combines 3 observables that each emit a single value, then complete, using the of operator:
import { forkJoin, of } from 'rxjs';

const joined$ = forkJoin(
  of(3),
  of('foo'),
  of(42)
);

joined$.subscribe(console.log);
Which produces the output:
[3, "foo", 42]
As a point of comparison, here’s how Promise.all works:
Promise.all([
  Promise.resolve(3),
  new Promise((resolve, reject) => setTimeout(resolve, 3000, 'foo')),
  42
]).then(values => console.log(values));
Which produces:
[3, "foo", 42]
So that’s the basic use of the forkJoin operator in RxJS. Let’s get a bit more advanced.
Order and parallel execution
It’s worth noting that the forkJoin operator will preserve the order of inner observables regardless of when they complete.
To demonstrate this let’s create 3 delayed observables where the third observable will complete first:
import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const joinedAndDelayed$ = forkJoin(
  of('Hey').pipe(delay(2000)),
  of('Ho').pipe(delay(3000)),
  of('Lets go!').pipe(delay(1000))
);

joinedAndDelayed$.subscribe(console.log);
The third observable, despite completing first, will still be the last result in the output array, as forkJoin preserves the order of the observables that were passed in. This gives us:
['Hey', 'Ho', 'Lets go!'];
The output is produced after 3 seconds, which shows that all inner observables are run in parallel.
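Why does the output order match the input order rather than the completion order? One way to picture it is that each source writes into a slot fixed by its position. The sketch below is a simplified, dependency-free model of that idea, not the actual RxJS implementation. `miniForkJoin` and `later` are hypothetical helpers, an "observable" is reduced to a plain function that takes an observer, and error handling, unsubscription and empty inputs are left out. The delays are scaled down to milliseconds.

```javascript
// Hypothetical stand-in for an observable: a function that receives an
// observer ({ next, complete }) and starts producing values when called.
const later = (value, ms) => (observer) => {
  setTimeout(() => {
    observer.next(value);
    observer.complete();
  }, ms);
};

// Simplified model of forkJoin (illustration only, not the RxJS source).
const miniForkJoin = (sources) => (observer) => {
  const lastValues = new Array(sources.length);
  let stillRunning = sources.length;

  // Every source is started right away, so they all run in parallel.
  sources.forEach((source, index) => {
    source({
      // Each value lands in the slot matching the source's input position,
      // overwriting any earlier value from the same source.
      next: (value) => {
        lastValues[index] = value;
      },
      // Emit once, when the final source has completed.
      complete: () => {
        stillRunning -= 1;
        if (stillRunning === 0) {
          observer.next(lastValues);
          observer.complete();
        }
      },
    });
  });
};

miniForkJoin([later('Hey', 200), later('Ho', 300), later('Lets go!', 100)])({
  next: (values) => console.log(values),
  complete: () => console.log('done'),
});
```

Run as-is, this logs the three values in input order after roughly 300ms, followed by done, even though 'Lets go!' arrived first.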
Alternative output options with forkJoin
By default, forkJoin will produce a result as an array of values as per the examples above.
Since RxJS 6.5, the result can also be produced in object form by passing an object map to forkJoin instead of comma-separated arguments:
import { forkJoin, of } from 'rxjs';

const joinedWithObjectForm$ = forkJoin({
  hey: of('Hey'),
  ho: of('Ho')
});

joinedWithObjectForm$.subscribe(console.log);
This emits a dictionary of results rather than an array, which you may find more readable:
{ hey: "Hey", ho: "Ho" }
Whichever method you choose, I would suggest you be consistent throughout your application.
Inner observable lifetime and forkJoin
As forkJoin only emits when all inner observables complete, we must be mindful of observables that never complete. If one never completes, forkJoin will never emit or complete either.
Using an interval, we can guarantee forkJoin never completes, and you’ll see the interval logging to the console forever:
import { forkJoin, interval, of } from 'rxjs';
import { tap } from 'rxjs/operators';

const joinedWithIntervalNeverCompleting$ = forkJoin(
  of('completed!'),
  interval(1000).pipe(tap(i => console.log(`tick ${i}`)))
);

joinedWithIntervalNeverCompleting$.subscribe(console.log);
Output:
tick 0
tick 1
tick 2
tick 3
forever...
Ensure that all observables provided to forkJoin do eventually complete.
Let’s add the take operator to our interval, which will complete an observable after n emissions (here we’ll use 3):
import { forkJoin, interval, of } from 'rxjs';
import { take, tap } from 'rxjs/operators';

const joinedWithIntervalCompleting$ = forkJoin(
  of('completed!'),
  interval(1000).pipe(
    tap(i => console.log(`ticking ${i}`)),
    take(3)
  )
);

joinedWithIntervalCompleting$.subscribe(console.log);
Which, after logging ticking 0 through ticking 2, outputs:
['completed!', 2];
The number 2 inside the array output confirms that only the last emitted value from each inner observable will be output in the result of forkJoin, with earlier values (when applicable) being ignored.
Using forkJoin with fromEvent
Button clicks are a good way to see forkJoin working with real DOM events, so let’s start with three buttons:
<button id="btn1">click once!</button>
<button id="btn2">click twice!</button>
<button id="btn3">click 3 times!</button>
Then we can dynamically construct an observable with an abstraction function that internally uses fromEvent:
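import { forkJoin, fromEvent } from 'rxjs';
import { take } from 'rxjs/operators';

// Completes after `id` clicks on the button with the matching id
const fromEventWithId = (id) => {
  return fromEvent(document.getElementById(`btn${id}`), 'click')
    .pipe(take(id));
};

forkJoin(
  fromEventWithId(1),
  fromEventWithId(2),
  fromEventWithId(3)
).subscribe(console.log);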
Our id is not only used as part of the selector, but is also passed into take(id).
This means we’d have to click the three buttons 1, 2 and 3 times, respectively, to get the result logged out to the console.
If you don’t click them the required number of times, the inner observables will not complete (and we know what happens then - nothing!).
Error handling with forkJoin
Now let’s demonstrate an inner observable throwing an error - there are 2 ways to handle errors here.
The first is to catch the error at the forkJoin level, but remember that the output will then contain only what the catchError observable emits.
This means that even if other inner observables completed successfully, the result will not contain the values they emitted:
import { forkJoin, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

const joinedErroringWithOuterCatch$ = forkJoin(
  of('Hey!'),
  throwError('boooom!')
).pipe(catchError(error => of(error)));

joinedErroringWithOuterCatch$.subscribe(console.log);
Output:
boooom!
Notice how 'Hey!' does not get emitted due to the error, despite successfully completing.
The second way to handle errors is at the inner observable level. With this approach, the output will contain both error messages and successful value emissions from other observables which completed - much better:
import { forkJoin, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

const joinedErroringWithInnerCatch$ = forkJoin(
  of('Hey!'),
  throwError('boooom!').pipe(catchError(error => of(error)))
);

joinedErroringWithInnerCatch$.subscribe(console.log);
Output:
['Hey!', 'boooom!'];
Managing a variable number of requests
As a final forkJoin example, there may be times where you need to dynamically handle a variable number of requests.
Here’s how you can create a helper function to send a variable number of HTTP requests by utilizing forkJoin and the browser’s fetch API:
import { forkJoin } from 'rxjs';

// forkJoin accepts promises as well as observables
const joinWithVariableRequests$ = (...args) => {
  return forkJoin(args.map(url => fetch(url).then(e => e.json())));
};
By collecting the arguments with the rest parameter syntax (...args), this helper can be called with any number of URLs:
joinWithVariableRequests$(
  'https://aws.random.cat/meow',
  'https://api.agify.io?name=michael'
).subscribe(console.log);
Since we are extracting the JSON of the response body (then(e => e.json())), the above example will produce an array of response bodies:
[
  {
    file: 'https://purr.objects-us-east-1.dream.io/i/img_1013.jpg'
  },
  {
    age: 68,
    count: 233482,
    name: 'michael'
  }
];
Summary
We’ve covered a lot of ground here, from combining successful and erroring observables through to intervals and DOM events.
forkJoin is a powerful combination operator for waiting on several observables and collecting the final value from each.
Remember these common tips (and gotchas) while using the RxJS forkJoin operator:
- Array versus object output: passing an object instead of comma-separated arguments gives you a keyed result instead of an array
- How errors can affect the output, and how to handle them at the inner or outer observable level
- All inner observables must complete before forkJoin will emit any result!
Thanks for reading, happy combining!