Java是什么?
Java历史
Java语言特点
C++ VS Java比较
Java工厂设计模式
Java抽象工厂模式
Java单例模式
Java建造者(Builder)模式
Java原型模式
Java适配器模式
Java桥接模式
Java获取网络文件大小
Java套接字到单一的客户端
Java连接套接字
Java URL部分
Java URL连接日期
Java下载网页
Java主机指定IP地址
Java确定本地IP地址
Java检查端口占用
Java查找代理服务器设置
Java创建Socket
Java线程实例
Java检查线程活着
Java如何检查一个线程停止或没有?
Java解决死锁实例
Java如何获取正在运行的线程的优先级?
Java如何监视线程的状态?
Java获取线程名称
Java线程生产者消费者问题
Java如何设置线程的优先级?
Java如何停止线程一会儿?
Java如何暂停线程?
Java获取线程ID
Java如何检查线程的优先级?
Java显示所有正在运行的线程?
Java显示线程状态
Java中断一个线程
Java Applet实例
Java创建Applet
Java使用Applet创建横幅
Java使用Applet显示时钟?
Java在一个Applet创建不同形状
Java如何使用Applet填充形状的颜色?
Java使用Applet跳转到一个链接
Java在Applet创建事件监听器
Java使用Applet显示图像
Java使用Applet在新窗口中打开链接
Java使用Applet播放声音?
Java使用Applet读取文件
Java使用Applet写入文件
Java中Swing应用程序applet
Java简单的图形用户界面-GUI
Java以不同的字体显示文本
Java使用GUI画一条线
Java创建框架-frame
Java使用GUI显示多边形
Java在矩形中显示文本
Java GUI显示不同形状
Java如何绘制GUI实心矩形?
Java创建GUI透明光标
Java检查GUI平滑处理状态
Java在框架中显示颜色
Java GUI显示饼图
Java使用图形用户界面绘制文本
Java编辑表-table
Java 使用prepared语句
Java使用保存点和回滚
Java同时执行数据库多个SQL命令
Java使用行方法
Java使用列方法
Java正则表达式实例
Java将字符串分割
Java搜索重复单词
Java查找出现的单词
Java最后一个词的索引
Java模式匹配
Java删除空格
Java匹配电话号码
Java计数组词
Java搜索词组
Java拆分正则表达式
Java替换第一个出现字符串
Java检查日期格式
Java验证电子邮件地址格式
Java替换所有匹配字符串
Java使每个单词的第一个字母大写
从XML创建SqlSessionFactory实例
不使用XML来创建SqlSessionFactory
从SqlSessionFactory获取SqlSession
映射SQL语句
作用域和生命周期
Mapper XML配置
properties元素
Settings元素
typeAliases 元素
typeHandlers元素
理解CacheLine与写出更好的JAVA
Java核心技术点之动态代理
更好的使用JAVA线程池
理解Java中字符流与字节流的区别
深入分析Java方法反射的实现原理
关于Java面试,你应该准备这些知识点
Java内存模型
2017年你不能错过的Java类库
Leakcanary Square的一款Android/Java内存泄漏检测工具
Java Synchronised机制
Java核心技术点之注解
JVM(8):JVM知识点总览-高级Java工程师面试必备
JVM(3):Java GC算法 垃圾收集器
JVM(1):Java 类的加载机制
解决ActiveMQ中,Java与C++交互中文乱码问题
关于Java Collections的几个常见问题
Java I/O 总结
JVM源码分析之Java对象的创建过程
JVM源码分析之Java类的加载过程
Java GC的那些事(下)
Java GC的那些事(上)
java对象头的HotSpot实现分析
面试的角度诠释Java工程师(一)
面试的角度诠释Java工程师(二)
框架开发之Java注解的妙用
谈谈Java反射机制
Java并发:volatile内存可见性和指令重排
死磕Java并发:Java内存模型之happens-before
死磕Java并发:深入分析volatile的实现原理
死磕Java并发:深入分析synchronized的实现原理
Java 10 可能对 Lambda 表达式进行升级
G1垃圾回收器中的字符串去重(Java 8 Update 20)
Java RESTful框架的性能比较
理解RxJava的线程模型
继续了解Java的纤程库 – Quasar
Java中的纤程库 – Quasar
Java豆瓣电影爬虫——抓取电影详情和电影短评数据
Java集合框架源码剖析:LinkedHashSet 和 LinkedHashMap
Java Lambda表达式初探
Java中的陷阱题
Java 9的这一基本功能,你可能从未听过
关于Java并发编程的总结和思考
几种简单的负载均衡算法及其Java代码实现
JAVA虚拟机关闭钩子(Shutdown Hook)
Java 脚本化编程指南
Java Scripting API 使用示例
Java 8 的 Nashorn 脚本引擎教程
如何开始使用 Java 机器学习
CognitiveJ —— Java 的图像分析库
Java 性能优化的五大技巧
Java 解惑:Comparable 和 Comparator 的区别
Google Java编程风格指南
java NIO详解
Java 异常处理的误区和经验总结
Java语法糖(4):内部类
Java语法糖(3):泛型
Java语法糖(2):自动装箱和自动拆箱
Java消息队列任务的平滑关闭
Java语法糖(1):可变长度参数以及foreach循环原理
2016最流行的Java EE服务器
自己写一个java.lang.reflect.Proxy代理的实现
java 如何在pdf中生成表格
如何防止单例模式被JAVA反射攻击
java虚拟机 jvm 局部变量表实战
聊聊并发-Java中的Copy-On-Write容器
java.lang.Instrument 代理Agent使用
Java开发者需要了解的移动开发编程语言
13个不容错过的Java项目
2016年7款最佳 Java 框架推荐
Java 开发者值得关注的 11 个技术博客
Redmonk发布Java框架流行度调研结果
Java 8开发的4大顶级技巧
GitHub漫游指南:10个值得你关注的Java项目
除了Guava,Java开发者还值得了解的5个谷歌类库
Java中创建对象的5种不同方法
Java性能优化全攻略
奇怪的Java题:为什么1000 == 1000返回为False,而100 == 100会返回为True?
11个最值得Java开发者收藏的网站
Java的常见误区与细节
对Java意义重大的7个性能指标
Java调优经验谈
关于Java并发编程的总结和思考
HDFS Federation设计动机与基本原理
《Effective STL》学习笔记(第三部分)
《Effective STL》学习笔记(第二部分)
《Effective STL》学习笔记(第一部分)
数据结构之位图
Thrift使用指南
Cassandra概要介绍
Cassandra部署与安装
Cassandra客户端
Cassandra数据模型
Cassandra中的各种策略
数据结构之树状数组
数据结构之伸展树
数据结构之后缀数组
数据结构之堆
浅析MRv1与MRv2的API兼容性
Apache Tez最新进展
运行在YARN上的计算框架
从传统操作系统角度理解Hadoop YARN

