RxJS 6: Multicasting Operators & Error Handling Operators

Multicasting Operators
 

multicast

Without multicasting, every subscription runs the source independently, so a subscriber that joins later still receives the values from the beginning

// Plain (un-multicast) interval: each subscribe() call starts its own counter at 0
const ticks = interval(300)
const logAs = label => value => console.log(label, value)

ticks.subscribe(logAs('s1'))
// The second subscriber joins one second later
setTimeout(() => {
  ticks.subscribe(logAs('s2'))
}, 1000)
// s1 0
// s1 1
// s1 2
// s1 3
// s2 0
// s1 4
// s2 1
// s1 5

multicast shares a single subscription to the source among all subscribers, so they receive the same values at the same time even if they subscribed at different moments; a late subscriber only sees the values emitted after it joined

// Route the one source subscription through a Subject that all subscribers share
const shared = interval(300).pipe(multicast(() => new Subject()))
// connect() starts the source right away, independent of any subscriber
shared.connect()
shared.subscribe(v => console.log('s1', v))

// s2 joins after one second and only sees values emitted from then on
const joinLate = () => shared.subscribe(v => console.log('s2', v))
setTimeout(joinLate, 1000)

// s1 0
// s1 1
// s1 2
// s1 3
// s2 3
// s1 4
// s2 4
// s1 5


publish

Shorthand for multicast with a plain Subject, i.e. multicast(() => new Subject())

// publish() multicasts the interval through a single shared Subject
const s = interval(300).pipe(publish())
// connect() starts the shared interval immediately (ticks at 300 ms, 600 ms, ...)
s.connect()
s.subscribe(v => console.log('s1', v))
// s2 joins at 1000 ms: values 0..2 have already gone by and are not replayed,
// so its first value is the same 3 that s1 receives at 1200 ms
setTimeout(
  () => s.subscribe(v => console.log('s2', v)),
  1000
)
// s1 0
// s1 1
// s1 2
// s1 3
// s2 3
// s1 4
// s2 4
// s1 5


publishBehavior

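Like publish, but with an initial (default) value: a subscriber immediately receives the default value, or the most recent value if the source has already emitted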

// 'a' is the seed value a subscriber receives before the interval has emitted anything
const withDefault = interval(300).pipe(publishBehavior('a'))
withDefault.connect()

const report = tag => v => console.log(tag, v)
withDefault.subscribe(report('s1'))
// The late subscriber starts with the most recent value instead of the seed
setTimeout(() => {
  withDefault.subscribe(report('s2'))
}, 1000)

// s1 a
// s1 0
// s1 1
// s1 2
// s2 2


publishLast

Emits only the last value of the source, once the source has completed; every subscriber receives that same value

// The source ends after 10 values (0..9); only the final one is delivered
const lastOnly = interval(300).pipe(
  take(10),
  publishLast()
)
lastOnly.connect()
lastOnly.subscribe(v => console.log('s1', v))
// Joining one second late makes no difference: both subscribers get the same single value
setTimeout(function subscribeLate() {
  lastOnly.subscribe(v => console.log('s2', v))
}, 1000)
// s1 9
// s2 9


publishReplay

Takes the cache (buffer) size as its argument. A late subscriber first receives the cached values, then continues with the live stream

// 20 ticks, 100 ms apart; the 3 most recent values are kept for late subscribers
const replayed = interval(100).pipe(take(20), publishReplay(3))
replayed.connect()
replayed.subscribe(v => console.log('s1', v))

// s2 is handed the cached values first, then follows the live ticks
const subscribeSecond = () => replayed.subscribe(v => console.log('s2', v))
setTimeout(subscribeSecond, 1000)
// s1 7
// s1 8
// s2 6
// s2 7
// s2 8
// s1 9


share

share = publish + refCount

// No connect() call here: the shared interval is started by the first subscription
const sharedTicks = interval(100).pipe(share())
const print = name => value => console.log(name, value)

sharedTicks.subscribe(print('s1'))
// s2 joins the already-running stream one second later
setTimeout(() => {
  sharedTicks.subscribe(print('s2'))
}, 1000)
// s1 9
// s2 9
// s1 10
// s2 10
// s1 11

 

shareReplay

Like share, but a late subscriber first receives the most recently cached values (here the last 3)

// Keep the 3 latest values around for subscribers that arrive later
const cached = interval(100).pipe(shareReplay(3))
cached.subscribe(v => console.log('s1', v))
setTimeout(function addSecondSubscriber() {
  cached.subscribe(v => console.log('s2', v))
}, 1000)

// s1 5
// s1 6
// s1 7
// s1 8
// s2 6
// s2 7
// s2 8

refCount

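Connects automatically: the source is connected on the first subscription and disconnected when the last subscriber unsubscribes, so connect() does not have to be called manually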

// publish() alone would need a manual connect(); refCount() takes care of that
const autoConnected = interval(100).pipe(
  publish(),
  refCount()
)
autoConnected.subscribe(v => console.log('s1', v))

const subscribeAfterDelay = () => autoConnected.subscribe(v => console.log('s2', v))
setTimeout(subscribeAfterDelay, 1000)
// s1 8
// s1 9
// s2 9
// s1 10
// s2 10
// s1 11


Error Handling Operators

catchError

Catches an error in the stream and replaces the failed stream with a new one returned by the handler. If the handler itself throws, the error callback passed to subscribe is executed

// Passes every value through except 5, which fails the stream with the string 'err'
const failOnFive = v => {
  if (v === 5)
    throw 'err'
  return v
}

// Logs the caught error and continues with a replacement stream holding one value
const recover = err => {
  console.log('err', err)
  return of('five')
}

range(1, 10)
  .pipe(map(failOnFive), catchError(recover))
  .subscribe(
    console.log,
    // Not reached here: the error has already been handled by catchError
    err => console.log('err2', err),
    () => console.log('complete')
  )

// 1
// 2
// 3
// 4
// err err
// five
// complete


retry

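Resubscribes to the source when it errors, up to the given number of times; if it still fails, the error is passed on to the subscriber. Pay attention to how the duration of a request relates to the interval between inputs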

const {interval, defer, from} = require('rxjs')
const {repeat, tap, switchMap, take, retry} = require('rxjs/operators')

/**
 * Simulates an unreliable asynchronous request.
 *
 * After 100 ms the returned promise settles at random: with probability 0.5
 * it rejects with the string 'reject err', otherwise it resolves with
 * 'get ' + text. The outcome is logged to the console in both cases.
 *
 * @param {*} text - Label for the request; echoed in the log and in the result.
 * @returns {Promise<string>} Resolves with 'get ' + text, or rejects with 'reject err'.
 */
function ajax(text) {
  // Keep this short relative to the caller's input interval: with retry(2)
  // (up to 3 attempts) and a 500 ms interval, 200 ms per attempt (3 * 200 > 500)
  // would let switchMap cancel the request before the retries run out,
  // so the stream would never end with an error.
  const delayMs = 100

  const settle = (resolve, reject) => {
    const failed = Math.random() < .5
    if (failed) {
      console.log('reject', text)
      reject('reject err')
    } else {
      console.log('resolve', text)
      resolve('get ' + text)
    }
  }

  return new Promise((resolve, reject) => {
    setTimeout(() => settle(resolve, reject), delayMs)
  })
}


const input = interval(500)

// One request per input value; retry(2) allows up to 3 attempts in total
const requestWithRetry = x => defer(() => ajax(x)).pipe(retry(2))
const logTick = v => {
  console.log('tap ', v)
}

// The input interval must be longer than three times the request duration
// (one attempt plus two retries). Otherwise the next input value arrives first,
// switchMap drops the in-flight request and starts a new one, and the retries
// never get the chance to finish.
input
  .pipe(tap(logTick), switchMap(requestWithRetry))
  .subscribe(
    val => {
      console.log('val', val)
    },
    err => {
      console.log('err', err)
    }
  )

// tap  0
// resolve 0
// val get 0
// tap  1
// reject 1
// resolve 1
// val get 1
// tap  2
// resolve 2
// val get 2
// tap  3
// reject 3
// resolve 3
// val get 3
// tap  4
// reject 4
// reject 4
// reject 4
// err reject err


retryWhen

When the source stream errors, the error is pushed into the notifier stream returned by the callback; the source is retried (resubscribed) each time that notifier stream emits a value

const source = interval(1000);

// Lets values up to 5 through; anything higher is thrown and handled by retryWhen
const rejectAboveFive = val => {
  if (!(val > 5)) {
    return val;
  }
  throw val;
};

// Receives the stream of thrown values; every emission of the returned stream triggers a retry
const retryAfterDelay = errors =>
  errors.pipe(
    // report the value that caused the error
    tap(val => console.log(`Value ${val} was too high!`)),
    // wait `val` seconds before retrying (6 seconds for the value 6)
    delayWhen(val => timer(val * 1000))
  );

const example = source.pipe(map(rejectAboveFive), retryWhen(retryAfterDelay));
/*
  output:
  0
  1
  2
  3
  4
  5
  "Value 6 was too high!"
  --Wait 6 seconds then repeat
*/
const subscribe = example.subscribe(val => console.log(val));

Tags: Programming

Posted on Mon, 10 Feb 2020 06:31:38 -0800 by phaseonemedia