RxJava2.0教程(六)_一骑绝尘Louisk的博客-程序员宅基地

前言

在上一节中, 我们找到了上下游流速不均衡的源头 , 在这一节里我们将学习如何去治理它 . 可能很多看过其他人写的文章的朋友都会觉得只有Flowable才能解决 , 所以大家对这个Flowable都抱有很大的期许 , 其实呐 , 你们毕竟图样图森破 , 今天我们先抛开Flowable, 仅仅依靠我们自己的双手和智慧 , 来看看我们如何去治理 , 通过本节的学习之后我们再来看Flowable, 你会发现它其实并没有想象中那么牛叉, 它只是被其他人过度神化了.

正题

我们接着来看上一节的这个例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {  //无限循环发送事件
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

上一节中我们看到了它的运行结果是直接爆掉了内存, 也明白它为什么就爆掉了内存, 那么我们能做些什么, 才能不让这种情况发生呢.

之前我们说了, 上游发送的所有事件都放到水缸里了, 所以瞬间水缸就满了, 那我们可以只放我们需要的事件到水缸里呀, 只放一部分数据到水缸里, 这样不就不会溢出来了吗, 因此, 我们把上面的代码修改一下:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer % 10 == 0;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

在这段代码中我们增加了一个filter, 只允许能被10整除的事件通过, 再来看看运行结果:


filter.gif

可以看到, 虽然内存依然在增长, 但是增长速度相比之前, 已经减少了太多了, 至少在我录完GIF之前还没有爆掉内存, 大家可以试着改成能被100整除试试.

可以看到, 通过减少进入水缸的事件数量的确可以缓解上下游流速不均衡的问题, 但是力度还不够, 我们再来看一段代码:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .sample(2, TimeUnit.SECONDS)  //sample取样
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

这里用了一个sample操作符, 简单做个介绍, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游. 这里我们让它每隔2秒取一个事件给下游, 来看看这次的运行结果吧:


sample.gif

这次我们可以看到, 虽然上游仍然一直在不停的发事件, 但是我们只是每隔一定时间取一个放进水缸里, 并没有全部放进水缸里, 因此这次内存仅仅只占用了5M.

大家以后可以出去吹牛逼了: 我曾经通过技术手段去优化一个程序, 最终使得内存占用从300多M变成不到5M. ~(≧▽≦)/~

前面这两种方法归根到底其实就是减少放进水缸的事件的数量, 是以数量取胜, 但是这个方法有个缺点, 就是丢失了大部分的事件.

那么我们换一个角度来思考, 既然上游发送事件的速度太快, 那我们就适当减慢发送事件的速度, 从速度上取胜, 听上去不错, 我们来试试:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //每次发送完事件延时2秒
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

这次我们让上游每次发送完事件后都延时了2秒, 来看看运行结果:


sleep.gif

完美 ! 一切都是那么完美 !

可以看到, 我们给上游加上延时了之后, 瞬间一头发情的公牛就变得跟只小绵羊一样, 如此温顺, 如此平静, 如此平稳的内存线, 美妙极了. 而且事件也没有丢失, 上游通过适当的延时, 不但减缓了事件进入水缸的速度, 也可以让下游充足的时间从水缸里取出事件来处理 , 这样一来, 就不至于导致大量的事件涌进水缸, 也就不会OOM啦.

到目前为止, 我们没有依靠任何其他的工具, 就轻易解决了上下游流速不均衡的问题.

因此我们总结一下, 本节中的治理的办法就两种:

  • 一是从数量上进行治理, 减少发送进水缸里的事件
  • 二是从速度上进行治理, 减缓事件发送进水缸的速度

大家一定没忘记, 在上一节还有个Zip的例子, 这个例子也爆了我们的内存, 现学现用, 我们用刚学到的办法来试试能不能惩奸除恶, 先来看看第一种办法.

先来减少进入水缸的事件的数量:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //进行sample采样

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });

来试试运行结果吧:


zip_sample.gif

哈哈, 成功了吧, 再来用第二种办法试试.

这次我们来减缓速度:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //发送事件之后延时2秒
                }
            }
        }).subscribeOn(Schedulers.io());

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });

来看看运行结果吧:


zip_sleep.gif

果然也成功了, 这里只打印出了下游收到的事件, 所以只有一个. 如果你对这个结果看不懂, 请自觉掉头看前面几篇文章.

通过本节的学习, 大家应该对如何处理上下游流速不均衡已经有了基本的认识了, 大家也可以看到, 我们并没有使用Flowable, 所以很多时候仔细去分析问题, 找到问题的原因, 从源头去解决才是最根本的办法. 后面我们讲到Flowable的时候, 大家就会发现它其实没什么神秘的, 它用到的办法和我们本节所讲的基本上是一样的, 只是它稍微做了点封装.

好了, 今天的教程就到这里吧, 下一节中我们就会来学习你们喜闻乐见的Flowable.

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u014628886/article/details/70233537

智能推荐

55、【backtrader股票策略】炒股票应该买便宜股票还是贵的股票?_backtrader 一个股票买一半-程序员宅基地

