forkJoin use cases
Sometimes we will have to get collections by Id instead of getAll
once. There're different ways to do this. In this
article, we will use forkJoin
.
forkJoin
Accepts an Array of ObservableInput or a dictionary Object of ObservableInput and returns an Observable that emits either an array of
last values
in the exact same order as the passed array, or a dictionary of values in the same shape as the passed dictionary.
forkJoin(...args: any[]): Observable<any>
forkJoin
will wait for all passed observables to emit and complete and then it will emit an array or an object with
last values from corresponding observables.
forkJoin
will not emit more than once and it will complete after that. If you need to emit combined values not only at
the end of the lifecycle of passed observables, but also throughout it, try out combineLatest
or zip
instead.
If no input observables are provided (e.g. an empty array is passed), then the resulting stream will complete immediately.
forkJoin([]).subscribe({
next: (res) => console.log('not execute'),
complete: () => console.log('complete execute!'),
});
If any given observable errors at some point, forkJoin
will error as well and immediately unsubscribe from the other
observables.
forkJoin([
of(1),
throwError(() => new Error('any error in forkJoin will immediately unsubscribe other Observable')),
]).subscribe({
next: (res) => console.log(res),
error: (err) => console.error('error will execute due to input stream error:', err),
});
EMPTY
will complete.
forkJoin([of(1), EMPTY]).subscribe({
next: (res) => console.log('not execute'),
complete: () => console.log('EMPTY will trigger complete'),
});
undefined
cannot be removed using operator filter(Boolean)
😟:
forkJoin([of(1), of(2), of(undefined)])
.pipe(filter(Boolean))
.subscribe((res) => console.log(res)); // [1, 2, undefined], unexpected
To do this, create a new stream which filters out undefined
😘:
forkJoin([of(1), of(2), of(undefined)])
.pipe(mergeMap((numbers) => of(numbers.filter(Boolean))))
.subscribe((res) => console.log(res)); // [1, 2]
Use case
Requirement:
- Get all employees on page init, with checkbox for each employee. The checkbox value is the employee id.
- User can select multiple checkboxes (Ids), and set manager for them.
- Click "Submit" button (a) get more employee information by Ids, (b) post requests to update those employees with manager.
Let's assume we want to get employees with Id = 1,2,3 and 1000. The employee with Id=1000 doesn't exist.
Since forkJoin
needs every Observable to be green to emit the final result, let's catchError
and return undefined to
ignore the error. This is just one way to handle the error. You can use other ways to catchError
.
// prepare forkJoin input observables
const employee1$ = this.employeeService.getEmployeeById(1).pipe(catchError(() => of(undefined)));
const employee2$ = this.employeeService.getEmployeeById(2).pipe(catchError(() => of(undefined)));
const employee3$ = this.employeeService.getEmployeeById(3).pipe(catchError(() => of(undefined)));
const employee1000$ = this.employeeService.getEmployeeById(1000).pipe(catchError(() => of(undefined))); // will error out
// .pipe(catchError((err) => of(err))); We could use this approach as well.
// filter out only existing employees
const employees$ = forkJoin([employee1$, employee2$, employee3$, employee1000$]).pipe(
switchMap((items) => of(items.filter(Boolean))),
);
employees$
.pipe(
// map existing employees to post API observables
map((employArr) => {
return employArr.map((e) =>
this.employeeService.updateEmployeeById(e.id, {manager: 'William'}).pipe(catchError(() => of(undefined))),
);
}),
// change stream to forkJoin, pass post observables as param
switchMap((updateItems) => forkJoin(updateItems)),
)
.subscribe((updatedEmployees) => console.log(updatedEmployees));
Now let's do the real demo.
export class AppComponent {
constructor(public employeeService: EmployeeService) {}
form = new FormGroup({
ids: new FormArray<FormControl<number>>([]),
manager: new FormControl(''),
});
employees$: Observable<Employee[]>;
requestedEmployee$: Observable<Employee[]>;
updatedEmployees$: Observable<Employee[]>;
// good, but hard to reset, we want to clear errors when re-submitting
private errorSubject$ = new Subject<HttpErrorResponse>();
errors$ = this.errorSubject$.asObservable().pipe(
scan((acc, curr) => {
if (acc.some((e) => e.error === curr.error)) {
return [...acc];
} else {
return [...acc, curr];
}
}, []),
);
errors = [];
submit() {
// Error handling Thoughts:
// thought 1: catchError() of(error) instead of undefined, so in the end the stream contains both error and result. Loop thru template and render errors
// thought 2: use errors array instead of Observables. push error; clear array when submission
// clear previous errors when submission
this.errors = [];
const employee$ = this.form.value.ids.map((id) =>
this.employeeService.getEmployeeById(+id).pipe(
catchError((err: HttpErrorResponse) => {
// this.errorSubject$.next(err);
this.errors.push(err);
return of(undefined);
}),
),
);
this.requestedEmployee$ = forkJoin(employee$).pipe(
switchMap((items) => of(items.filter(Boolean))),
shareReplay(1), // avoid subscribing twice since updatedEmployees$ replies on requestedEmployee$
);
this.updatedEmployees$ = this.requestedEmployee$.pipe(
map((employArr) =>
employArr.map((e) =>
this.employeeService
.updateEmployeeById(e.id, {manager: this.form.value.manager})
.pipe(catchError(() => of(undefined))),
),
),
switchMap((updateItems) => forkJoin(updateItems)),
);
}
ngOnInit() {
this.employees$ = this.employeeService.getEmployees();
}
onCheckboxChange(e) {
const checkArray = this.form.get('ids') as FormArray;
if (e.target.checked) {
checkArray.push(new FormControl(e.target.value));
} else {
let i = 0;
checkArray.controls.forEach((item: FormControl) => {
if (item.value == e.target.value) {
checkArray.removeAt(i);
return;
}
i++;
});
}
}
}
<form [formGroup]="form" (ngSubmit)="submit()">
<label for="ids">Select employees</label>
<ul id="ids">
<li *ngFor="let e of employees$ | async">
<input type="checkbox" name="id" [value]="e.id" (change)="onCheckboxChange($event)" />
{{ e.id }} - {{ e.name }}
</li>
<li>
<input type="checkbox" name="id" value="1000" (change)="onCheckboxChange($event)" />
id 1000 which not exists in db
</li>
<li>
<input type="checkbox" name="id" value="2000" (change)="onCheckboxChange($event)" />
id 2000 which not exists in db
</li>
</ul>
<label
>Select Manager
<label>
<input type="radio" name="manager" value="William" formControlName="manager" />
William
</label>
<label>
<input type="radio" name="manager" value="Michael" formControlName="manager" />
Michael
</label>
<label>
<input type="radio" name="manager" value="John" formControlName="manager" />
John
</label>
</label>
<div>
<button type="submit">Submit</button>
</div>
</form>
<section>
<h2>Requested employees (GET by Ids)</h2>
<ul>
<li *ngFor="let e of requestedEmployee$ | async">{{ e.id }} - {{ e.name }} - {{ e.etag }}</li>
</ul>
</section>
<section class="red">
<h2>errors</h2>
<ul>
<li *ngFor="let e of errors">{{ e.error }}</li>
</ul>
</section>
<section>
<h2>Updated employees (POST)</h2>
<ul>
<li *ngFor="let e of updatedEmployees$ | async">
{{ e.id }} - {{ e.name }} - {{ e.manager }} - {{ e.department }}
</li>
</ul>
</section>
Complete Code: