Java是什么?
Java历史
Java语言特点
C++ VS Java比较
Java工厂设计模式
Java抽象工厂模式
Java单例模式
Java建造者(Builder)模式
Java原型模式
Java适配器模式
Java桥接模式
Java获取网络文件大小
Java套接字到单一的客户端
Java连接套接字
Java URL部分
Java URL连接日期
Java下载网页
Java主机指定IP地址
Java确定本地IP地址
Java检查端口占用
Java查找代理服务器设置
Java创建Socket
Java线程实例
Java检查线程活着
Java如何检查一个线程停止或没有?
Java解决死锁实例
Java如何获取正在运行的线程的优先级?
Java如何监视线程的状态?
Java获取线程名称
Java线程生产者消费者问题
Java如何设置线程的优先级?
Java如何停止线程一会儿?
Java如何暂停线程?
Java获取线程ID
Java如何检查线程的优先级?
Java显示所有正在运行的线程?
Java显示线程状态
Java中断一个线程
Java Applet实例
Java创建Applet
Java使用Applet创建横幅
Java使用Applet显示时钟?
Java在一个Applet创建不同形状
Java如何使用Applet填充形状的颜色?
Java使用Applet跳转到一个链接
Java在Applet创建事件监听器
Java使用Applet显示图像
Java使用Applet在新窗口中打开链接
Java使用Applet播放声音?
Java使用Applet读取文件
Java使用Applet写入文件
Java中Swing应用程序applet
Java简单的图形用户界面-GUI
Java以不同的字体显示文本
Java使用GUI画一条线
Java创建框架-frame
Java使用GUI显示多边形
Java在矩形中显示文本
Java GUI显示不同形状
Java如何绘制GUI实心矩形?
Java创建GUI透明光标
Java检查GUI平滑处理状态
Java在框架中显示颜色
Java GUI显示饼图
Java使用图形用户界面绘制文本
Java编辑表-table
Java 使用prepared语句
Java使用保存点和回滚
Java同时执行数据库多个SQL命令
Java使用行方法
Java使用列方法
Java正则表达式实例
Java将字符串分割
Java搜索重复单词
Java查找出现的单词
Java最后一个词的索引
Java模式匹配
Java删除空格
Java匹配电话号码
Java计数组词
Java搜索词组
Java拆分正则表达式
Java替换第一个出现字符串
Java检查日期格式
Java验证电子邮件地址格式
Java替换所有匹配字符串
Java使每个单词的第一个字母大写
从XML创建SqlSessionFactory实例
不使用XML来创建SqlSessionFactory
从SqlSessionFactory获取SqlSession
映射SQL语句
作用域和生命周期
Mapper XML配置
properties元素
Settings元素
typeAliases 元素
typeHandlers元素
理解CacheLine与写出更好的JAVA
Java核心技术点之动态代理
更好的使用JAVA线程池
理解Java中字符流与字节流的区别
深入分析Java方法反射的实现原理
关于Java面试,你应该准备这些知识点
Java内存模型
2017年你不能错过的Java类库
Leakcanary Square的一款Android/Java内存泄漏检测工具
Java Synchronised机制
Java核心技术点之注解
JVM(8):JVM知识点总览-高级Java工程师面试必备
JVM(3):Java GC算法 垃圾收集器
JVM(1):Java 类的加载机制
解决ActiveMQ中,Java与C++交互中文乱码问题
关于Java Collections的几个常见问题
Java I/O 总结
JVM源码分析之Java对象的创建过程
JVM源码分析之Java类的加载过程
Java GC的那些事(下)
Java GC的那些事(上)
java对象头的HotSpot实现分析
面试的角度诠释Java工程师(一)
面试的角度诠释Java工程师(二)
框架开发之Java注解的妙用
谈谈Java反射机制
Java并发:volatile内存可见性和指令重排
死磕Java并发:Java内存模型之happens-before
死磕Java并发:深入分析volatile的实现原理
死磕Java并发:深入分析synchronized的实现原理
Java 10 可能对 Lambda 表达式进行升级
G1垃圾回收器中的字符串去重(Java 8 Update 20)
Java RESTful框架的性能比较
理解RxJava的线程模型
继续了解Java的纤程库 – Quasar
Java中的纤程库 – Quasar
Java豆瓣电影爬虫——抓取电影详情和电影短评数据
Java集合框架源码剖析:LinkedHashSet 和 LinkedHashMap
Java Lambda表达式初探
Java中的陷阱题
Java 9的这一基本功能,你可能从未听过
关于Java并发编程的总结和思考
几种简单的负载均衡算法及其Java代码实现
JAVA虚拟机关闭钩子(Shutdown Hook)
Java 脚本化编程指南
Java Scripting API 使用示例
Java 8 的 Nashorn 脚本引擎教程
如何开始使用 Java 机器学习
CognitiveJ —— Java 的图像分析库
Java 性能优化的五大技巧
Java 解惑:Comparable 和 Comparator 的区别
Google Java编程风格指南
java NIO详解
Java 异常处理的误区和经验总结
Java语法糖(4):内部类
Java语法糖(3):泛型
Java语法糖(2):自动装箱和自动拆箱
Java消息队列任务的平滑关闭
Java语法糖(1):可变长度参数以及foreach循环原理
2016最流行的Java EE服务器
自己写一个java.lang.reflect.Proxy代理的实现
java 如何在pdf中生成表格
如何防止单例模式被JAVA反射攻击
java虚拟机 jvm 局部变量表实战
聊聊并发-Java中的Copy-On-Write容器
java.lang.Instrument 代理Agent使用
Java开发者需要了解的移动开发编程语言
13个不容错过的Java项目
2016年7款最佳 Java 框架推荐
Java 开发者值得关注的 11 个技术博客
Redmonk发布Java框架流行度调研结果
Java 8开发的4大顶级技巧
GitHub漫游指南:10个值得你关注的Java项目
除了Guava,Java开发者还值得了解的5个谷歌类库
Java中创建对象的5种不同方法
Java性能优化全攻略
奇怪的Java题:为什么1000 == 1000返回为False,而100 == 100会返回为True?
11个最值得Java开发者收藏的网站
Java的常见误区与细节
对Java意义重大的7个性能指标
Java调优经验谈
关于Java并发编程的总结和思考
HDFS Federation设计动机与基本原理
《Effective STL》学习笔记(第三部分)
《Effective STL》学习笔记(第二部分)
《Effective STL》学习笔记(第一部分)
数据结构之位图
Thrift使用指南
Cassandra概要介绍
Cassandra部署与安装
Cassandra客户端
Cassandra数据模型
Cassandra中的各种策略
数据结构之树状数组
数据结构之伸展树
数据结构之后缀数组
数据结构之堆
浅析MRv1与MRv2的API兼容性
Apache Tez最新进展
运行在YARN上的计算框架
从传统操作系统角度理解Hadoop YARN

