RxJava操作符合集 - Go语言中文社区

RxJava操作符合集


创建操作

以下操作符用于创建Observable。

  • create: 使用OnSubscribe从头创建一个Observable,这种方法比较简单。需要注意的是,使用该方法创建时,建议在OnSubscribe#call方法中检查订阅状态,以便及时停止发射数据或者运算。

    
        Observable.create(new Observable.OnSubscribe<String>() {
    
            @Override
            public void call(Subscriber<? super String> subscriber) {
    
                subscriber.onNext("item1");
                subscriber.onNext("item2");
                subscriber.onCompleted();
            }
        });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
  • from: 将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable。Future转换为OnSubscribe是通过OnSubscribeToObservableFuture进行的,Iterable转换通过OnSubscribeFromIterable进行。数组通过OnSubscribeFromArray转换。 
    image_1arcl6d0a1iej60e6ccp48qic9.png-20.8kB

        //Iterable
        List<String> list=new ArrayList<>();
        ...
        Observable.from(list)
                .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
    
            }
        });
    
        //Future
         Future<String> futrue= Executors.newSingleThreadExecutor().submit(new Callable<String>() {
    
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return "maplejaw";
            }
        });
    
        Observable.from(futrue)
                  .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
    
            }
        });
    ;
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • just: 将一个或多个对象转换成发射这个或这些对象的一个Observable。如果是单个对象,内部创建的是ScalarSynchronousObservable对象。如果是多个对象,则是调用了from方法创建。

  • empty: 创建一个什么都不做直接通知完成的Observable
  • error: 创建一个什么都不做直接通知错误的Observable
  • never: 创建一个什么都不做的Observable

        Observable observable1=Observable.empty();//直接调用onCompleted。
        Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常
        Observable observable3=Observable.never();//啥都不做
    • 1
    • 2
    • 3
  • timer: 创建一个在给定的延时之后发射数据项为0的Observable<Long>,内部通过OnSubscribeTimerOnce工作

     Observable.timer(1000,TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("JG",aLong.toString()); // 0
                    }
                });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • interval: 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>,内部通过OnSubscribeTimerPeriodically工作。

      Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                         //每隔1秒发送数据项,从0开始计数
                         //0,1,2,3....
                    }
                });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • range: 创建一个发射指定范围的整数序列的Observable<Integer>

     Observable.range(2,5).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("JG",integer.toString());// 2,3,4,5,6 从2开始发射5个数据
            }
        });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • defer: 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。

      Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                return Observable.just("hello");
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("JG",s);
            }
        });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

合并操作

以下操作符用于组合多个Observable。

注意,为了使结构更加清晰以及缩小代码量,之后的例子部分地方将会使用Lambda表达式书写,如果你对Lambda表达式不太熟悉的话,可以阅读JAVA8 Lambda表达式完全解析这篇文章。

  • concat: 按顺序连接多个Observables。需要注意的是Observable.concat(a,b)等价于a.concatWith(b)

        Observable<Integer> observable1=Observable.just(1,2,3,4);
        Observable<Integer>  observable2=Observable.just(4,5,6);
    
        Observable.concat(observable1,observable2)
                .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
    • 1
    • 2
    • 3
    • 4
    • 5
  • startWith: 在数据序列的开头增加一项数据。startWith的内部也是调用了concat

     Observable.just(1,2,3,4,5)
                .startWith(6,7,8)
        .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
    • 1
    • 2
    • 3
  • merge: 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知。 

  • zip: 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合。内部通过OperatorZip进行压合。

    Observable<Integer>  observable1=Observable.just(1,2,3,4);
    Observable<Integer>  observable2=Observable.just(4,5,6);
    
    
        Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
            @Override
            public String call(Integer item1, Integer item2) {
                return item1+"and"+item2;
            }
        })
        .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
  • combineLatest: 。当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合。具体请看下面流程图。 
    zip工作流程 

    combineLatest工作流程 

过滤操作

        Observable.just(3,4,5,6)
                 .elementAt(2)
        .subscribe(item->Log.d("JG",item.toString())); //5

条件/布尔操作

聚合操作

转换操作

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_35100676/article/details/80765997
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