首家大数据培训挂牌机构 股票代码:837906 | EN CN
Java是什么?
Java历史
Java语言特点
C++ VS Java比较
Java工厂设计模式
Java抽象工厂模式
Java单例模式
Java建造者(Builder)模式
Java原型模式
Java适配器模式
Java桥接模式
Java获取网络文件大小
Java套接字到单一的客户端
Java连接套接字
Java URL部分
Java URL连接日期
Java下载网页
Java主机指定IP地址
Java确定本地IP地址
Java检查端口占用
Java查找代理服务器设置
Java创建Socket
Java线程实例
Java检查线程活着
Java如何检查一个线程停止或没有?
Java解决死锁实例
Java如何获取正在运行的线程的优先级?
Java如何监视线程的状态?
Java获取线程名称
Java线程生产者消费者问题
Java如何设置线程的优先级?
Java如何停止线程一会儿?
Java如何暂停线程?
Java获取线程ID
Java如何检查线程的优先级?
Java显示所有正在运行的线程?
Java显示线程状态
Java中断一个线程
Java Applet实例
Java创建Applet
Java使用Applet创建横幅
Java使用Applet显示时钟?
Java在一个Applet创建不同形状
Java如何使用Applet填充形状的颜色?
Java使用Applet跳转到一个链接
Java在Applet创建事件监听器
Java使用Applet显示图像
Java使用Applet在新窗口中打开链接
Java使用Applet播放声音?
Java使用Applet读取文件
Java使用Applet写入文件
Java中Swing应用程序applet
Java简单的图形用户界面-GUI
Java以不同的字体显示文本
Java使用GUI画一条线
Java创建框架-frame
Java使用GUI显示多边形
Java在矩形中显示文本
Java GUI显示不同形状
Java如何绘制GUI实心矩形?
Java创建GUI透明光标
Java检查GUI平滑处理状态
Java在框架中显示颜色
Java GUI显示饼图
Java使用图形用户界面绘制文本
Java编辑表-table
Java 使用prepared语句
Java使用保存点和回滚
Java同时执行数据库多个SQL命令
Java使用行方法
Java使用列方法
Java正则表达式实例
Java将字符串分割
Java搜索重复单词
Java查找出现的单词
Java最后一个词的索引
Java模式匹配
Java删除空格
Java匹配电话号码
Java计数组词
Java搜索词组
Java拆分正则表达式
Java替换第一个出现字符串
Java检查日期格式
Java验证电子邮件地址格式
Java替换所有匹配字符串
Java使每个单词的第一个字母大写
从XML创建SqlSessionFactory实例
不使用XML来创建SqlSessionFactory
从SqlSessionFactory获取SqlSession
映射SQL语句
作用域和生命周期
Mapper XML配置
properties元素
Settings元素
typeAliases 元素
typeHandlers元素
理解CacheLine与写出更好的JAVA
Java核心技术点之动态代理
更好的使用JAVA线程池
理解Java中字符流与字节流的区别
深入分析Java方法反射的实现原理
关于Java面试,你应该准备这些知识点
Java内存模型
2017年你不能错过的Java类库
Leakcanary Square的一款Android/Java内存泄漏检测工具
Java Synchronised机制
Java核心技术点之注解
JVM(8):JVM知识点总览-高级Java工程师面试必备
JVM(3):Java GC算法 垃圾收集器
JVM(1):Java 类的加载机制
解决ActiveMQ中,Java与C++交互中文乱码问题
关于Java Collections的几个常见问题
Java I/O 总结
JVM源码分析之Java对象的创建过程
JVM源码分析之Java类的加载过程
Java GC的那些事(下)
Java GC的那些事(上)
java对象头的HotSpot实现分析
面试的角度诠释Java工程师(一)
面试的角度诠释Java工程师(二)
框架开发之Java注解的妙用
谈谈Java反射机制
Java并发:volatile内存可见性和指令重排
死磕Java并发:Java内存模型之happens-before
死磕Java并发:深入分析volatile的实现原理
死磕Java并发:深入分析synchronized的实现原理
Java 10 可能对 Lambda 表达式进行升级
G1垃圾回收器中的字符串去重(Java 8 Update 20)
Java RESTful框架的性能比较
理解RxJava的线程模型
继续了解Java的纤程库 – Quasar
Java中的纤程库 – Quasar
Java豆瓣电影爬虫——抓取电影详情和电影短评数据
Java集合框架源码剖析:LinkedHashSet 和 LinkedHashMap
Java Lambda表达式初探
Java中的陷阱题
Java 9的这一基本功能,你可能从未听过
关于Java并发编程的总结和思考
几种简单的负载均衡算法及其Java代码实现
JAVA虚拟机关闭钩子(Shutdown Hook)
Java 脚本化编程指南
Java Scripting API 使用示例
Java 8 的 Nashorn 脚本引擎教程
如何开始使用 Java 机器学习
CognitiveJ —— Java 的图像分析库
Java 性能优化的五大技巧
Java 解惑:Comparable 和 Comparator 的区别
Google Java编程风格指南
java NIO详解
Java 异常处理的误区和经验总结
Java语法糖(4):内部类
Java语法糖(3):泛型
Java语法糖(2):自动装箱和自动拆箱
Java消息队列任务的平滑关闭
Java语法糖(1):可变长度参数以及foreach循环原理
2016最流行的Java EE服务器
自己写一个java.lang.reflect.Proxy代理的实现
java 如何在pdf中生成表格
如何防止单例模式被JAVA反射攻击
java虚拟机 jvm 局部变量表实战
聊聊并发-Java中的Copy-On-Write容器
java.lang.Instrument 代理Agent使用
Java开发者需要了解的移动开发编程语言
13个不容错过的Java项目
2016年7款最佳 Java 框架推荐
Java 开发者值得关注的 11 个技术博客
Redmonk发布Java框架流行度调研结果
Java 8开发的4大顶级技巧
GitHub漫游指南:10个值得你关注的Java项目
除了Guava,Java开发者还值得了解的5个谷歌类库
Java中创建对象的5种不同方法
Java性能优化全攻略
奇怪的Java题:为什么1000 == 1000返回为False,而100 == 100会返回为True?
11个最值得Java开发者收藏的网站
Java的常见误区与细节
对Java意义重大的7个性能指标
Java调优经验谈
关于Java并发编程的总结和思考
HDFS Federation设计动机与基本原理
《Effective STL》学习笔记(第三部分)
《Effective STL》学习笔记(第二部分)
《Effective STL》学习笔记(第一部分)
数据结构之位图
Thrift使用指南
Cassandra概要介绍
Cassandra部署与安装
Cassandra客户端
Cassandra数据模型
Cassandra中的各种策略
数据结构之树状数组
数据结构之伸展树
数据结构之后缀数组
数据结构之堆
浅析MRv1与MRv2的API兼容性
Apache Tez最新进展
运行在YARN上的计算框架
从传统操作系统角度理解Hadoop YARN

Java中的纤程库 – Quasar

于2017-05-10由小牛君创建

分享到:


最近遇到的一个问题大概是微服务架构中经常会遇到的一个问题:

服务 A 是我们开发的系统,它的业务需要调用 BCD 等多个服务,这些服务是通过http的访问提供的。 问题是 BCD 这些服务都是第三方提供的,不能保证它们的响应时间,快的话十几毫秒,慢的话甚至1秒多,所以这些服务的Latency比较长。幸运地是这些服务都是集群部署的,容错率和并发支持都比较高,所以不担心它们的并发性能,唯一不爽的就是就是它们的Latency太高了。

简化的微服务架构

简化的微服务架构

系统A会从Client接收Request, 每个Request的处理都需要多次调用B、C、D的服务,所以完成一个Request可能需要1到2秒的时间。为了让A能更好地支持并发数,系统中使用线程池处理这些Request。当然这是一个非常简化的模型,实际的业务处理比较复杂。

可以预见,因为系统B、C、D的延迟,导致整个业务处理都很慢,即使使用线程池,但是每个线程还是会阻塞在B、C、D的调用上,导致I/O阻塞了这些线程, CPU利用率相对来说不是那么高。

当然在测试的时候使用的是B、C、D的模拟器,没有预想到它们的响应是那么慢,因此测试数据的结果还不错,吞吐率还可以,但是在实际环境中问题就暴露出来了。