理解RxJava的线程模型

于2017-05-10由小牛君创建

分享到:


ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io

Netflix参考微软的Reactive Extensions创建了Java的实现RxJava,主要是为了简化服务器端的并发。2013年二月份,Ben Christensen 和 Jafar Husain发在Netflix技术博客的一篇文章第一次向世界展示了RxJava。

RxJava也在Android开发中得到广泛的应用。

ReactiveX
An API for asynchronous programming with observable streams.
A combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

虽然RxJava是为异步编程实现的库,但是如果不清楚它的使用,或者错误地使用了它的线程调度,反而不能很好的利用它的异步编程提到系统的处理速度。本文通过实例演示错误的RxJava的使用,解释RxJava的线程调度模型,主要介绍SchedulerobserveOnsubscribeOn的使用。

本文中的例子以并发发送http request请求为基础,通过性能检验RxJava的线程调度。

第一个例子,性能超好?

我们首先看第一个例子:

  public static void testRxJavaWithoutBlocking(int count) throws Exception {
    CountDownLatch finishedLatch = new CountDownLatch(1);
    long t = System.nanoTime();
    Observable.range(0, count).map(i -> {
        //System.out.println("A:" + Thread.currentThread().getName());
        return 200;
    }).subscribe(statusCode -> {
        //System.out.println("B:" + Thread.currentThread().getName());
    }, error -> {
    }, () -> {
        finishedLatch.countDown();
    });
    finishedLatch.await();
    t = (System.nanoTime() - t) / 1000000; //ms
    System.out.println("RxJavaWithoutBlocking TPS: " + count * 1000 / t);
}

这个例子是一个基本的RxJava的使用,利用Range创建一个Observable, subscriber处理接收的数据。因为整个逻辑没有阻塞,程序运行起来很快,
输出结果为:

RxJavaWithoutBlocking TPS: 7692307 。

2 加上业务的模拟,性能超差

上面的例子是一个理想化的程序,没雨任何阻塞。我们模拟一下实际的应用,加上业务处理。

业务逻辑是发送一个http的请求,httpserver是一个模拟器,针对每个请求有30毫秒的延迟。subscriber统计请求结果:

public static void testRxJavaWithBlocking(int count) throws Exception {
        URL url = new URL("http://127.0.0.1:8999/");
        CountDownLatch finishedLatch = new CountDownLatch(1);
        long t = System.nanoTime();
        Observable.range(0, count).map(i -> {
            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                int responseCode = conn.getResponseCode();
                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    //response.append(inputLine);
                }
                in.close();
                return responseCode;
            } catch (Exception ex) {
                return -1;
            }
        }).subscribe(statusCode -> {
        }, error -> {
        }, () -> {
            finishedLatch.countDown();
        });
        finishedLatch.await();
        t = (System.nanoTime() - t) / 1000000; //ms
        System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);
    }

运行结果如下:

RxJavaWithBlocking TPS: 29。

@#¥%%……&!

性能怎么突降呢,第一个例子看起来性能超好啊,http server只增加了一个30毫秒的延迟,导致这个方法每秒只能处理29个请求。

如果我们估算一下, 29*30= 870 毫秒,大约1秒,正好和单个线程发送处理所有的请求的TPS差不多。
后面我们也会看到,实际的确是一个线程处理的,你可以在代码中加入

3 加上调度器,不起作用?

如果你对subscribeOnobserveOn方法有些印象的话,可能会尝试使用调度器去解决:

public static void testRxJavaWithBlocking(int count) throws Exception {
        URL url = new URL("http://127.0.0.1:8999/");
        CountDownLatch finishedLatch = new CountDownLatch(1);
        long t = System.nanoTime();
        Observable.range(0, count).map(i -> {
            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                int responseCode = conn.getResponseCode();
                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    //response.append(inputLine);
                }
                in.close();
                return responseCode;
            } catch (Exception ex) {
                return -1;
            }
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(statusCode -> {
        }, error -> {
        }, () -> {
            finishedLatch.countDown();
        });
        finishedLatch.await();
        t = (System.nanoTime() - t) / 1000000; //ms
        System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);
    }

加上.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())看一下性能:

RxJavaWithBlocking TPS: 30

性能没有改观,是时候了解一下RxJava线程调度的问题了。

4 RxJava的线程模型

首先,依照Observable ContractonNext是顺序执行的,不会同时由多个线程并发执行。

图片来自 http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

默认情况下,它是在调用subscribe方法的那个线程中执行的。如第一个例子和第二个例子,Rx的操作和消息接收处理都是在同一个线程中执行的。一旦由阻塞,比如第二个例子,久会导致这个线程被阻塞,吞吐量下降。

图片来自 https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2

但是subscribeOn可以改变Observable的运行线程。

图片来自 https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2

上图中可以看到,如果你使用了subscribeOn方法,则Rx的运行将会切换到另外的线程上,而不是默认的调用线程。

需要注意的是,如果在Observable链中调用了多个subscribeOn方法,无论调用点在哪里,Observable链只会使用第一个subscribeOn指定的调度器,正所谓”一见倾情”。
但是onNext还是顺序执行的,所以第二个例子的性能依然低下。

observeOn可以中途改变Observable链的线程。前面说了,subscribeOn方法改变的源Observable的整个的运行线程,要想中途切换线程,就需要observeOn方法。

图片来自 http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

官方的一个简略晦涩的解释如下:

The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

一图胜千言:

图片来自 http://reactivex.io

注意箭头的颜色和横轴的颜色,不同的颜色代表不同的线程。

5 Schedulers

上面我们了解了RxJava可以使用subscribeOnobserveOn可以改变和切换线程,以及onNext是顺序执行的,不是并发执行,至多也就切换到另外一个线程,如果它中间的操作是阻塞的,久会影响整个Rx的执行。

Rx是通过调度器来选择哪个线程执行的,RxJava内置了几种调度器,分别为不同的case提供线程:

  • io() : 这个调度器时用于I/O操作, 它可以增长或缩减来确定线程池的大小它是使用CachedThreadScheduler来实现的。需要注意的是,它的线程池是无限制的,如果你使用了大量的线程的话,可能会导致OutOfMemory等资源用尽的异常。
  • computation() : 这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。

因为这些方法内部已经调用的调度器,所以你再调用subscribeOn是无效的,比如下面的例子总是使用computation调度器的线程。

Observable.just(1,2,3)
                .delay(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.newThread())
                .map(i -> {
                    System.out.println("map: " + Thread.currentThread().getName());
                    return i;
                })
                .subscribe(i -> {});
  • immediate() :这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。
  • newThread() :创建一个新的线程只从。
  • trampoline() :为当前线程建立一个队列,将当前任务加入到队列中依次执行。