继续了解Java的纤程库 – Quasar

于2017-05-10由小牛君创建

分享到:


前一篇文章Java中的纤程库 – Quasar中我做了简单的介绍,现在进一步介绍这个纤程库。

Quasar还没有得到广泛的应用,搜寻整个github也就pinterest/quasar-thrift这么一个像样的使用Quasar的库,并且官方的文档也很简陋,很多地方并没有详细的介绍,和Maven的集成也不是很好。这些都限制了Quasar的进一步发展。

但是,作为目前最好用的Java coroutine的实现,它在某些情况下的性能还是表现相当出色的,希望这个项目能够得到更大的支持和快速发展。

因为Quasar文档的缺乏,所以使用起来需要不断的摸索和在论坛上搜索答案,本文将一些记录了我在Quasar使用过程中的一些探索。

1 Thread vs Quasar

虽然Java的线程的API封装的很好,使用起来非常的方便,但是使用起来也得小心。首先线程需要耗费资源,所以单个的机器上创建上万个线程很困难,其次线程之间的切换也需要耗费CPU,在线程非常多的情况下导致很多CPU资源耗费在线程切换上,通过提高线程数来提高系统的性能有时候适得其反。你可以看到现在一些优秀的框架如Netty都不会创建很多的线程,默认2倍的CPU core的线程数就已经应付的很好了,比如node.js可以使用单一的进程/线程应付高并发。

纤程使用的资源更少,它主要保存栈信息,所以一个系统中可以创建上万的纤程Fiber,而实际的纤程调度器只需要几个Java线程即可。

我们看一个性能的比较,直观的感受一下Quasar带来的吞吐率的提高。

下面这个例子中方法m1调用m2,m2调用m3,但是m2会暂停1秒钟,用来模拟实际产品中的阻塞,m3执行了一个简单的计算。
通过线程和纤程两种方式我们看看系统的吞吐率(throughput)和延迟(latency)。

