Skip to content Skip to sidebar Skip to footer

Joining Two Streams Of Observables In Rxjs According To Specific Conditions

I have two streams of objects, the accounts and balances. I need to merge (join) the two streams according to the id and account_id var accounts = Rx.Observable.from([ { id:

Solution 1:

Accoring to one of your comment, your example is to simulate a stream from an Angular Http call.

So instead of :

varaccounts=Rx.Observable.from([
    { id:1, name:'account 1' },
    { id:2, name:'account 2' },
    { id:3, name:'account 3' },]);varbalances=Rx.Observable.from([
    { account_id:1, balance:100 },
    { account_id:2, balance:200 },
    { account_id:3, balance:300 },]);

I'd rather say that it is :

varaccounts=Rx.Observable.of([
    { id:1, name:'account 1' },
    { id:2, name:'account 2' },
    { id:3, name:'account 3' },]);varbalances=Rx.Observable.of([
    { account_id:1, balance:100 },
    { account_id:2, balance:200 },
    { account_id:3, balance:300 },]);

Why :from will emit every item one by one, of will emit the entire array and I guess your http response is the whole array.

That said, what you probably want to achieve is :

const { Observable } = Rx;

// simulate HTTP requestsconst accounts$ = Rx.Observable.of([
  { id: 1, name: 'account 1' },
  { id: 2, name: 'account 2' },
  { id: 3, name: 'account 3' }
]);

const balances$ = Rx.Observable.of([
  { account_id: 1, balance: 100 },
  { account_id: 2, balance: 200 },
  { account_id: 3, balance: 300 }
]);

// utilsconstjoinArrays = (accounts, balances) =>
  accounts
    .map(account =>Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance }));

constfindBalanceByAccountId = (balances, id) =>
  balances.find(balance => balance.account_id === id) || { balance: 0 };

constprint = (obj) => JSON.stringify(obj, null, 2)

// use forkJoin to start both observables at the same time and not wait between every requestObservable
  .forkJoin(accounts$, balances$)
  .map(([accounts, balances]) =>joinArrays(accounts, balances))
  .do(rslt =>console.log(print(rslt)))
  .subscribe();

Output :

[{"id":1,"name":"account 1","balance":100},{"id":2,"name":"account 2","balance":200},{"id":3,"name":"account 3","balance":300}]

Here's a working Plunkr : https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=preview

EDIT 1 : Working on an array to compose your result is probably not the best idea for performance and instead of returning an array, maybe try to return an object which have as key, the ID of the account. This way you might simply remove the findBalanceByAccountId function and have a faster app (only modified code here) :

constbalances$=Rx.Observable.of({1: { account_id:1, balance:100 },2: { account_id:2, balance:200 },3: { account_id:3, balance:300 }
});//utilsconstjoinArrays=(accounts,balances)=>accounts.map(account=>Object.assign(
      {},account, 
      { balance:balances[account.id].balance }
    ));

Solution 2:

If you have truly 2 observables that emit the results as Observable<{}> in random order, this is a way you can combine them. If the order is not random, or if they always come in 'pairs', more efficient way's exists to combine them.

import { from, merge } from'rxjs';
import { map, scan, tap } from'rxjs/operators';

const accounts = from([
  { id: 1, name: 'account 1' },
  { id: 2, name: 'account 2' },
  { id: 3, name: 'account 3' }
]);

const balances = from([
  { account_id: 1, balance: 100 },
  { account_id: 2, balance: 200 },
  { account_id: 3, balance: 300 }
]);

interfaceOutcome {
  id: number;
  name?: string;
  balance?: number;
}

merge<Outcome>(
  accounts,
  balances.pipe(map(a => ({ id: a.account_id, balance: a.balance })))
)
  .pipe(
    scan<Outcome>((result: Outcome[], incomming) => {
        const found = result.find(row => row.id === incomming.id);
        if (found) {
          Object.assign(found, incomming);
        } else {
          result.push(incomming);
        }
      return result;
    }, []),
    tap(r =>console.log(r))
  )
  .subscribe();

Please note that the result is a hot observable. If you only want to emit a single result and complete when all results are in, replace the scan operator with the reduce operator.

The source is based on RXjs version 6. your imports might differ a bit on older versions.

Post a Comment for "Joining Two Streams Of Observables In Rxjs According To Specific Conditions"