同时,Schedulers还提供了from静态方法,用户可以定制线程池:

ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
Schedulers.from(es)

6 改造,异步执行

现在,我们已经了解了RxJava的线程运行,以及相关的调度器。可以看到上面的例子还是顺序阻塞执行的,即使是切换到另外的线程上,依然是顺序阻塞执行,显示它的吞吐率非常非常的低。下一步我们就要改造这个例子,让它能异步的执行。

下面是一种改造方案,我先把代码贴出来,再解释:

public static void testRxJavaWithFlatMap(int count) throws Exception {
    ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
    URL url = new URL("http://127.0.0.1:8999/");
    CountDownLatch finishedLatch = new CountDownLatch(1);
    long t = System.nanoTime();
    Observable.range(0, count).subscribeOn(Schedulers.io()).flatMap(i -> {
                //System.out.println("A: " + Thread.currentThread().getName());
                return Observable.just(i).subscribeOn(Schedulers.from(es)).map(v -> {
                            //System.out.println("B: " + Thread.currentThread().getName());
                            try {
                                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                                conn.setRequestMethod("GET");
                                int responseCode = conn.getResponseCode();
                                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                                String inputLine;
                                while ((inputLine = in.readLine()) != null) {
                                    //response.append(inputLine);
                                }
                                in.close();
                                return responseCode;
                            } catch (Exception ex) {
                                return -1;
                            }
                        }
                );
            }
    ).observeOn(Schedulers.computation()).subscribe(statusCode -> {
        //System.out.println("C: " + Thread.currentThread().getName());
    }, error -> {
    }, () -> {
        finishedLatch.countDown();
    });
    finishedLatch.await();
    t = (System.nanoTime() - t) / 1000000; //ms
    System.out.println("RxJavaWithFlatMap TPS: " + count * 1000 / t);
    es.shutdownNow();
}

通过flatmap可以将源Observable的元素项转成n个Observable,生成的每个Observable可以使用线程池并发的执行,同时flatmap还会将这n个Observable merge成一个Observable。你可以将其中的注释打开,看看线程的执行情况。

性能还不错:

RxJavaWithFlatMap TPS: 3906

FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

图片来自 http://reactivex.io

7 另一种解决方案

我们已经清楚了要并行执行提高吞吐率的解决办法就是创建多个Observable并且并发执行。基于这种解决方案,我们还可以有其它的解决方案。

上一方案中利用flatmap创建多个Observable,针对我们的例子,我们何不直接创建多个Observable呢?

public static void testRxJavaWithParallel(int count) throws Exception {
    ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
    URL url = new URL("http://127.0.0.1:8999/");
    CountDownLatch finishedLatch = new CountDownLatch(count);
    long t = System.nanoTime();
    for (int k = 0; k < count; k++) {
        Observable.just(k).map(i -> {
            //System.out.println("A: " + Thread.currentThread().getName());
            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                int responseCode = conn.getResponseCode();
                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    //response.append(inputLine);
                }
                in.close();
                return responseCode;
            } catch (Exception ex) {
                return -1;
            }
        }).subscribeOn(Schedulers.from(es)).observeOn(Schedulers.computation()).subscribe(statusCode -> {
        }, error -> {
        }, () -> {
            finishedLatch.countDown();
        });
    }
    finishedLatch.await();
    t = (System.nanoTime() - t) / 1000000; //ms
    System.out.println("RxJavaWithParallel TPS: " + count * 1000 / t);
    es.shutdownNow();
}

性能更好一点:

RxJavaWithParallel2 TPS: 4716。

这个例子没有使用Schedulers.io()作为它的调度器,这是因为如果在大并发的情况下,可能会出现创建过多的线程导致资源不错,所以我们限定使用200个线程。

8 总结

  • subscribeOn() 改变的Observable运行(operate)使用的调度器,多次调用无效。
  • observeOn() 改变Observable发送notifications的调度器,会影响后续的操作,可以多次调用
  • 默认情况下, 操作链使用的线程是调用subscribe()的线程
  • Schedulers提供了多个调度器,可以并行运行多个Observable
  • 使用RxJava可以实现异步编程,但是依然要小心线程阻塞。而且由于这种异步的编程,调试代码可能更加的困难

9 参考文档

  1. http://reactivex.io/documentation/contract.html
  2. http://reactivex.io/documentation/operators/subscribeon.html 中文翻译
  3. http://reactivex.io/documentation/operators/observeon.html 中文翻译
  4. http://reactivex.io/documentation/scheduler.html
  5. http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html
  6. http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html
  7. https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2 中文翻译
  8. https://github.com/mcxiaoke/RxDocs