1 概述

最开始线程池设置的是200,然后用HttpUrlConnection作为http client发送请求到B、C、D。当然HttpUrlConnection也有一些坑,比如Persistent ConnectionsCaveats of HttpURLConnection,跳出坑后性能依然不行。

通过测试,如果B、C、D等服务延迟接近0毫秒,则HttpUrlConnection的吞吐率(线程池的大小为200)能到40000 requests/秒,但是随着第三方服务的响应时间变慢,它的吞吐率急剧下降,B、C、D的服务的延迟为100毫秒的时候,则HttpUrlConnection的吞吐率降到1800 requests/秒,而B、C、D的服务的延迟为100毫秒的时候HttpUrlConnection的吞吐率降到550 requests/秒。

增加http.maxConnections系统属性并不能显著增加吞吐率。

如果增加调用HttpUrlConnection的线程池的大小,比如增加到2000,性能会好一些,但是B、C、D的服务的延迟为500毫秒的时候,吞吐率为3800 requests/秒,延迟为1秒的时候,吞吐率为1900 requests/秒。

虽然线程池的增大能带来性能的提升,但是线程池也不能无限制的增大,因为每个线程都会占用一定的资源,而且随着线程的增多,线程之间的切换也更加的频繁,对CPU等资源也是一种浪费。

切换成netty(channel pool),与B、C、D通讯的性能还不错, latency为500ms的时候吞吐率能达到10000 requests/秒,通讯不成问题,问题是需要将业务代码改成异步的方式,异步地接收到这些response后在一个线程池中处理这些消息。

下面列出了一些常用的http client:

  • JDK’s URLConnection uses traditional thread-blocking I/O.
  • Apache HTTP Client uses traditional thread-blocking I/O with thread-pools.
  • Apache Async HTTP Client uses NIO.
  • Jersey is a ReST client/server framework; the client API can use several HTTP client backends including URLConnection and Apache HTTP Client.
  • OkHttp uses traditional thread-blocking I/O with thread-pools.
  • Retrofit turns your HTTP API into a Java interface and can use several HTTP client backends including Apache HTTP Client.
  • Grizzly is network framework with low-level HTTP support; it was using NIO but it switched to AIO .
  • Netty is a network framework with HTTP support (low-level), multi-transport, includes NIO and native (the latter uses epoll on Linux).
  • Jetty Async HTTP Client uses NIO.
  • Async HTTP Client wraps either Netty, Grizzly or JDK’s HTTP support.
  • clj-http wraps the Apache HTTP Client.
  • http-kit is an async subset of clj-http implemented partially in Java directly on top of NIO.
  • http async client wraps the Async HTTP Client for Java.

这个列表摘自 High-Concurrency HTTP Clients on the JVM,不止于此,这篇文章重点介绍基于java纤程库quasar的实现的http client库,并比较了性能。我们待会再说。

回到我前面所说的系统,如何能更好的提供性能?有一种方案是借助其它语言的优势,比如Go,让Go来代理完成和B、C、D的请求,系统A通过一个TCP连接与Go程序交流。第三方服务B、C、D的Response结果可以异步地返回给系统A。

Go的优势在于可以实现request-per-goroutine,整个系统中可以有成千上万个goroutine。 goroutine是轻量级的,而且在I/O阻塞的时候可以不占用线程,这让Go可以轻松地处理上万个链接,即使I/O阻塞也没问题。Go和Java之间的通讯协议可以通过Protobuffer来实现,而且它们之间只保留一个TCP连接即可。

当然这种架构的修改带来系统稳定性的降低,服务A和服务B、C、D之间的通讯增加了复杂性。同时,因为是异步方式,服务A的业务也要实现异步方式,否则200个线程依然等待Response的话,还是一个阻塞的架构。

通过测试,这种架构可以带来稳定的吞吐率。 不管服务B、C、D的延迟有多久,A的吞吐率能维持15000 requests/秒。当然Go到B、C、D的并发连接数也有限制,我把最大值调高到20000。

