RxJava操作符

一、创建型

Create

create操作符是所有创建型操作符的“根”,也就是说其他创建型操作符最后都是通过create操作符来创建Observable的.

实例

 public void test01(){
        Log.e(TAG,"----------create-----------");
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    if (!subscriber.isUnsubscribed()) {
                        for (int i = 1; i < 5; i++) {
                            subscriber.onNext(i);
                        }
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,"onNext : "+integer);
            }
        });
    }

运行结果

=====>: ----------create-----------
=====>: onNext : 1
=====>: onNext : 2
=====>: onNext : 3
=====>: onNext : 4
=====>: onCompleted

在使用create操作符时,最好要在回调的call函数中增加isUnsubscribed的判断,以便在subscriber在取消订阅时不会再执行call函数中相关代码逻辑,从而避免导致一些意想不到的错误出现;

from

from操作符是把其他类型的对象和数据类型转化成Observable

实例

public void test02(){
        Log.e(TAG,"----------from-----------");

        String[] strs={"aa","bb","cc","dd"};
        Observable.from(strs).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,e.getMessage());
            }

            @Override
            public void onNext(String str) {
                Log.e(TAG,"onNext : "+str);
            }
        });
    }

运行结果

=====>: ----------from-----------
=====>: onNext : aa
=====>: onNext : bb
=====>: onNext : cc
=====>: onNext : dd
=====>: onCompleted

just

just操作符也是把其他类型的对象和数据类型转化成Observable,它和from操作符很像,只是方法的参数有所差别

实例

public void test03(){
        Log.e(TAG,"----------just-----------");
        Observable.just(1,"a",'b',"%").subscribe(new Subscriber<Serializable>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                Log.e(TAG,"onNext : "+serializable);
            }
        });
    }

运行结果

=====>: ----------just-----------
=====>: onNext : 1
=====>: onNext : a
=====>: onNext : b
=====>: onNext : %
=====>: onCompleted

defer

defer操作符是直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行,defer操作符能够保证Observable的状态是最新的.

实例

Integer i=0;
public void test04(){
    Log.e(TAG,"----------defer-----------");

    // 下面通过比较defer操作符和just操作符的运行结果作比较:
    i=10;
    Observable justObservable = Observable.just(i);
    i=12;
    Observable deferObservable = Observable.defer(new Func0<Observable<Integer>>() {
        @Override
        public Observable<Integer> call() {
            return Observable.just(i);
        }
    });
    i=15;

    justObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer o) {
            Log.e(TAG,"just i="+o);
        }
    });

    deferObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer o) {
            Log.e(TAG,"defer i="+o);
        }
    });
}

运行结果

=====>: ----------defer-----------
=====>: just i=10
=====>: defer i=15

可以看到,just操作符是在创建Observable就进行了赋值操作,而defer是在订阅者订阅时才创建Observable,此时才进行真正的赋值操作

timer

timer操作符是隔一段时间产生一个数字,然后就结束,可以理解为延迟产生数字,也可以用来延迟执行动作

实例

public void test05(){
    Log.e(TAG,"----------timer-----------");
    Observable.timer(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            Log.e(TAG,"timer : "+aLong);
        }
    });
}

运行结果

=====>: ----------timer-----------

两秒后

=====>: timer : 0

timer操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程.

interval

interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大,也可以通过改变参数使其在一段时间内递增。默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程.

实例

 public void test06(){
        Log.e(TAG,"----------interval-----------");
        Observable.interval(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.e(TAG,"interval : "+aLong);
            }
        });
    }

运行结果

=====>: ----------interval-----------
=====>: interval : 0
=====>: interval : 1
=====>: interval : 2
=====>: interval : 3
...

range

range操作符是创建一组在从n开始,个数为m的连续数字,比如range(3,10),就是创建3、4、5…12的一组数字

实例

public void test07(){
        Log.e(TAG,"----------range-----------");
        Observable.range(4,5).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,"range : "+integer );
            }
        });
    }

运行结果

