Combine streams series 1
combineLatest
combineLatest([a$, b$, c$])
-
Emitted value combines the
latest
emitted value from each input Observable into an array -
When an item is emitted from
any
Observable- If all Observables have emitted
at least once
- Emits a value to the output Observable
- If all Observables have emitted
-
Static creation function, not a pipeable operator
-
Completes when all input Observables complete
Caveats
- To work with multiple data sets
- To reevaluate state when an action occurs, such as user action, filter items, click next page
forkJoin
forkJoin([a$, b$, c$])
-
Emitted value combines the
last
emitted value from each input observable into an array -
To wait to process any results and
Only emits one time
, until all input Observables complete -
Static creation function, not a pipeable operator
Caveats
- Don't use when working with Observables that don't complete, Such as action streams
withLatestFrom
a$.pipe(withLatestFrom(b$, c$))
: when a$ emits, get the latest of a$, b$, c$.
- Pipeable operator
Only when source Observable
emits, and If all Observables have emitted at least once, it will combine the latest
emitted value from each Observable into an array as the emit value of the output Observable
- Completes when the source Observable completes
Caveats
- To react to changes in only one Observable
- To regulate the output of the other Observables
Demo
Product model includes categoryId. In UI, we want to display the category. Category data rarely changes, and it's a small dataset. So we can load it once.
export interface Product {
id: number;
productName: string;
productCode?: string;
description?: string;
price?: number;
categoryId?: number;
+ category?: string;
}
Good examples:
productsWithCategory$ = combineLatest([this.products$, this.productCategories$]);
// output: [product[], category[]]
productsWithCategory$ = forkJoin([this.products$, this.productCategories$]);
// output: [product[], category[]]
Bad example: Why cannot we use withLatestFrom
? Because we are not sure if productCategories$ is returned earlier. If
products$ returns earlier, at the emit time, productCategories$ is not complete, it never emits a value, the output
observable cannot be emitted.
productsWithCategory$ = this.products$.pipe(withLatestFrom(this.productCategories$));
// output: [product[], category[]]
Full example:
products$ = this.http.get<Product[]>(this.url);
productCategories$ = this.http.get<ProductCategory[]>(this.categoriesUrl);
productsWithCategory$ = combineLatest([this.products$, this.productCategories$]).pipe(
map(([products, categories]) =>
products.map(
(product) =>
({
...product,
price: product.price * 1.5,
category: categories.find((c) => product.categoryId === c.id).name,
} as Product),
),
),
);
Combine action stream with data stream
Now let's step further. We want to filter products by category dropdown.
products$ = combineLatest([this.productService.products$, this.action$]).pipe(
map(([products, category]) => products.filter((product) => product.category === category)),
);
Creating an Action Stream
category dropdown selection will trigger to emit the categoryId.
private categorySelectedSubject = new Subject<number>();
categorySelectedAction$ = this.categorySelectedSubject.asObservable();
// expose subject emit method
onSelected = (categoryId: string) => {
this.categorySelectedSubject.next(+categoryId);
};
<select (change)="onSelected($event.target.value)">
<option *ngFor="let category of categories$ | async" [value]="category.id">{{ category.name }}</option>
</select>
Reacting to Actions
- Create an action stream (Subject/BehaviorSubject)
- Combine the action stream and data stream to react to each emission from the action stream
- Emit a value to the action stream when an action occurs
products$ = combineLatest([this.productService.products$, this.categorySelectedAction$]).pipe(
map(([products, categoryId]) =>
products.filter((product) => (categoryId ? product.categoryId === categoryId : true)),
),
);
Starting with an Initial Value
- startWith
this.categorySelectedAction$.pipe(startWith(0));
- BehaviorSubject
private categorySelectedSubject = new BehaviorSubject<number>(0);
categorySelectedAction$ = this.categorySelectedSubject.asObservable();
Final code
<!-- categoryId select -->
<select (change)="onSelected($any($event.target).value)">
<option value="0">- Display All Categories-</option>
<option *ngFor="let category of categories$ | async" [value]="category.id">{{ category.name }}</option>
</select>
<!-- product with category -->
<table *ngIf="products$ | async as products">
<thead>
<tr>
<th>Product</th>
<th>Category</th>
<th>Price</th>
</tr>
</thead>
<tbody>
<tr *ngFor="let product of products">
<td>{{ product.productName }}</td>
<td>{{ product.category }}</td>
<td>{{ product.price | currency : 'USD' : 'symbol' : '1.2-2' }}</td>
</tr>
</tbody>
</table>
<div class="alert alert-danger" *ngIf="errorMessage$ | async as errorMessage">{{ errorMessage }}</div>
@Component({
templateUrl: './product-list.component.html',
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ProductListComponent {
constructor(private productService: ProductService, private productCategoryService: ProductCategoryService) {}
private errorMessageSubject = new Subject<string>();
errorMessage$ = this.errorMessageSubject.asObservable();
private categorySelectedSubject = new BehaviorSubject<number>(0);
categorySelectedAction$ = this.categorySelectedSubject.asObservable();
products$ = combineLatest([this.productService.productsWithCategory$, this.categorySelectedAction$]).pipe(
map(([products, selectedCategoryId]) =>
products.filter((product) => (selectedCategoryId ? product.categoryId === selectedCategoryId : true)),
),
catchError((err) => {
this.errorMessageSubject.next(err);
return EMPTY;
}),
);
categories$ = this.productCategoryService.productCategories$.pipe(
catchError((err) => {
this.errorMessageSubject.next(err);
return EMPTY;
}),
);
onSelected(categoryId: string): void {
this.categorySelectedSubject.next(+categoryId);
}
}