Skip to content Skip to sidebar Skip to footer

There's A Way Of Create This Sequence Of Streams?

I'm trying to implement this marble diagram, with the hipotesis of have a N number of sN$, and I'm adding this streams to the main$. s1$ +--1--------------------99--------------

Solution 1:

You can use combineLatest. While that still require every stream to start with a value, you can prefix a null value to make every stream start with something using startWith.

const source = Rx.Observable.combineLatest(
  s1.startWith(void 0),
  s2.startWith(void 0),
  s3.startWith(void 0),
  (s1, s2, s3) => [s1, s2, s3])

Optional you can remove undefined values from the resulting array.

Now, we can extend that to work with a variable list of streams. Credits to @xgrommx.

main$
 .scan((a, c) => a.concat(c), [])
 .switch(obs => Rx.Observable.combineLatest(obs))

We can also use c.shareReplay(1) to make streams remember there last value when we switch. That however, wont combine with c.startWith(void 0), so we can use either one or the other.

Example:

    const main$ = new Rx.Subject()
    const s1$ = new Rx.Subject(1)
    const s2$ = new Rx.Subject(1)
    const s3$ = new Rx.Subject(1)
    const s4$ = new Rx.Subject(1)

    main$
     .scan((a, c) => a.concat(c.shareReplay(1)), [])
     .map(obs => Rx.Observable.combineLatest(obs))
     .switch()
     .map(v => v.filter(e => !!e))
     .map(v => v.join(','))
     .subscribe(v => $('#result').append('<br>' + v))

    main$.onNext(s1$)
    s1$.onNext(1)
    main$.onNext(s2$)
    s2$.onNext(void 0) // Since we can't use startWith
    main$.onNext(s3$)
    s3$.onNext(5)
    s1$.onNext(55)
    s2$.onNext(12)
    s2$.onNext(14)
    s3$.onNext(6)
    main$.onNext(s4$)
    s4$.onNext(999)
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
    <div id="result"></div>

Solution 2:

I solve finally the problem with some filter the nulls that I begin on the startWith():

main$
  .scan((a, c) => [...a, c.startWith(null).shareReplay(1)], [])
  .map(obs => Observable.combineLatest(obs))
  .switch()
  .map((x) => x.filter((x) => x != null))
  .filter((x) => x.length)

Looks un-readable (as any sequence of Rx, but If you draw the marble makes totally sense!)


Post a Comment for "There's A Way Of Create This Sequence Of Streams?"