=====>: ----------range-----------
=====>: range : 4
=====>: range : 5
=====>: range : 6
=====>: range : 7
=====>: range : 8
=====>: onCompleted

repeat

repeat操作符是对某一个Observable,重复产生多次结果

实例

public void test08(){
        Log.e(TAG,"----------repeat-----------");
        //连续产生两组(3,4,5)的数字
        Observable.range(3,3).repeat(2).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("error:" + e.getMessage());
            }

            @Override
            public void onNext(Integer i) {
                Log.e(TAG,"repeat : "+i );
            }
        });
    }

运行结果

=====>: ----------repeat-----------
=====>: repeat : 3
=====>: repeat : 4
=====>: repeat : 5
=====>: repeat : 3
=====>: repeat : 4
=====>: repeat : 5
=====>: onCompleted

二、变换

buffer

  • buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。
  • 一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。

实例

public void test10(){
    Log.e(TAG,"----------buffer----------");
    //定义邮件内容
    final String[] mails = new String[]{"Here is an email!", "Another email!", "Yet another email!"};
    //每隔1秒就随机发布一封邮件
    Observable<String> endlessMail = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                if (subscriber.isUnsubscribed()) return;
                Random random = new Random();
                while (true) {
                    String mail = mails[random.nextInt(mails.length)];
                    subscriber.onNext(mail);
                    Thread.sleep(1000);
                }
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
        }
    }).subscribeOn(Schedulers.io());
    //把上面产生的邮件内容缓存到列表中,并每隔3秒通知订阅者
    endlessMail.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<String>>() {
        @Override
        public void call(List<String> list) {
            Log.e(TAG,String.format("You've got %d new messages!  Here they are!", list.size()));
            for (int i = 0; i < list.size(); i++)
                Log.e(TAG,"**" + list.get(i).toString());
        }
    });
}

运行结果

=====>: ----------buffer----------
=====>: You've got 3 new messages!  Here they are!
=====>: **Yet another email!
=====>: **Here is an email!
=====>: **Here is an email!
=====>: You've got 3 new messages!  Here they are!
=====>: **Here is an email!
=====>: **Yet another email!
=====>: **Another email!
...

map

map操作符是把源Observable产生的结果,通过映射规则转换成另一个结果集,并提交给订阅者进行处理。

实例

public void test11(){
    Log.e(TAG,"----------map----------");
    Observable.just(1,2,3,4,5,6).map(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer integer) {
            //对源Observable产生的结果,都统一乘以3处理
            return integer*3;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"next:" + integer);
        }
    });
}

运行结果

=====>: ----------map----------
=====>: next:3
=====>: next:6
=====>: next:9
=====>: next:12
=====>: next:15
=====>: next:18

flatMap

  • flatMap操作符是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。
  • flatMap操作符通过传入一个函数作为参数转换源Observable,在这个函数中,你可以自定义转换规则,最后在这个函数中返回一个新的Observable,然后flatMap操作符通过合并这些Observable结果成一个Observable,并依次提交结果给订阅者。
  • flatMap操作符在合并Observable结果时,有可能存在交叉的情况

concatMap

  • cancatMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。
  • 与flatMap操作符不同的是,concatMap操作符在处理产生的Observable时,采用的是“连接(concat)”的方式,而不是“合并(merge)”的方式,这就能保证产生结果的顺序性,也就是说提交给订阅者的结果是按照顺序提交的,不会存在交叉的情况。

switchMap

  • switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。
  • 与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!

三者区别实例

public void test12(){
    Log.e(TAG,"------flatMap、concatMap、switchMap-----");

    //flatMap操作符的运行结果
    Observable.just(10, 20, 30).flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
            int delay = 200;
            if (integer > 10)
                delay = 180;
            return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"flatMap Next:" + integer);
        }
    });

    //concatMap操作符的运行结果
    Observable.just(10, 20, 30).concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
            int delay = 200;
            if (integer > 10)
                delay = 180;

            return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"concatMap Next:" + integer);
        }
    });

    //switchMap操作符的运行结果
    Observable.just(10, 20, 30).switchMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
            int delay = 200;
            if (integer > 10)
                delay = 180;

            return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"switchMap Next:" + integer);
        }
    });

}