这篇文章是一篇单因子分析的文章,因子就是价格,分析的结果就是用于指导我们:买股票的时候是买便宜的股票还是买贵的股票更占优势?即价格便宜的股票相对于价格贵的股票是否具有超额收益。策略逻辑说明这个策略和前几个策略也非常相似,只是挑选股票的因子换成了价格,把股票按照价格高低排序为10组,做多价格最低的一组,做空价格最高的一组。和前几个策略的资金、资金分配、交易手续费都是一样的。我们使用全市场的A股日数据进行测试,做多头,也做空头。多头和空头都占用资金。假设初始资金有1个亿,手续费为万分之二。策略代_backtrader 一个股票买一半

MLFviewer2.0阅读器|MLF文件打开软件【完整版】_mlfwiewer-程序员宅基地

MLFviewer2.0阅读器|MLF文件打开软件【完整版】MLFviewer2.0阅读器|MLF文件打开软件; MLF 查看工具,查看.mlf 文件 MLF文件专用浏览器,分享一下,有需要的请下载。链接: https://pan.baidu.com/s/1glQxY-2PmaomqG74NnryYQ提取码: ysyy..._mlfwiewer

模板文件存在,在调试模式下却说不存在。-程序员宅基地

模板文件存在,在调试模式下却说不存在。如图。这是什么问题呢?调试模式是严格区分大小写的哦,方便在部署到Linux服务器之前找到问题。

SELECT查询语句-测试题-mysql_select 语法 测试-程序员宅基地

~本特利~以下测试请结合数据库 myemployees;链接:点击提取码:rhjw1. 下面的语句是否可以执行成功SELECT last_name , job_id , salary AS salFROM employees; ????部分截图:2.下面的语句是否可以执行成功SELECT * FROM employees; 3.找出下面语句中的错误select employee_id , last_name,salary * 12 “ANNUAL SALARY”fr_select 语法 测试

互联网IT当线上出现 bug 时,是怎么处理的?-程序员宅基地

线上BUG说处理方法:1、关于线上BUG问题,目前公司有一整套线上故障流程规范,包括故障定义、定级、处理流程、故障处理超时升级机制、故障处理小组、故障处罚(与故障存在时长有关)等;2、最主要的是,线上故障是研发和测试团队的KPI,KPI计算是分开的,线上只要出现BUG,研发必须承担责任;对于测试来说,是复盘的时候确认是不是漏测,只有漏测才会计算测试责任;3、最重要的是,公司从上下都重视这套故障流..._it软件线上出现bug后标准处理流程是什么

Ubuntu16.04 install on the Dell_ae_already_exists-程序员宅基地

[0.101261] ACPI Error: [_ppc] Namespace lookup failure, AE_ALREADY_EXISTS (20170531/dswload-378)[0.101268] ACPI Exception: AE_ALREADY_EXISTS, During name lookup/catalog (20170531/psobject-252)[0.170..._ae_already_exists

随便推点

golang重复声明变量_golang err重复命名-程序员宅基地

golang使用简短方式声明变量,左侧必须要有一个新变量,变量也可以重复声明。func main() { test1 := 0 test1, test2:= 1, 2 test1:= 3 //错误}编译报错:no new variables on left side of :=test1:=3是错误的,因为左边没有一个新变量..._golang err重复命名

自己动手创建开源项目的帮助文档_开源可集成文帮助文档-程序员宅基地

下面就以struts的帮助文档为例在学习Java的时候我们曾经学习过,Java有很强大的注释功能,其中 /****/ 这种注释可以用来生成文档,更具这个思路,一个开源的项目,我们可以下载到他们的源文件,那么其注释应该是完整的,通过这些注释就能生成完整的apidoc文档。下载最新的struts包:http://struts.apache.org/download下载文档_开源可集成文帮助文档

Java日志链路追踪工具LogHelper在ELK系统上的效果_java 链路日志-程序员宅基地

轻量级日志链路追踪工具,结合logstash-logback-encoder实现日志输出json格式化;支持Sykwalking traceId,支持Apache Dubbo,Alibaba Dubbo,SpringCloud微服务框架日志链路追踪;支持异步线程日志链路追踪;支持OkHttp,HttpClient,RestTemplate Http客户端工具日志链路追踪;提供分布式消息队列日志链路追中解决方案;支持简单的敏感字段脱敏打印项目地址:https://githu_java 链路日志

Go语言学习知识笔记_所有的内存在 go 中都是经过初始化的。-程序员宅基地

0x00 说明这里只记录和c语言不同的语法。0x01 基本语法1.变量声明的方式标准方式var 变量名 变量类型批量方式var( a int b string…)简短格式名字:=表达式使用简短格式的要求(只能在函数内部)2.Go语言变量的初始化所有的内存,在go里都是经过初始化的。这里仅记录bool初始为fa..._所有的内存在 go 中都是经过初始化的。

yocto project terms & 深入理解 BitBake-程序员宅基地

Yocto Project TermsFollowing is a list of terms and definitions users new to the Yocto Project development environment might find helpful. While some of these terms are universal, the list includes

1003 我要通过! (Python实现)-程序员宅基地

“答案正确”是自动判题系统给出的最令人欢喜的回复。本题属于 PAT 的“答案正确”大派送 —— 只要读入的字符串满足下列条件,系统就输出“答案正确”,否则输出“答案错误”。得到“答案正确”的条件是:字符串中必须仅有P、A、T这三种字符,不可以包含其它字符; 任意形如xPATx的字符串都可以获得“答案正确”,其中x或者是空字符串,或者是仅由字母A组成的字符串; 如果a...

推荐文章

热门文章

相关标签