rxjs6学习笔记----结合react,redux使用 - Go语言中文社区

rxjs6学习笔记----结合react,redux使用


rxjs版本是当前2018-07-23最新, 6.2.2。本文写了rxjs的基本知识和结合react的应用,ajax怎么用rx控制的套路,错误处理,以及常用操作符的细节。如switchMap,concatMap, flatMap的区别等。

"rxjs": "^6.2.2"

导入模块的区别

rxjs已经出到6了,5.5有一个大更新,6又有很多更新。首先是自动rethrow的错误不再是同步的,而是异步的,其次是模块导入的方法。现在只分为两类,前一类,如of, empty等方法,直接从rxjs里导入。
这里写图片描述

正确的导入方式:

import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, combineLatest, SubscriptionLike, PartialObserver } from 'rxjs';
import { map, filter, scan } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';
import { ajax } from 'rxjs/ajax';
import { TestScheduler } from 'rxjs/testing';

注意concat和combineLatest都要从rxjs里Import

pipe

rxjs5.5以上使用pipe方法而非链式调用。所有链式的起点都用Pipe来开始,之后的每一次处理都用逗号隔开。例子

// rxjs v5.5+
const source$ = range(1, 3)
const liveStreaming$ = source$.pipe(
  flatMap(val => of(val).pipe(
    flatMap(val => from(fetch(`http://swapi.co/api/people/${val}`))),
    flatMap(res => from(res.json())),
    map(res => res.name),
    tap(console.log)
  ))
)

//rxjs old version
let stream$ = Rx.Observable
.of(1,2,3)
.flatMap((val) => {
  return Rx.Observable
            .of(val)
            .ajax({ url : url })
            .map((e) => e.response )
})

再比如一个concat的例子

import {
  interval, concat,
} from 'rxjs'
import {
  take, map,  tap,
} from 'rxjs/operators'

function addItem(val) {
  const node = document.createElement('li')
  const textnode = document.createTextNode(val)
  node.appendChild(textnode)
  document.getElementById('output').appendChild(node)
}
const s1$ = interval(1000).pipe(
  map(val => `s1   ${val}`),
  take(5)
)

const s2$ = interval(500).pipe(
  map(val => `s2  ${val}`),
  take(3)
)

const liveStreaming$ = concat(s1$, s2$).pipe(
  tap(console.log)
)

const subscription = liveStreaming$.subscribe(
  data => addItem(`${data}`),
  err => console.log(err),
  () => console.log('completed')
)

操作符

过滤操作符

distinctUntilChanged

会接收到上一次和这一次的对象作为参数,默认的比较是===,可以传入一个comparer function来自定义。例如,比较两个Object时,可以用R.equals
这个例子来自官网

* @example <caption>An example using a compare function</caption>
 * interface Person {
 *    age: number,
 *    name: string
 * }
 *
 * Observable.of<Person>(
 *     { age: 4, name: 'Foo'},
 *     { age: 7, name: 'Bar'},
 *     { age: 5, name: 'Foo'})
 *     { age: 6, name: 'Foo'})
 *     .distinctUntilChanged((p: Person, q: Person) => p.name === q.name)
 *     .subscribe(x => console.log(x));
 *
 * // displays:
 * // { age: 4, name: 'Foo' }
 * // { age: 7, name: 'Bar' }
 * // { age: 5, name: 'Foo' }
 *

在保存表单时,也可以用它来过滤xhr请求,这是redux里的写法

// epics.js  通常文件名叫epics
const openProject = actions$ => actions$.ofType(OPEN_PROJECT).pipe(
  distinctUntilChanged(R.equals),
  flatMap(
    ({ projectUid }) => request$.post('/api/project/open-status', { projectUid }).pipe(
      map(R.path(['data'])),
     ...

转换操作符

switchMap, flatMap, concatMap 的区别

这三个操作符都是map 与其他操作符的结合。这里简单总结了下,详见30 天精通RxJS(18): Observable Operators - switchMap, mergeMap, concatMap

concatMap = .map(fn).concatAll()
前一个完成后,第二个才会发出,有顺序
switchMap = .map(fn).switch()
下一个发出,则退订前一个。前一个成功也好,失败也好,不会造成任何side-effect了
flatMap = .map(fn).flatAll()  // 其实就是mergeMap, mergeAll()
有摊平的observable的效果,并行处理多个流。可能重叠。

三个操作符都可以传入第二个selector callback 参数,flatMap还可以传第三个参数限制并行处理的数量。

source : -----------c--c------------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-1-2-0-1-2---------...

source : -----------c--c-----------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0--0-1-2-----------...

source : -----------c-c------------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-(10)-(21)-2----------...

错误处理

注意catchError本身可以再返回一个流,即catchError之后还可以再catch或者继续做事情。
所以顺序应该是

catchError(fn),
takeUntil(LOCATION_CHANGE$),
finalize(fn)

如果是request失败,需要再次尝试。retry()会立即发出,而在网络不稳定的时候,应该隔一会儿再发请求。需要用到delay 和 retryWhen, 以及 scan 来控制retry 的次数。参见错误处理 · RxJS 5 基本原理

const ATTEMPT_COUNT = 1
const DELAY_INTERVAL = 200

...

stream$.pipe(
    pluck('data'),
    retryWhen(e => e.pipe(
      scan((errorCount, err) => {
        if (errorCount >= ATTEMPT_COUNT) {
          throw err
        }
        return errorCount + 1
      }, 0),
      delay(DELAY_INTERVAL)
    )),
    catchError((err) => {
      console.error(`Request ${url} Error `, err.stack)
      return throwError(JSON.stringify(err.response.data))
    })

subject

这个话题就多了,包括怎么warm up 一个 observable, 把它从冷的变成热的。什么什么。。。
大意就是,subject是一个代理人的角色,它的subscribe 是把诸多 observer 加入到一个数组中,在next的时候依次通知这些observers 。作为一个中间代理人,它同时拥有 Observer 和 Observable 的行为。

学习资源

rxjs5的中文gitbook,略老但排版好看,适合入门
rxjs的github,在reactiveX下面,已经到6版本了 RxJS: Reactive Extensions For JavaScript
rxjs 英文官网,关于如何迁移到版本6
rxjs 中文翻译官方文档 比较全,翻译的一般般吧。。。
一个博客上关于rxjs的系列入门文章,英文的
hot-cold-observables,什么是热的观察对象,什么是冷的observable,怎么warm up

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/github_36487770/article/details/81168346
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-04-19 16:43:29
  • 阅读 ( 1361 )
  • 分类:前端

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