运行结果

=====>: ------flatMap、concatMap、switchMap-----
=====>: flatMap Next:30
=====>: flatMap Next:20
=====>: flatMap Next:10
=====>: flatMap Next:15
=====>: flatMap Next:10
=====>: flatMap Next:5

=====>: switchMap Next:30
=====>: switchMap Next:15

=====>: concatMap Next:10
=====>: concatMap Next:5
=====>: concatMap Next:20
=====>: concatMap Next:10
=====>: concatMap Next:30
=====>: concatMap Next:15

groupBy

  • groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。
  • 值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。

实例

public void test13(){
    Log.e(TAG,"--------groupBy--------");
    Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Func1<Long, Long>() {
        @Override
        public Long call(Long value) {
            //按照key为0,1,2分为3组
            return value % 3;
        }
    }).subscribe(new Action1<GroupedObservable<Long, Long>>() {
        @Override
        public void call(final GroupedObservable<Long, Long> result) {
            result.subscribe(new Action1<Long>() {
                @Override
                public void call(Long value) {
                    Log.e(TAG,"key:" + result.getKey() +", value:" + value);
                }
            });
        }
    });
}

运行结果

=====>: --------groupBy--------
=====>: key:0, value:0
=====>: key:1, value:1
=====>: key:2, value:2
=====>: key:0, value:3
=====>: key:1, value:4
=====>: key:2, value:5
=====>: key:0, value:6
=====>: key:1, value:7
=====>: key:2, value:8
=====>: key:0, value:9

cast

cast操作符类似于map操作符,不同的地方在于map操作符可以通过自定义规则,把一个值A1变成另一个值A2,A1和A2的类型可以一样也可以不一样;而cast操作符主要是做类型转换的,传入参数为类型class,如果源Observable产生的结果不能转成指定的class,则会抛出ClassCastException运行时异常。

实例

public void test14(){
    Log.e(TAG,"--------cast--------");
    Observable.just(1,2,3,4,5,6).cast(Integer.class).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            Log.e(TAG,"next:"+value);
        }
    });
}

运行结果

=====>: --------cast--------
=====>: next:1
=====>: next:2
=====>: next:3
=====>: next:4
=====>: next:5
=====>: next:6

scan

scan操作符通过遍历源Observable产生的结果,依次对每一个结果项按照指定规则进行运算,计算后的结果作为下一个迭代项参数,每一次迭代项都会把计算结果输出给订阅者。

实例

public void test15(){
    Log.e(TAG,"--------scan--------");
    Observable.just(1, 2, 3, 4, 5)
            .scan(new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer sum, Integer item) {
                    //参数sum就是上一次的计算结果
                    return sum + item;
                }
            }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            Log.e(TAG,"Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.e(TAG,"Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            Log.e(TAG,"Sequence complete.");
        }
    });
}

运行结果

=====>: --------scan--------
=====>: Next: 1
=====>: Next: 3
=====>: Next: 6
=====>: Next: 10
=====>: Next: 15

window

window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理。

实例

    Log.e(TAG,"--------window--------");
    Observable.interval(1, TimeUnit.SECONDS).take(12)
            .window(3, TimeUnit.SECONDS)
            .subscribe(new Action1<Observable<Long>>() {
                @Override
                public void call(Observable<Long> observable) {
                    Log.e(TAG,"subdivide begin......");
                    observable.subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            Log.e(TAG,"Next:" + aLong);
                        }
                    });
                }
            });

运行结果

=====>: --------window--------
=====>: subdivide begin......
=====>: Next:0
=====>: Next:1
=====>: subdivide begin......
=====>: Next:2
=====>: Next:3
=====>: Next:4
=====>: subdivide begin......
=====>: Next:5
=====>: Next:6
=====>: Next:7
=====>: subdivide begin......
=====>: Next:8
=====>: Next:9
=====>: Next:10
=====>: subdivide begin......
=====>: Next:11

