大数据培训新三板挂牌机构 股票代码:837906 | EN CN
Java是什么?
Java历史
Java语言特点
C++ VS Java比较
Java工厂设计模式
Java抽象工厂模式
Java单例模式
Java建造者(Builder)模式
Java原型模式
Java适配器模式
Java桥接模式
Java获取网络文件大小
Java套接字到单一的客户端
Java连接套接字
Java URL部分
Java URL连接日期
Java下载网页
Java主机指定IP地址
Java确定本地IP地址
Java检查端口占用
Java查找代理服务器设置
Java创建Socket
Java线程实例
Java检查线程活着
Java如何检查一个线程停止或没有?
Java解决死锁实例
Java如何获取正在运行的线程的优先级?
Java如何监视线程的状态?
Java获取线程名称
Java线程生产者消费者问题
Java如何设置线程的优先级?
Java如何停止线程一会儿?
Java如何暂停线程?
Java获取线程ID
Java如何检查线程的优先级?
Java显示所有正在运行的线程?
Java显示线程状态
Java中断一个线程
Java Applet实例
Java创建Applet
Java使用Applet创建横幅
Java使用Applet显示时钟?
Java在一个Applet创建不同形状
Java如何使用Applet填充形状的颜色?
Java使用Applet跳转到一个链接
Java在Applet创建事件监听器
Java使用Applet显示图像
Java使用Applet在新窗口中打开链接
Java使用Applet播放声音?
Java使用Applet读取文件
Java使用Applet写入文件
Java中Swing应用程序applet
Java简单的图形用户界面-GUI
Java以不同的字体显示文本
Java使用GUI画一条线
Java创建框架-frame
Java使用GUI显示多边形
Java在矩形中显示文本
Java GUI显示不同形状
Java如何绘制GUI实心矩形?
Java创建GUI透明光标
Java检查GUI平滑处理状态
Java在框架中显示颜色
Java GUI显示饼图
Java使用图形用户界面绘制文本
Java编辑表-table
Java 使用prepared语句
Java使用保存点和回滚
Java同时执行数据库多个SQL命令
Java使用行方法
Java使用列方法
Java正则表达式实例
Java将字符串分割
Java搜索重复单词
Java查找出现的单词
Java最后一个词的索引
Java模式匹配
Java删除空格
Java匹配电话号码
Java计数组词
Java搜索词组
Java拆分正则表达式
Java替换第一个出现字符串
Java检查日期格式
Java验证电子邮件地址格式
Java替换所有匹配字符串
Java使每个单词的第一个字母大写
从XML创建SqlSessionFactory实例
不使用XML来创建SqlSessionFactory
从SqlSessionFactory获取SqlSession
映射SQL语句
作用域和生命周期
Mapper XML配置
properties元素
Settings元素
typeAliases 元素
typeHandlers元素
理解CacheLine与写出更好的JAVA
Java核心技术点之动态代理
更好的使用JAVA线程池
理解Java中字符流与字节流的区别
深入分析Java方法反射的实现原理
关于Java面试,你应该准备这些知识点
Java内存模型
2017年你不能错过的Java类库
Leakcanary Square的一款Android/Java内存泄漏检测工具
Java Synchronised机制
Java核心技术点之注解
JVM(8):JVM知识点总览-高级Java工程师面试必备
JVM(3):Java GC算法 垃圾收集器
JVM(1):Java 类的加载机制
解决ActiveMQ中,Java与C++交互中文乱码问题
关于Java Collections的几个常见问题
Java I/O 总结
JVM源码分析之Java对象的创建过程
JVM源码分析之Java类的加载过程
Java GC的那些事(下)
Java GC的那些事(上)
java对象头的HotSpot实现分析
面试的角度诠释Java工程师(一)
面试的角度诠释Java工程师(二)
框架开发之Java注解的妙用
谈谈Java反射机制
Java并发:volatile内存可见性和指令重排
死磕Java并发:Java内存模型之happens-before
死磕Java并发:深入分析volatile的实现原理
死磕Java并发:深入分析synchronized的实现原理
Java 10 可能对 Lambda 表达式进行升级
G1垃圾回收器中的字符串去重(Java 8 Update 20)
Java RESTful框架的性能比较
理解RxJava的线程模型
继续了解Java的纤程库 – Quasar
Java中的纤程库 – Quasar
Java豆瓣电影爬虫——抓取电影详情和电影短评数据
Java集合框架源码剖析:LinkedHashSet 和 LinkedHashMap
Java Lambda表达式初探
Java中的陷阱题
Java 9的这一基本功能,你可能从未听过
关于Java并发编程的总结和思考
几种简单的负载均衡算法及其Java代码实现
JAVA虚拟机关闭钩子(Shutdown Hook)
Java 脚本化编程指南
Java Scripting API 使用示例
Java 8 的 Nashorn 脚本引擎教程
如何开始使用 Java 机器学习
CognitiveJ —— Java 的图像分析库
Java 性能优化的五大技巧
Java 解惑:Comparable 和 Comparator 的区别
Google Java编程风格指南
java NIO详解
Java 异常处理的误区和经验总结
Java语法糖(4):内部类
Java语法糖(3):泛型
Java语法糖(2):自动装箱和自动拆箱
Java消息队列任务的平滑关闭
Java语法糖(1):可变长度参数以及foreach循环原理
2016最流行的Java EE服务器
自己写一个java.lang.reflect.Proxy代理的实现
java 如何在pdf中生成表格
如何防止单例模式被JAVA反射攻击
java虚拟机 jvm 局部变量表实战
聊聊并发-Java中的Copy-On-Write容器
java.lang.Instrument 代理Agent使用
Java开发者需要了解的移动开发编程语言
13个不容错过的Java项目
2016年7款最佳 Java 框架推荐
Java 开发者值得关注的 11 个技术博客
Redmonk发布Java框架流行度调研结果
Java 8开发的4大顶级技巧
GitHub漫游指南:10个值得你关注的Java项目
除了Guava,Java开发者还值得了解的5个谷歌类库
Java中创建对象的5种不同方法
Java性能优化全攻略
奇怪的Java题:为什么1000 == 1000返回为False,而100 == 100会返回为True?
11个最值得Java开发者收藏的网站
Java的常见误区与细节
对Java意义重大的7个性能指标
Java调优经验谈
关于Java并发编程的总结和思考
HDFS Federation设计动机与基本原理
《Effective STL》学习笔记(第三部分)
《Effective STL》学习笔记(第二部分)
《Effective STL》学习笔记(第一部分)
数据结构之位图
Thrift使用指南
Cassandra概要介绍
Cassandra部署与安装
Cassandra客户端
Cassandra数据模型
Cassandra中的各种策略
数据结构之树状数组
数据结构之伸展树
数据结构之后缀数组
数据结构之堆
浅析MRv1与MRv2的API兼容性
Apache Tez最新进展
运行在YARN上的计算框架
从传统操作系统角度理解Hadoop YARN