public class Helloworld {
    @Suspendable
    static void m1() throws InterruptedException, SuspendExecution {
        String m = "m1";
        //System.out.println("m1 begin");
        m = m2();
        //System.out.println("m1 end");
        //System.out.println(m);
    }
    static String m2() throws SuspendExecution, InterruptedException {
        String m = m3();
        Strand.sleep(1000);
        return m;
    }
    //or define in META-INF/suspendables
    @Suspendable
    static String m3() {
        List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList());
        return l.toString();
    }
    static public void main(String[] args) throws ExecutionException, InterruptedException {
        int count = 10000;
        testThreadpool(count);
        testFiber(count);
    }
    static void testThreadpool(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        ExecutorService es = Executors.newFixedThreadPool(200);
        LongAdder latency = new LongAdder();
        long t = System.currentTimeMillis();
        for (int i =0; i< count; i++) {
            es.submit(() -> {
                long start = System.currentTimeMillis();
                try {
                    m1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (SuspendExecution suspendExecution) {
                    suspendExecution.printStackTrace();
                }
                start = System.currentTimeMillis() - start;
                latency.add(start);
                latch.countDown();
            });
        }
        latch.await();
        t = System.currentTimeMillis() - t;
        long l = latency.longValue() / count;
        System.out.println("thread pool took: " + t + ", latency: " + l + " ms");
        es.shutdownNow();
    }
    static void testFiber(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        LongAdder latency = new LongAdder();
        long t = System.currentTimeMillis();
        for (int i =0; i< count; i++) {
            new Fiber<Void>("Caller", new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    long start = System.currentTimeMillis();
                    m1();
                    start = System.currentTimeMillis() - start;
                    latency.add(start);
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        t = System.currentTimeMillis() - t;
        long l = latency.longValue() / count;
        System.out.println("fiber took: " + t  + ", latency: " + l + " ms");
    }
}

运行这个程序(需要某种instrument, agent或者AOT或者其它,在下面会介绍),输出结果为:

thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms

如果使用线程,执行完1万个操作需要50秒,平均延迟为1秒左右(我们故意让延迟至少1秒),线程池数量为200。(其实总时间50秒可以计算出来)
但是如果使用纤程,执行完1万个操作仅需要1.158秒,平均延迟时间为1秒,线程数量为CPU core数(缺省使用ForkJoinPool)。

可以看到,通过使用纤程,尽受限于系统的业务逻辑,我们没有办法提升业务的处理时间, 但是我们确可以极大的提高系统的吞吐率,如上面的简单的例子将10000个操作的处理时间从50秒提高到1秒,非凡的成就。

如果我们将方法m2中的Strand.sleep(1000);注释掉,这样这个例子中就没有什么阻塞了,我们看看在这种纯计算的情况下两者的表现:

thread pool took: 114, latency: 0 ms
fiber took: 180, latency: 0 ms

可以看到,纤程非但没有提升性能,反而会带来性能的下降。对于这种纯计算没有阻塞的case,Quasar并不适合。

正如官方所说:

Fibers are not meant to replace threads in all circumstances. A fiber should be used when its body (the code it executes) blocks very often waiting on other fibers (e.g. waiting for messages sent by other fibers on a channel, or waiting for the value of a dataflow-variable). For long-running computations that rarely block, traditional threads are preferable. Fortunately, as we shall see, fibers and threads interoperate very well.

2 Suspendable方法

Fiber中的run方法,如SuspendableRunnable 和 SuspendableCallable声明了SuspendExecution异常。这并不是一个真的异常,而是fiber内部工作的机制。任何运行在fiber中的可能阻塞的方法,如果声明了这个异常,就被叫做 suspendable 方法。 如果你的方法调用了一个suspendable方法,那么你的方法也是suspendable方法,所以也需要声明抛出SuspendExecution异常。

有时候不能在某个方法上声明抛出SuspendExecution异常,比如你实现某个接口,你不能更改接口的方法声明,你不得不使用其它的方法来指定suspendable方法。方法之一就是使用@Suspendable注解,在你需要指定的suspendable方法上加上这个注解就可以告诉Quasar这个方法是suspendable方法。

另一个情况就是对于第三的库,你不可能更改它们的代码,如果想指定这些库的某些方法是suspendable方法,比如java.net.URL.openStream()Ljava/io/InputStream;, 就需要另外一种解决办法,也就是在META-INF/suspendablesMETA-INF/suspendable-supers定义。
文件中每个方法占一行,具体(concrete)的suspendable方法应该写在META-INF/suspendables中,non-suspendable方法,但是有suspendable override的类、接口写在META-INF/suspendable-supers中(可以是具体类单不能是final, 接口和抽象类也可以)。
每一行应该是方法的签名的全称“full.class.name.methodName” 以及*通配符。
使用`SuspendablesScanner`可以自动增加你的方法到这些文件中,待会介绍它。

java.lang包下的方法不能标记为suspendable,其它的JDK方法则可以显示地在文件META-INF/suspendablesMETA-INF/suspendable-supers中标记为suspendable,并且设置环境变量co.paralleluniverse.fibers.allowJdkInstrumentation为true,但是很少这样使用。

还有一些特殊的情况也会被认为是suspendable的。

反射调用总是被看作是suspendable的。

Java 8 lambda也总是被看作suspendable的。

构造函数/类初始化器不能被标记为suspendable

缺省情况下synchronized和blocking thread 调用不能运行在Fiber中。这是因为它们会阻塞Fiber使用的线程,导致系统处理变慢,但是如果你非要在Fiber中使用它们,可以可以将allowMonitorsallowBlocking传给instrumentation Ant task,或者将bm传给Quasar Java agent。

3 Maven配置

Quasar依赖字节码的instrumentation, instrumentation用来修改字节码。 Quasar可以在运行时或者编译时修改字节码,下面介绍这几种实现。

1、Quasar Java Agent
Quasar java agent可以在运行时动态修改字节码,将下面一行加搭配java命令行中即可,注意把path-to-quasar-jar.jar替换成你实际的quasar java的地址。

-javaagent:path-to-quasar-jar.jar

如果你使用maven的exec task,你可以使用maven-dependency-plugin为依赖设置properties,然后在插件exec-maven-plugin中引用quasar库即可。

详细配置可以参考Specifying the Java Agent with Maven:

Quasar对gradle的支持比较好,你可以方便的使用gradle配置

这是首选的一种方式,因为在某些情况下,比如你使用第三方的库,如comsat,它们只能使用这种方式配置。

2、AOT(Ahead-of-Time)
另外一种是在编译时的时候完成instrumentation。

它是通过一个Ant Task来完成的,所以对于Maven管理的项目来说,配置起来有些麻烦。

这个Ant Task是co.paralleluniverse.fibers.instrument.InstrumentationTask,包含在quasar-core.jar中。它接受一组(fileset)classes进行instrument,但并不是传给它的所有classes都需要classes进行instrument,只有suspendable方法才有可能被instrument。它还会进行优化,有些suspendable方法可能不需要instrument。

在Maven中配置起来有些复杂,如下面所示:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-antrun-plugin</artifactId>
    <executions>
        <execution>
            <id>instrument-classes</id>
            <phase>compile</phase>
            <configuration>
                <tasks>
                    <property name="ant_classpath" refid="maven.dependency.classpath"/>
                    <taskdef name="instrumentationTask"
                             classname="co.paralleluniverse.fibers.instrument.InstrumentationTask"
                             classpath="${co.paralleluniverse:quasar-core:jar:jdk8}"/>
                    <instrumentationTask allowMonitors="true" allowBlocking="true" check="true" verbose="true" debug="true">
                        <fileset dir="${project.build.directory}/classes/" includes="**/*"/>
                    </instrumentationTask>
                </tasks>
            </configuration>
            <goals>
                <goal>run</goal>
            </goals>
        </execution>
    </executions>
</plugin>
<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>2.5.1</version>
    <executions>
        <execution>
            <id>getClasspathFilenames</id>
            <goals>
                <goal>properties</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Quasar官方并没有提供一个maven插件,好心的社区倒是提供了一个quasar-maven-plugin。所以你可以不用上面的写法,而是用下面简单的写法:

<plugin>
    <groupId>com.vlkan</groupId>
    <artifactId>quasar-maven-plugin</artifactId>
    <version>0.7.3</version>
    <configuration>
        <check>true</check>
        <debug>true</debug>
        <verbose>true</verbose>
    </configuration>
    <executions>
        <execution>
            <phase>compile</phase>
            <goals>
                <goal>instrument</goal>
            </goals>
        </execution>
    </executions>
</plugin>

3、在Web容器中
如果你使用web容器使用基于Quasar的库comsat等,比如Tomcat,则比较棘手。因为你不太像将Quasar java agent直接加到tomcat的启动脚本中,这样会instrument所有的应用,导致很多的警告。

Comsat提供了Tomcat和Jetty的解决方案。

Tomcat
对于tomcat,你可以把comsat-tomcat-loader-0.7.0-jdk8.jar或者comsat-tomcat-loader-0.7.0.jar加入到tomcat的common/lib或者lib中,然后在你的web应用META-INF/context.xml中加入:

<Loader loaderClass="co.paralleluniverse.comsat.tomcat.QuasarWebAppClassLoader" />

Jetty
如果使用Jetty,则把comsat-jetty-loader-0.7.0-jdk8.jar或者comsat-jetty-loader-0.7.0.jar加入到Jetty的lib中,然后在你的context.xml中加入<Set name="classLoader">:

<Configure id="ctx" class="org.eclipse.jetty.webapp.WebAppContext">
    <Set name="war">./build/wars/dep.war</Set>
    <!--use custom classloader in order to instrument classes by quasar-->
    <Set name="classLoader">
        <New class="co.paralleluniverse.comsat.jetty.QuasarWebAppClassLoader">
            <Arg>
                <Ref id="ctx"/>
            </Arg>
        </New>
    </Set>
</Configure>

总之,通过实现一个定制的ClassLoader实现instrumentation。

4 Auto Suspendables Detection

quasar提供了一个ant task,可以实现自动侦测suspendable方法,并可以把它们写入到`META-INF/suspendablesMETA-INF/suspendable-supers`。

但是官方并没有详细的介绍,而且也没有相应的maven插件可以使用。

我们可以看看在gradle如何使用的,我们可以把侦测结果复制到maven中使用:

apply plugin: 'java'
apply plugin: 'maven'
group = 'com.colobu.fiber'
version = '1.0'
description = """"""
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
     maven { url "http://repo.maven.apache.org/maven2" }
}
dependencies {
    compile group: 'co.paralleluniverse', name: 'quasar-core', version:'0.7.5', classifier:'jdk8'
    compile group: 'co.paralleluniverse', name: 'comsat-httpclient', version:'0.7.0'
    testCompile group: 'junit', name: 'junit', version:'4.12'
}
classes {
    doFirst {
        ant.taskdef(name: 'scanSuspendables',
                classname: 'co.paralleluniverse.fibers.instrument.SuspendablesScanner',
                classpath: "build/classes/main:build/resources/main:${configurations.runtime.asPath}")
        ant.scanSuspendables(auto: true,
                suspendablesFile: "$sourceSets.main.output.resourcesDir/META-INF/suspendables",
                supersFile: "$sourceSets.main.output.resourcesDir/META-INF/suspendable-supers",
                append: true) {
            fileset(dir: sourceSets.main.output.classesDir)
        }
    }
}

我们可以看一下官方的库comsat的一些`META-INF/suspendables`例子:

1、comsat-okhttp
/META-INF/suspendables:

com.squareup.okhttp.apache.OkApacheClient.execute

2、comsat-httpclient
/META-INF/suspendables

org.apache.http.impl.client.CloseableHttpClient.doExecute
org.apache.http.impl.client.CloseableHttpClient.execute

5 故障检测

当前quasar依赖字节码的instrumentation,所以suspendable方法必须在运行之前进行标记。

Quasar开发组和OpenJDK协作,将在JDK9中移除这个限制,将会有效地自动地实现instrumentation。

如果你忘记将一个方法标记为suspendable (throws SuspendExecution、@Suspendable或者META-INF/suspendables/META-INF/suspendable-supers),你可能会遇到一些奇怪的错误。

环境变量co.paralleluniverse.fibers.verifyInstrumentation设为true可以检查未标记的方法。但是在生产环境中不要设置它。

UnableToInstrumentException异常表明quasar不能instrument一些方法如synchronized或者阻塞的线程调用。verbose(v), debug(d) 和 check(c)可以打印出详细信息。

更多的调试可以参考:troubleshooting

6 其它

Fiber可以序列化。

Fiber也可以打印它的堆栈进行调试。

Fiber也有Actor和Channel的实现,并且可以运行在集群上。