三、过滤

debounce

  • debounce操作符对源Observable每产生一个结果后,如果在规定的间隔时间内没有别的结果产生,则把这个结果提交给订阅者处理,否则忽略该结果。
  • 值得注意的是,如果源Observable产生的最后一个结果后在规定的时间间隔内调用了onCompleted,那么通过debounce操作符也会把这个结果提交给订阅者。

实例

public void test17(){
    Log.e(TAG,"--------debounce--------");
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            if(subscriber.isUnsubscribed()) return;
            try {
                //产生结果的间隔时间分别为100、200、300...900毫秒
                for (int i = 1; i < 10; i++) {
                    subscriber.onNext(i);
                    Thread.sleep(i * 100);
                }
                subscriber.onCompleted();
            }catch(Exception e){
                subscriber.onError(e);
            }
        }
    }).subscribeOn(Schedulers.newThread())
            .debounce(400, TimeUnit.MILLISECONDS)  //超时时间为400毫秒
            .subscribe(
                    new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e(TAG,"Next:" + integer);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            Log.e(TAG,"Next:" + "Error:" + throwable.getMessage());

                        }
                    }, new Action0() {
                        @Override
                        public void call() {
                            System.out.println("completed!");
                            Log.e(TAG,"completed!");

                        }
                    });
}

运行结果

=====>: --------debounce--------
=====>: Next:4
=====>: Next:5
=====>: Next:6
=====>: Next:7
=====>: Next:8
=====>: Next:9
=====>: completed!

distinct

distinct操作符对源Observable产生的结果进行过滤,把重复的结果过滤掉,只输出不重复的结果给订阅者,非常类似于SQL里的distinct关键字

实例

public void test18(){
    Log.e(TAG,"--------distinct--------");
    Observable.just(1, 2, 1, 1, 2, 3)
            .distinct()
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer item) {
                    Log.e(TAG,"Next: " + item);
                }

                @Override
                public void onError(Throwable error) {
                    Log.e(TAG,"Error: " + error.getMessage());
                }

                @Override
                public void onCompleted() {
                    Log.e(TAG,"Sequence complete.");
                }
            });

}

运行结果

=====>: --------distinct--------
=====>: Next: 1
=====>: Next: 2
=====>: Next: 3

elementAt

elementAt操作符在源Observable产生的结果中,仅仅把指定索引的结果提交给订阅者,索引是从0开始的

filter

filter操作符是对源Observable产生的结果按照指定条件进行过滤,只有满足条件的结果才会提交给订阅者

实例

private void test20() {
    Log.e(TAG,"--------filter--------");
    Observable.just(1, 2, 3, 4, 5)
            .filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer item) {
                    return( item < 4 );
                }
            }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"next:"+integer);
        }
    });
}

运行结果

=====>: --------filter--------
=====>: next:1
=====>: next:2
=====>: next:3

ofType

ofType操作符类似于filter操作符,区别在于ofType操作符是按照类型对结果进行过滤

实例

private void test21() {
    Log.e(TAG,"--------ofType--------");
    Observable.just(1, "hello world", true, 200L, 0.23f)
            .ofType(Float.class)
            .subscribe(new Action1<Float>() {
                @Override
                public void call(Float aFloat) {
                    Log.e(TAG,"next:"+aFloat);
                }
            });
}

运行结果

=====>: --------ofType--------
=====>: next:0.23

first

first操作符是把源Observable产生的结果的第一个提交给订阅者,first操作符可以使用elementAt(0)和take(1)替代

single

single操作符是对源Observable的结果进行判断,如果产生的结果满足指定条件的数量不为1,则抛出异常,否则把满足条件的结果提交给订阅者

实例