Java消息队列任务的平滑关闭

于2017-05-10由小牛君创建

分享到:


1.问题背景

对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。

那么问题来了,当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?

正常来说,订阅者程序关闭后,消息会在发送者队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。唯一可能丢失的消息,就是在关闭的一瞬间,已经从队列中取出但还没有处理完毕的消息。

因此我们需要一套平滑关闭的机制,保证在重启的时候,消息可以正常处理完成。

2.问题分析

平滑关闭的思路如下:

  1. 在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中
  2. 关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)
  3. 程序退出

关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api

关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(),方法判断线程池是否已经关闭.

那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?

在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l查看kill 命令的其它信号量,比如使用 12) SIGUSR2 信号量

我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。

伪代码如下

 //注册linux kill信号量  kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
    @Override
    public void handle(Signal signal) {
        //关闭订阅者
        //关闭线程池
        //退出
    }
});

下面通过一个demo模拟相关逻辑操作

首先模拟一个生产者,每秒生产5个消息

然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。

package com.lujianing.demo;

import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */
public class MsgClient {

    //模拟消息队列订阅者 同时4个线程处理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
    //模拟消息队列生产者
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
    //用于判断是否关闭订阅
    private static volatile boolean isClose = false;

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }

    //模拟消息队列生产者
    private static void producer(final BlockingQueue  queue){
        //每200毫秒向队列中放入一个消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }

    //模拟消息队列消费者 生产者每秒生产5个   消费者4个线程消费1个1秒  每秒积压1个
    private static void consumer(final BlockingQueue queue) throws InterruptedException {
        while (!isClose){
            getPoolBacklogSize();
            //从队列中拿到消息
            final String msg = (String)queue.take();
            //放入线程池处理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {
                    public void run() {
                        try {
                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }

    //查看线程池堆积消息个数
    private static long getPoolBacklogSize(){
        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
        return backlog;
    }

    static {
        String osName = System.getProperty("os.name").toLowerCase();
        if(osName != null && osName.indexOf("window") == -1) {
            //注册linux kill信号量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {
                @Override
                public void handle(Signal signal) {
                    System.out.println("收到kill消息,执行关闭操作");
                    //关闭订阅消费
                    isClose = true;
                    //关闭线程池,等待线程池积压消息处理
                    THREAD_POOL.shutdown();
                    //判断线程池是否关闭
                    while (!THREAD_POOL.isTerminated()) {
                        try {
                            //每200毫秒 判断线程池积压数量
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("订阅者关闭,线程池处理完毕");
                    System.exit(0);
                }
            });
        }
    }
}

当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

输入图片说明

另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id

ps -fe|grep MsgClient

输入图片说明

当我们执行kill -12 pid的时候 可以看到关闭业务逻辑

平滑关闭

3.问题总结

在部门的实际业务中,消息队列的消息量还是挺大的,某些业务高峰时每秒有几百的消息量,因此对消息的处理要保证速度,避免消息积压,也可以通过负载解决单个订阅节点的压力。

在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。

ps:以后会恢复写博客的习惯,争取每周都更新,如有兴趣,欢迎关注


2016-11-17补充

ThreadPoolExecutorgetQueue().size()方法,返回的为线程池队列中积压的消息数

getTaskCount() - getCompletedTaskCount(),返回的为线程池队列积压的和正在处理中的消息数


2016-11-19补充

感谢 @纳兰清风 的提醒

Java中可以通过调用Runtime.getRuntime().addShutdownHook()方法,在Jvm退出时触发回调

kill -15 pid 可以触发调用对应的钩子方法

今天看重启脚本中确实也有 kill -15的命令 但是不知道两者是有关系的


2016-11-20补充

kill -1 pid 和 kill -2 pid 都能触发钩子方法

SIGTERM,SIGINT,SIGHUP三种信号都会触发shutdownhook