forkJoin use cases

Angular, RxJS

Sometimes we have to fetch a collection item by item, by Id, instead of with a single getAll call. There are different ways to do this. In this article, we will use forkJoin.

forkJoin

Accepts an Array of ObservableInput or a dictionary Object of ObservableInput and returns an Observable that emits either an array of last values in the exact same order as the passed array, or a dictionary of values in the same shape as the passed dictionary.

forkJoin(...args: any[]): Observable<any>

forkJoin will wait for all passed observables to emit and complete and then it will emit an array or an object with last values from corresponding observables.
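As a minimal sketch of the two input shapes described above (the sample values are made up for illustration), the array form yields an array and the dictionary form yields an object with the same keys:

```typescript
import { forkJoin, of } from 'rxjs';

// Array input: the result is an array in the same order as the inputs.
const arrayResults: number[][] = [];
forkJoin([of(1, 2), of(3)]).subscribe((res) => arrayResults.push(res));
// arrayResults is now [[2, 3]]: only the last value of each source is kept.

// Dictionary input: the result is an object with the same keys.
const dictResults: { id: number; name: string }[] = [];
forkJoin({ id: of(7), name: of('Ann') }).subscribe((res) => dictResults.push(res));
// dictResults is now [{ id: 7, name: 'Ann' }]
```

The dictionary form tends to read better once there are more than two or three sources, since the result is accessed by name rather than by position.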

forkJoin will not emit more than once and it will complete after that. If you need to emit combined values not only at the end of the lifecycle of passed observables, but also throughout it, try out combineLatest or zip instead.
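To make the difference concrete, here is a small sketch that feeds the same two synchronous sources (sample values chosen for illustration) to forkJoin and to combineLatest and records what each one emits:

```typescript
import { combineLatest, forkJoin, of } from 'rxjs';

const a$ = of(1, 2, 3);
const b$ = of('x', 'y');

// forkJoin waits for both sources to complete and emits exactly once.
const forkJoinEmissions: [number, string][] = [];
forkJoin([a$, b$]).subscribe((pair) => forkJoinEmissions.push(pair));
// forkJoinEmissions: [[3, 'y']]

// combineLatest emits every time a source emits, once all sources have a value.
const combineLatestEmissions: [number, string][] = [];
combineLatest([a$, b$]).subscribe((pair) => combineLatestEmissions.push(pair));
// combineLatestEmissions: [[3, 'x'], [3, 'y']]
// (a$ has already finished synchronously before b$ is subscribed)
```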

If no input observables are provided (e.g. an empty array is passed), then the resulting stream will complete immediately.

forkJoin([]).subscribe({
  next: () => console.log('next never runs'),
  complete: () => console.log('complete runs immediately'),
});
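This matters when the input array is built at runtime and may turn out empty, because next is then never called. One possible way around it, sketched here with an assumed empty list of requests, is to append defaultIfEmpty so that subscribers still receive an empty array:

```typescript
import { Observable, forkJoin } from 'rxjs';
import { defaultIfEmpty } from 'rxjs/operators';

// The input list may be empty at runtime, e.g. when nothing was selected.
const requests: Observable<number>[] = [];

const emissions: number[][] = [];
forkJoin(requests)
  .pipe(defaultIfEmpty([] as number[])) // emit [] if forkJoin completes without a value
  .subscribe((res) => emissions.push(res));
// emissions is now [[]]: next ran once with an empty array.
```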

If any given observable errors at some point, forkJoin will error as well and immediately unsubscribe from the other observables.

forkJoin([
  of(1),
  throwError(() => new Error('any error in forkJoin will immediately unsubscribe other Observable')),
]).subscribe({
  next: (res) => console.log(res),
  error: (err) => console.error('error will execute due to input stream error:', err),
});

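If any input observable completes without emitting a value (EMPTY, for example), forkJoin completes at that moment as well and emits nothing.

forkJoin([of(1), EMPTY]).subscribe({
  next: () => console.log('next never runs'),
  complete: () => console.log('EMPTY triggers complete'),
});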
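If a source may legitimately complete without a value and the combined result is still wanted, one option is to give that source a fallback value. The sketch below assumes undefined is an acceptable placeholder:

```typescript
import { EMPTY, forkJoin, of } from 'rxjs';
import { defaultIfEmpty } from 'rxjs/operators';

const results: (number | undefined)[][] = [];

forkJoin([
  of(1),
  // EMPTY alone would make forkJoin complete silently; the fallback keeps it emitting.
  EMPTY.pipe(defaultIfEmpty(undefined)),
]).subscribe((res) => results.push(res));
// results is now [[1, undefined]]
```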

The filter(Boolean) operator cannot remove the undefined here, because filter tests each emission as a whole, and the emitted array is always truthy 😟:

forkJoin([of(1), of(2), of(undefined)])
  .pipe(filter(Boolean))
  .subscribe((res) => console.log(res)); // [1, 2, undefined], unexpected

To do this, map the emitted array and filter its elements 😘:

forkJoin([of(1), of(2), of(undefined)])
  .pipe(map((numbers) => numbers.filter(Boolean)))
  .subscribe((res) => console.log(res)); // [1, 2]
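One caveat worth keeping in mind: filtering an array with Boolean removes every falsy element, so a legitimate 0 or empty string disappears too. A possible alternative is a small type guard; isDefined below is a hypothetical helper, not part of RxJS:

```typescript
// Hypothetical helper: removes only undefined/null and keeps 0, '' and false.
function isDefined<T>(value: T | undefined | null): value is T {
  return value !== undefined && value !== null;
}

const values: (number | undefined)[] = [0, 1, undefined, 2];

// Boolean drops the 0 together with the undefined.
const withBoolean = values.filter(Boolean);

// The type guard drops only the undefined and narrows the type to number[].
const withGuard: number[] = values.filter(isDefined);
```

Inside the pipe this would be used the same way, for example map((numbers) => numbers.filter(isDefined)).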

Use case

Requirements:

  1. Get all employees on page init and show a checkbox for each employee. The checkbox value is the employee Id.
  2. The user can select multiple checkboxes (Ids) and set a manager for the selected employees.
  3. Clicking the "Submit" button will (a) get more employee information by Ids, then (b) send POST requests to update those employees with the selected manager.

Let's assume we want to get the employees with Ids 1, 2, 3, and 1000. The employee with Id 1000 doesn't exist.

Since forkJoin needs every observable to complete without an error before it emits the final result, let's use catchError on each request and return undefined in place of the error. This is just one way to handle the error; other catchError strategies work too.

// prepare forkJoin input observables; a failed request emits undefined instead of erroring
const employee1$ = this.employeeService.getEmployeeById(1).pipe(catchError(() => of(undefined)));
const employee2$ = this.employeeService.getEmployeeById(2).pipe(catchError(() => of(undefined)));
const employee3$ = this.employeeService.getEmployeeById(3).pipe(catchError(() => of(undefined)));
const employee1000$ = this.employeeService.getEmployeeById(1000).pipe(catchError(() => of(undefined))); // this request fails
// Alternative: .pipe(catchError((err) => of(err))) keeps the error as a value in the result.

// keep only the employees that exist
const employees$ = forkJoin([employee1$, employee2$, employee3$, employee1000$]).pipe(
  map((items) => items.filter(Boolean)),
);

employees$
  .pipe(
    // map existing employees to post API observables
    map((employArr) => {
      return employArr.map((e) =>
        this.employeeService.updateEmployeeById(e.id, {manager: 'William'}).pipe(catchError(() => of(undefined))),
      );
    }),
    // change stream to forkJoin, pass post observables as param
    switchMap((updateItems) => forkJoin(updateItems)),
  )
  .subscribe((updatedEmployees) => console.log(updatedEmployees));
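The snippet above depends on an Angular service, so it cannot be run on its own. Below is a self-contained sketch of the same GET-then-update flow. The in-memory db, the two service functions, and the orUndefined helper are hypothetical stand-ins for the article's employeeService, which is assumed here to fail with an error when an Id is unknown.

```typescript
import { Observable, forkJoin, of, throwError } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';

interface Employee { id: number; name: string; manager?: string; }

// Hypothetical in-memory stand-in for the backend behind employeeService.
const db = new Map<number, Employee>([
  [1, { id: 1, name: 'Ann' }],
  [2, { id: 2, name: 'Bob' }],
  [3, { id: 3, name: 'Cy' }],
]);

function getEmployeeById(id: number): Observable<Employee> {
  const employee = db.get(id);
  return employee ? of(employee) : throwError(() => new Error(`Employee ${id} not found`));
}

function updateEmployeeById(id: number, patch: Partial<Employee>): Observable<Employee> {
  const employee = db.get(id);
  if (!employee) {
    return throwError(() => new Error(`Employee ${id} not found`));
  }
  const updated = { ...employee, ...patch };
  db.set(id, updated);
  return of(updated);
}

// A failed request becomes undefined, so it cannot fail the whole forkJoin.
const orUndefined = <T>(source$: Observable<T>): Observable<T | undefined> =>
  source$.pipe(catchError(() => of(undefined)));

const updatedEmployees: (Employee | undefined)[][] = [];

forkJoin([1, 2, 3, 1000].map((id) => orUndefined(getEmployeeById(id))))
  .pipe(
    // keep only the employees that were found
    map((items) => items.filter((e): e is Employee => e !== undefined)),
    // run one update per found employee and wait for all of them
    switchMap((employees) =>
      forkJoin(employees.map((e) => orUndefined(updateEmployeeById(e.id, { manager: 'William' })))),
    ),
  )
  .subscribe((res) => updatedEmployees.push(res));
```

With these stand-ins, the subscriber receives a single array holding employees 1, 2, and 3, each with manager set to 'William'; Id 1000 is dropped after the GET step.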

Now let's do the real demo.

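import { HttpErrorResponse } from '@angular/common/http';
import { OnInit } from '@angular/core';
import { FormArray, FormControl, FormGroup } from '@angular/forms';
import { Observable, Subject, forkJoin, of } from 'rxjs';
import { catchError, map, scan, shareReplay, switchMap } from 'rxjs/operators';
// Employee and EmployeeService are the app's own types; their import paths are not shown in this article.

export class AppComponent implements OnInit {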
  constructor(public employeeService: EmployeeService) {}

  form = new FormGroup({
    ids: new FormArray<FormControl<number>>([]),
    manager: new FormControl(''),
  });

  employees$: Observable<Employee[]>;
  requestedEmployee$: Observable<Employee[]>;
  updatedEmployees$: Observable<Employee[]>;

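  // Option A: collect errors in a stream. This works, but the accumulated state is hard to reset,
  // and we want to clear the old errors on every re-submit.
  private errorSubject$ = new Subject<HttpErrorResponse>();
  errors$ = this.errorSubject$.asObservable().pipe(
    // append the error unless one with the same payload is already in the list
    scan((acc, curr) => {
      if (acc.some((e) => e.error === curr.error)) {
        return [...acc];
      } else {
        return [...acc, curr];
      }
    }, [] as HttpErrorResponse[]),
  );

  // Option B (used in submit): a plain array that is reset on every submission.
  errors: HttpErrorResponse[] = [];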

  submit() {
    // Error-handling options:
    // 1. In catchError, return of(error) instead of of(undefined), so the stream carries both
    //    results and errors; the template then loops over the errors and renders them.
    // 2. Push the errors into a plain array instead of an Observable and reset it on every submit.

    // Option 2 is used here: clear the errors from the previous submission.
    this.errors = [];

    const employee$ = this.form.value.ids.map((id) =>
      this.employeeService.getEmployeeById(+id).pipe(
        catchError((err: HttpErrorResponse) => {
          // this.errorSubject$.next(err);
          this.errors.push(err);
          return of(undefined);
        }),
      ),
    );

    this.requestedEmployee$ = forkJoin(employee$).pipe(
      map((items) => items.filter(Boolean)),
      shareReplay(1), // share the result: updatedEmployees$ relies on requestedEmployee$, and the GET requests should not run twice
    );

    this.updatedEmployees$ = this.requestedEmployee$.pipe(
      map((employArr) =>
        employArr.map((e) =>
          this.employeeService
            .updateEmployeeById(e.id, {manager: this.form.value.manager})
            .pipe(catchError(() => of(undefined))),
        ),
      ),
      switchMap((updateItems) => forkJoin(updateItems)),
    );
  }

  ngOnInit() {
    this.employees$ = this.employeeService.getEmployees();
  }

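  onCheckboxChange(e: Event) {
    const checkbox = e.target as HTMLInputElement;
    const checkArray = this.form.get('ids') as FormArray;
    if (checkbox.checked) {
      checkArray.push(new FormControl(checkbox.value));
    } else {
      // Find the index first, then remove: removing inside forEach mutates the array while iterating.
      // The checkbox value is a string, so compare it with the stringified control value.
      const index = checkArray.controls.findIndex((item) => String(item.value) === checkbox.value);
      if (index !== -1) {
        checkArray.removeAt(index);
      }
    }
  }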
}

The component's template:

<form [formGroup]="form" (ngSubmit)="submit()">
  <label for="ids">Select employees</label>
  <ul id="ids">
    <li *ngFor="let e of employees$ | async">
      <input type="checkbox" name="id" [value]="e.id" (change)="onCheckboxChange($event)" />
      {{ e.id }} - {{ e.name }}
    </li>

    <li>
      <input type="checkbox" name="id" value="1000" (change)="onCheckboxChange($event)" />
      id 1000 (does not exist in the db)
    </li>
    <li>
      <input type="checkbox" name="id" value="2000" (change)="onCheckboxChange($event)" />
      id 2000 (does not exist in the db)
    </li>
  </ul>

  <fieldset>
    <legend>Select Manager</legend>
    <label>
      <input type="radio" name="manager" value="William" formControlName="manager" />
      William
    </label>
    <label>
      <input type="radio" name="manager" value="Michael" formControlName="manager" />
      Michael
    </label>
    <label>
      <input type="radio" name="manager" value="John" formControlName="manager" />
      John
    </label>
  </fieldset>

  <div>
    <button type="submit">Submit</button>
  </div>
</form>

<section>
  <h2>Requested employees (GET by Ids)</h2>
  <ul>
    <li *ngFor="let e of requestedEmployee$ | async">{{ e.id }} - {{ e.name }} - {{ e.etag }}</li>
  </ul>
</section>

<section class="red">
  <h2>errors</h2>
  <ul>
    <li *ngFor="let e of errors">{{ e.error }}</li>
  </ul>
</section>

<section>
  <h2>Updated employees (POST)</h2>
  <ul>
    <li *ngFor="let e of updatedEmployees$ | async">
      {{ e.id }} - {{ e.name }} - {{ e.manager }} - {{ e.department }}
    </li>
  </ul>
</section>
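The first error-handling idea noted in submit() (emit the error itself instead of undefined) is not implemented in the demo. A rough sketch of how it could look follows; getEmployeeById is a hypothetical stand-in that fails for unknown Ids, and the Outcome shape is an assumption made for this example.

```typescript
import { Observable, forkJoin, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

interface Employee { id: number; name: string; }

// Hypothetical stand-in for employeeService.getEmployeeById: only Ids 1 and 2 exist.
function getEmployeeById(id: number): Observable<Employee> {
  return id <= 2
    ? of({ id, name: `Employee ${id}` })
    : throwError(() => new Error(`Employee ${id} not found`));
}

// Assumed result shape: found employees and collected errors side by side.
interface Outcome { employees: Employee[]; errors: Error[]; }

const outcomes: Outcome[] = [];

forkJoin(
  [1, 2, 1000].map((id) =>
    getEmployeeById(id).pipe(
      // Emit the error as a value so the stream carries both results and errors.
      catchError((err: Error) => of(err)),
    ),
  ),
)
  .pipe(
    // Split the mixed array into employees and errors.
    map((items): Outcome => ({
      employees: items.filter((i): i is Employee => !(i instanceof Error)),
      errors: items.filter((i): i is Error => i instanceof Error),
    })),
  )
  .subscribe((outcome) => outcomes.push(outcome));
```

Because the errors are rebuilt from scratch on every subscription, nothing has to be reset between submissions; the template could read both lists from the same stream.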