private void test23() {
    Log.e(TAG,"--------single--------");
    Observable.just(1,2,3,4,5,6,7,8)
            .single(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    //取大于10的第一个数字
                    return integer>10;
                }
            })
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer item) {
                    Log.e(TAG,"Next: " + item);
                }

                @Override
                public void onError(Throwable error) {
                    Log.e(TAG,"Error: " + error.getMessage());
                }

                @Override
                public void onCompleted() {
                    Log.e(TAG,"Sequence complete.");
                }
            });
}

运行结果

=====>: --------single--------
=====>: Error: Sequence contains no elements

last

last操作符把源Observable产生的结果的最后一个提交给订阅者,last操作符可以使用takeLast(1)替代。

ignoreElements

ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。ignoreElements操作符适用于不太关心Observable产生的结果,只是在Observable结束时(onCompleted)或者出现错误时能够收到通知。

实例

private void test24() {
    Log.e(TAG,"--------ignoreElements--------");
    Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer item) {
                    Log.e(TAG,"Next: " + item);
                }

                @Override
                public void onError(Throwable error) {
                    Log.e(TAG,"Error: " + error.getMessage());
                }

                @Override
                public void onCompleted() {
                    Log.e(TAG,"Sequence complete.");
                }
            });
}

运行结果

=====>: --------ignoreElements--------
=====>: Sequence complete.

sample

sample操作符定期扫描源Observable产生的结果,在指定的时间间隔范围内对源Observable产生的结果进行采样

实例

private void test25() {
    Log.e(TAG,"--------sample--------");
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            if(subscriber.isUnsubscribed()) return;
            try {
                //前8个数字产生的时间间隔为1秒,后一个间隔为3秒
                for (int i = 1; i < 9; i++) {
                    subscriber.onNext(i);
                    Thread.sleep(1000);
                }
                Thread.sleep(2000);
                subscriber.onNext(9);
                subscriber.onCompleted();
            } catch(Exception e){
                subscriber.onError(e);
            }
        }
    }).subscribeOn(Schedulers.newThread())
            .sample(2200, TimeUnit.MILLISECONDS)  //采样间隔时间为2200毫秒
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer item) {
                    Log.e(TAG,"Next: " + item);
                }

                @Override
                public void onError(Throwable error) {
                    Log.e(TAG,"Error: " + error.getMessage());
                }

                @Override
                public void onCompleted() {
                    Log.e(TAG,"Sequence complete.");
                }
            });
}

运行结果

=====>: --------sample--------
=====>: Next: 3
=====>: Next: 5
=====>: Next: 7
=====>: Next: 8
=====>: Next: 9
=====>: Sequence complete.

skip

skip操作符针对源Observable产生的结果,跳过前面n个不进行处理,而把后面的结果提交给订阅者处理

skipLast

  • skipLast操作符针对源Observable产生的结果,忽略Observable最后产生的n个结果,而把前面产生的结果提交给订阅者处理
  • 值得注意的是,skipLast操作符提交满足条件的结果给订阅者是存在延迟效果的

take

take操作符是把源Observable产生的结果,提取前面的n个提交给订阅者,而忽略后面的结果

takeFirst

takeFirst操作符类似于take操作符,同时也类似于first操作符,都是获取源Observable产生的结果列表中符合指定条件的前一个或多个,与first操作符不同的是,first操作符如果获取不到数据,则会抛出NoSuchElementException异常,而takeFirst则会返回一个空的Observable,该Observable只有onCompleted通知而没有onNext通知。

实例

private void test27() {
    Log.e(TAG,"--------takeFirst--------");
    Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            //获取数值大于3的数据
            return integer>3;
        }
    })
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer item) {
                    Log.e(TAG,"Next: " + item);
                }

                @Override
                public void onError(Throwable error) {
                    Log.e(TAG,"Error: " + error.getMessage());
                }

                @Override
                public void onCompleted() {
                    Log.e(TAG,"Sequence complete.");
                }
            });
}

运行结果

=====>: --------takeFirst--------
=====>: Next: 4
=====>: Sequence complete.

takeLast

takeLast操作符是把源Observable产生的结果的后n项提交给订阅者,提交时机是Observable发布onCompleted通知之时