这种曲折的方案的最大的两个弊病就是架构的复杂性以及对原有系统需要进行大的重构。 高复杂性带来的是系统的稳定性的降低,包括部署、维护、网络状况、系统资源等。同时系统要改成异步模型,因为系统业务线程发送Request后不能等待Go返回Response,它需要从Client接收更多的Request,而收到Response之后它才继续执行剩下的业务,只有这样才不会阻塞,进而提到系统的吞吐率。

将系统A改成异步,然后使用HttpUrlConnection线程池行不行?
HttpUrlConnection线程池还是导致和B、C、D通讯的吞吐率下降,但是Go这种方案和B、C、D通讯的吞吐率可以维持一个较高的水平。

考虑到Go的优势,那么能不能在Java中使用类似Go的这种goroutine模型呢?那就是本文要介绍的Java纤程库: [Quasar](http://docs.paralleluniverse.co/quasar/)。

实际测试结果表明Go和Netty都是两种比较好的解决方案,而且Netty的性能惊人的好,不好的地方正如前面所讲,我们需要将代码改成异步的处理。线程池中的业务单元用Netty发送完Request之后,不要等待Response, Response的处理交给另外的线程来处理,同时注意不要在Netty的Handler里面处理业务逻辑。要解决的问题就变成如何更高效的处理Response了,而不是第三方系统阻塞的问题。

2 quasar初步

以下介绍Java的另一个解决方案,也就是Java中的coroutine库,因为最近刚刚看这个库,感觉挺不错的,而且用它替换Thread改动较少。

Java官方并没有纤程库。但是伟大的社区提供了一个优秀的库,它就是Quasar。

创始人是Ron Pressler和Dafna Pressler,由Y Combinator孵化。

Quasar is a library that provides high-performance lightweight threads, Go-like channels, Erlang-like actors, and other asynchronous programming tools for Java and Kotlin.

Quasar提供了高性能轻量级的线程,提供了类似Go的channel,Erlang风格的actor,以及其它的异步编程的工具,可以用在Java和Kotlin编程语言中。Scala目前的支持还不完善,我想如果这个公司能快速的发展壮大,或者被一些大公司收购的话,对Scala的支持才能提上日程。

你需要把下面的包加入到你的依赖中:

  • Core (必须) co.paralleluniverse:quasar-core:0.7.5[:jdk8] (对于 JDK 8,需要增加jdk8 classifier)
  • Actor co.paralleluniverse:quasar-actors:0.7.5
  • Clustering co.paralleluniverse:quasar-galaxy:0.7.5
  • Reactive Stream co.paralleluniverse:quasar-reactive-streams:0.7.5
  • Kotlin co.paralleluniverse:quasar-kotlin:0.7.5

Quasar fiber依赖java instrumentation修改你的代码,可以在运行时通过java Agent实现,也可以在编译时使用ant task实现。

通过java agent很简单,在程序启动的时候将下面的指令加入到命令行:

-javaagent:path-to-quasar-jar.jar

对于maven来说,你可以使用插件maven-dependency-plugin,它会为你的每个依赖设置一个属性,以便在其它地方引用,我们主要想使用 ${co.paralleluniverse:quasar-core:jar}:

<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>2.5.1</version>
    <executions>
        <execution>
            <id>getClasspathFilenames</id>
            <goals>
                <goal>properties</goal>
            </goals>
        </execution>
     </executions>
</plugin>

然后你可以配置exec-maven-plugin或者maven-surefire-plugin加上agent参数,在执行maven任务的时候久可以使用Quasar了。

官方提供了一个Quasar Maven archetype,你可以通过下面的命令生成一个quasar应用原型:

git clone https://github.com/puniverse/quasar-mvn-archetype
cd quasar-mvn-archetype
mvn install
cd ..
mvn archetype:generate -DarchetypeGroupId=co.paralleluniverse -DarchetypeArtifactId=quasar-mvn-archetype -DarchetypeVersion=0.7.4 -DgroupId=testgrp -DartifactId=testprj
cd testprj
mvn test
mvn clean compile dependency:properties exec:exec

如果你使用gradle,可以看一下gradle项目模版:Quasar Gradle template project

最容易使用Quasar的方案就是使用Java Agent,它可以在运行时instrument程序。如果你想编译的时候就使用AOT instrumentation(Ahead-of-Time),可以使用Ant任务co.paralleluniverse.fibers.instrument.InstrumentationTask,它包含在quasar-core.jar中。

Quasar最主要的贡献就是提供了轻量级线程的实现,叫做fiber(纤程)。Fiber的功能和使用类似Thread, API接口也类似,所以使用起来没有违和感,但是它们不是被操作系统管理的,它们是由一个或者多个ForkJoinPool调度。一个idle fiber只占用400K内存,切换的时候占用更少的CPU,你的应用中可以有上百万的fiber,显然Thread做不到这一点。这一点和Go的goroutine类似。

Fiber并不意味着它可以在所有的场景中都可以替换Thread。当fiber的代码经常会被等待其它fiber阻塞的时候,就应该使用fiber。
对于那些需要CPU长时间计算的代码,很少遇到阻塞的时候,就应该首选thread

以上两条是选择fiber还是thread的判断条件,主要还是看任务是I/O blocking相关还是CPU相关。幸运地是,fiber API使用和thread使用类似,所以代码略微修改久就可以兼容。

Fiber特别适合替换哪些异步回调的代码。使用FiberAsync异步回调很简单,而且性能很好,扩展性也更高。

类似Thread, fiber也是用Fiber类表示:

new Fiber<V>() {
  @Override
  protected V run() throws SuspendExecution, InterruptedException {
        // your code
    }
}.start();

与Thread类似,但也有些不同。Fiber可以有一个返回值,类型为泛型V,也可以为空Void。run也可以抛出异常InterruptedException

你可以传递SuspendableRunnable 或 SuspendableCallable 给Fiber的构造函数:

new Fiber<Void>(new SuspendableRunnable() {
  public void run() throws SuspendExecution, InterruptedException {
    // your code
  }
}).start();

甚至你可以调用Fiber的join方法等待它完成,调用get方法得到它的结果。

Fiber继承Strand类。Strand类代表一个Fiber或者Thread,提供了一些底层的方法。

逃逸的Fiber(Runaway Fiber)是指那些陷入循环而没有block、或者block fiber本身运行的线程的Fiber。偶尔有逃逸的fiber没有问题,但是太频繁会导致性能的下降,因为需要调度器的线程可能都忙于逃逸fiber了。Quasar会监控这些逃逸fiber,你可以通过JMX监控。如果你不想监控,可以设置系统属性co.paralleluniverse.fibers.detectRunawayFibersfalse

fiber中的ThreadLocal是fiber local的。InheritableThreadLocal继承父fiber的值。

Fiber、SuspendableRunnable 、SuspendableCallable 的run方法会抛出SuspendExecution异常。但这并不是真正意义的异常,而是fiber内部工作的机制,通过这个异常暂停因block而需要暂停的fiber。

任何在Fiber中运行的方法,需要声明这个异常(或者标记@Suspendable),都被称为suspendable method。

反射调用通常都被认为是suspendable, Java8 lambda 也被认为是suspendable。不应该将类构造函数或类初始化器标记为suspendable。

synchronized语句块或者方法会阻塞操作系统线程,所以它们不应该标记为suspendable。Blocking线程调用默认也不被quasar允许。但是这两种情况都可以被quasar处理,你需要在Quasar javaagent中分别加上mb参数,或者ant任务中加上allowMonitorsallowBlocking属性。

3 quasar原理

Quasar最初fork自Continuations Library

如果你了解其它语言的coroutine, 比如Lua,你久比较容易理解quasar的fiber了。 Fiber实质上是 continuation, continuation可以捕获一个计算的状态,可以暂停当前的计算,等隔一段时间可以继续执行。Quasar通过instrument修改suspendable方法。Quasar的调度器使用ForkJoinPool调度这些fiber。

Fiber调度器FiberScheduler是一个高效的、work-stealing、多线程的调度器。

默认的调度器是FiberForkJoinScheduler,但是你可以使用自己的线程池去调度,请参考FiberExecutorScheduler

当一个类被加载时,Quasar的instrumentation模块 (使用 Java agent时) 搜索suspendable 方法。每一个suspendable 方法 f通过下面的方式 instrument:
它搜索对其它suspendable方法的调用。对suspendable方法g的调用,一些代码会在这个调用g的前后被插入,它们会保存和恢复fiber栈本地变量的状态,记录这个暂停点。在这个“suspendable function chain”的最后,我们会发现对Fiber.park的调用。park暂停这个fiber,扔出 SuspendExecution异常。

g block的时候,SuspendExecution异常会被Fiber捕获。 当Fiber被唤醒(使用unpark), 方法f会被调用, 执行记录显示它被block在g的调用上,所以程序会立即跳到f调用g的那一行,然后调用它。最终我们会到达暂停点,然后继续执行。当g返回时, f中插入的代码会恢复f的本地变量。

过程听起来很复杂,但是它只会带来3% ~ 5%的性能的损失。

下面看一个简单的例子, 方法m2声明抛出SuspendExecution异常,方法m1调用m2和m3,所以也声明抛出这个异常,最后这个异常会被Fiber所捕获:

public class Helloworld {
    static void m1() throws SuspendExecution, InterruptedException {
        String m = "m1";
        System.out.println("m1 begin");
        m = m2();
        m = m3();
        System.out.println("m1 end");
        System.out.println(m);
    }
    static String m2() throws SuspendExecution, InterruptedException {
        return "m2";
    }
    static String m3() throws SuspendExecution, InterruptedException {
        return "m3";
    }
    static public void main(String[] args) throws ExecutionException, InterruptedException {
        new Fiber<Void>("Caller", new SuspendableRunnable() {
            @Override
            public void run() throws SuspendExecution, InterruptedException {
                m1();
            }
        }).start();
    }
}

反编译这段代码 (一般的反编译软件如jd-gui不能把这段代码反编译java文件,Procyon虽然能反编译,但是感觉反编译有错。所以我们还是看字节码吧):

@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
 static void m1()
   throws SuspendExecution, InterruptedException
 {
   // Byte code:
   //   0: aconst_null
   //   1: astore_3
   //   2: invokestatic 88	co/paralleluniverse/fibers/Stack:getStack	()Lco/paralleluniverse/fibers/Stack;
   //   5: dup
   //   6: astore_1
   //   7: ifnull +42 -> 49
   //   10: aload_1
   //   11: iconst_1
   //   12: istore_2
   //   13: invokevirtual 92	co/paralleluniverse/fibers/Stack:nextMethodEntry	()I
   //   16: tableswitch	default:+24->40, 1:+64->80, 2:+95->111
   //   40: aload_1
   //   41: invokevirtual 96	co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed	()Z
   //   44: ifne +5 -> 49
   //   47: aconst_null
   //   48: astore_1
   //   49: iconst_0
   //   50: istore_2
   //   51: ldc 2
   //   53: astore_0
   //   54: getstatic 3	java/lang/System:out	Ljava/io/PrintStream;
   //   57: ldc 4
   //   59: invokevirtual 5	java/io/PrintStream:println	(Ljava/lang/String;)V
   //   62: aload_1
   //   63: ifnull +26 -> 89
   //   66: aload_1
   //   67: iconst_1
   //   68: iconst_1
   //   69: invokevirtual 100	co/paralleluniverse/fibers/Stack:pushMethod	(II)V
   //   72: aload_0
   //   73: aload_1
   //   74: iconst_0
   //   75: invokestatic 104	co/paralleluniverse/fibers/Stack:push	(Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
   //   78: iconst_0
   //   79: istore_2
   //   80: aload_1
   //   81: iconst_0
   //   82: invokevirtual 108	co/paralleluniverse/fibers/Stack:getObject	(I)Ljava/lang/Object;
   //   85: checkcast 110	java/lang/String
   //   88: astore_0
   //   89: invokestatic 6	com/colobu/fiber/Helloworld:m2	()Ljava/lang/String;
   //   92: astore_0
   //   93: aload_1
   //   94: ifnull +26 -> 120
   //   97: aload_1
   //   98: iconst_2
   //   99: iconst_1
   //   100: invokevirtual 100	co/paralleluniverse/fibers/Stack:pushMethod	(II)V
   //   103: aload_0
   //   104: aload_1
   //   105: iconst_0
   //   106: invokestatic 104	co/paralleluniverse/fibers/Stack:push	(Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
   //   109: iconst_0
   //   110: istore_2
   //   111: aload_1
   //   112: iconst_0
   //   113: invokevirtual 108	co/paralleluniverse/fibers/Stack:getObject	(I)Ljava/lang/Object;
   //   116: checkcast 110	java/lang/String
   //   119: astore_0
   //   120: invokestatic 7	com/colobu/fiber/Helloworld:m3	()Ljava/lang/String;
   //   123: astore_0
   //   124: getstatic 3	java/lang/System:out	Ljava/io/PrintStream;
   //   127: ldc 8
   //   129: invokevirtual 5	java/io/PrintStream:println	(Ljava/lang/String;)V
   //   132: getstatic 3	java/lang/System:out	Ljava/io/PrintStream;
   //   135: aload_0
   //   136: invokevirtual 5	java/io/PrintStream:println	(Ljava/lang/String;)V
   //   139: aload_1
   //   140: ifnull +7 -> 147
   //   143: aload_1
   //   144: invokevirtual 113	co/paralleluniverse/fibers/Stack:popMethod	()V
   //   147: return
   //   148: aload_1
   //   149: ifnull +7 -> 156
   //   152: aload_1
   //   153: invokevirtual 113	co/paralleluniverse/fibers/Stack:popMethod	()V
   //   156: athrow
   // Line number table:
   //   Java source line #13	-> byte code offset #51
   //   Java source line #15	-> byte code offset #54
   //   Java source line #16	-> byte code offset #62
   //   Java source line #17	-> byte code offset #93
   //   Java source line #18	-> byte code offset #124
   //   Java source line #19	-> byte code offset #132
   //   Java source line #21	-> byte code offset #139
   // Local variable table:
   //   start	length	slot	name	signature
   //   53	83	0	m	String
   //   6	147	1	localStack	co.paralleluniverse.fibers.Stack
   //   12	99	2	i	int
   //   1	1	3	localObject	Object
   //   156	1	4	localSuspendExecution	SuspendExecution
   // Exception table:
   //   from	to	target	type
   //   49	148	148	finally
   //   49	148	156	co/paralleluniverse/fibers/SuspendExecution
   //   49	148	156	co/paralleluniverse/fibers/RuntimeSuspendExecution
 }

这段反编译的代码显示了方法m被instrument后的样子,虽然我们不能很清楚的看到代码执行的样子,但是也可以大概地看到它实际在方法的最开始加入了此方法的栈信息的检查(#0 ~ #49,如果是第一次运行这个方法,则直接运行,
然后在一些暂停点上加上一些栈压入的处理,并且可以在下次执行的时候直接跳到上次的暂停点上。

官方的工程师关于Quasar的instrument操作如下:

  • Fully analyze the bytecode to find all the calls into suspendable methods. A method that (potentially) calls into other suspendable methods is itself considered suspendable, transitively.
  • Inject minimal bytecode in suspendable methods (and only them) that will manage an user-mode stack, in the following places:
    • At the beginning we’ll check if we’re resuming the fiber and only in this case we’ll jump into the relevant bytecode index.
    • Before a call into another suspendable method we’ll push a snapshot of the current activation frame, including the resume bytecode index; we can do it because we know the structure statically from the analysis phase.
    • After a call into another suspendable method we’ll pop the top activation frame and, if resumed, we’ll restore it in the current fiber.

我并没有更深入的去了解Quasar的实现细节以及调度算法,有兴趣的读者可以翻翻它的代码。如果你有更深入的剖析,请留下相关的地址,以便我加到参考文档中。

曾经, 陆陆续续也有一些Java coroutine的实现(coroutine-libraries), 但是目前来说最好的应该还是Quasar。

Oracle会实现一个官方的纤程库吗?目前来说没有看到这方面的计划,而且从Java的开发进度上来看,这个特性可能是遥遥无期的,所以目前还只能借助社区的力量,从第三方库如Quasar中寻找解决方案。

更多的Quasar知识,比如Channel、Actor、Reactive Stream 的使用可以参考官方的文档,官方也提供了多个例子

4 Comsat介绍

Comsat又是什么?

Comsat还是Parallel Universe提供的集成Quasar的一套开源库,可以提供web或者企业级的技术,如HTTP服务和数据库访问。

Comsat并不是一套web框架。它并不提供新的API,只是为现有的技术如Servlet、JAX-RS、JDBC等提供Quasar fiber的集成。

它包含非常多的库,比如Spring、ApacheHttpClient、OkHttp、Undertow、Netty、Kafka等。

5 性能对比

刘小溪在CSDN上写了一篇关于Quasar的文章:次时代Java编程(一):Java里的协程,写的挺好,建议读者读一读。

它参考Skynet的测试写了代码进行对比,这个测试是并发执行整数的累加:
测试结果是Golang花了261毫秒,Quasar花了612毫秒。其实结果还不错,但是文中指出这个测试没有发挥Quasar的性能。因为quasar的性能主要在于阻塞代码的调度上。
虽然文中加入了排序的功能,显示Java要比Golang要好,但是我觉得这又陷入了另外一种错误的比较, Java的排序算法使用TimSort,排序效果相当好,Go的排序效果显然比不上Java的实现,所以最后的测试主要测试排序算法上。 真正要体现Quasar的性能还是测试在有阻塞的情况下fiber的调度性能。

5.1 HttpClient

话题扯的越来越远了,拉回来。我最初的目的是要解决的是在第三方服务响应慢的情况下提高系统 A 的吞吐率。最初A是使用200个线程处理业务逻辑,调用第三方服务。因为线程总是被第三方服务阻塞,所以系统A的吞吐率总是很低。

虽然使用Go可以解决这个问题,但是对于系统A的改造比较大,还增加了系统的复杂性。Netty性能好,改动量还可以接受,但是不妨看一下这个场景,系统的问题是由http阻塞引起。

这正是Quasar fiber适合的场景,如果一个Fiber被阻塞,它可以暂时放弃线程,以便线程可以用来执行其它的Fiber。虽然整个集成系统的吞吐率依然很低,这是无法避免的,但是系统的吞吐率确很高。

Comsat提供了Apache Http Client的实现: FiberHttpClientBuilder

final CloseableHttpClient client = FiberHttpClientBuilder.
        create(2). // use 2 io threads
        setMaxConnPerRoute(concurrencyLevel).
        setMaxConnTotal(concurrencyLevel).build();

然后在Fiber中久可以调用:

String response = client.execute(new HttpGet("http://localhost:8080"), BASIC_RESPONSE_HANDLER);

你也可以使用异步的HttpClient:

final CloseableHttpAsyncClient client = FiberCloseableHttpAsyncClient.wrap(HttpAsyncClients.
        custom().
        setMaxConnPerRoute(concurrencyLevel).
        setMaxConnTotal(concurrencyLevel).
        build());
client.start();

Comsat还提供了Jersey Http Client: AsyncClientBuilder.newClient()

甚至提供了RetrofitOkHttp的实现。

经过测试,虽然随着系统B、C、D的响应时间的拉长,吞吐率有所降低,但是在latency为100毫秒的时候吞吐率依然能达到9900 requests/秒,可以满足我们的需求,而我们的代码改动的比较小。

综上所述,如果想彻底改造系统A,则可以使用Go库重写,或者使用Netty + Rx的方式去处理,都能达到比较好的效果。如果想改动比较小,可以考虑使用quasar替换线程对代码进行维护。

我希望本文不要给读者造成误解,以为Java NIO/Selector这种方式不能解决本文的问题,也就是第三方阻塞的问题。 事实上Java NIO也正是适合解决这样的问题, 比如Netty性能就不错,但是你需要小心的是, 不要让你的这个client对外又变成阻塞的方式,而是程序应该异步的去发送request和处理response。当然本文重点不是介绍这种实现,而是介绍Java的线程库,它可以改造传统的代码,即使有阻塞,也只是阻塞Fiber,而不是阻塞线程,这是另一个解决问题的思路。

另一篇关于Quasar的文档: 继续了解Java的纤程库 – Quasar

参考文档