- 浏览: 375753 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
Nabulio:
写的详细,特殊语法学习到了
jdk1.5-1.9新特性 -
wooddawn:
您好,最近在做个足球数据库系统,用到了betbrain的数据表 ...
javascript深入理解js闭包 -
lwpan:
很受启发 update也可以
mysql 的delete from 子查询限制 -
wuliaolll:
不错,总算找到原因了
mysql 的delete from 子查询限制
这个故事源自一个很简单的想法:创建一个对开发人员友好的、简单轻量的线程间通讯框架,完全不用锁、同步器、信号量、等待和通知,在Java里开发一个轻量、无锁的线程内通讯框架;并且也没有队列、消息、事件或任何其他并发专用的术语或工具。
只用普通的老式Java接口实现POJO的通讯。
它可能跟Akka的类型化actor类似,但作为一个必须超级轻量,并且要针对单台多核计算机进行优化的新框架,那个可能有点过了。
相关厂商内容
高德地图街景API发布,开放全部街景数据及功能(FREE)
《婚恋交友中的推荐系统应用》——世纪佳缘研发中心总监吴金龙确认QCon分享
《你应该更新的Java知识》——JavaOne Duke大奖得主郑晔北京QCon分享话题确认
2013~2014年度InfoQ读者深度调查火热进行中
《PM2.5的大数据分析》—— 英特尔中国研究院首席架构师姜小凡确认QCon分享
当actor跨越不同JVM实例(在同一台机器上,或分布在网络上的不同机器上)的进程边界时,Akka框架很善于处理进程间的通讯。
但对于那种只需要线程间通讯的小型项目而言,用Akka类型化actor可能有点儿像用牛刀杀鸡,不过类型化actor仍然是一种理想的实现方式。
我花了几天时间,用动态代理,阻塞队列和缓存线程池创建了一个解决方案。
图一是这个框架的高层次架构:
图一: 框架的高层次架构
SPSC队列是指单一生产者/单一消费者队列。MPSC队列是指多生产者/单一消费者队列。
派发线程负责接收Actor线程发送的消息,并把它们派发到对应的SPSC队列中去。
接收到消息的Actor线程用其中的数据调用相应的actor实例中的方法。借助其他actor的代理,actor实例可以将消息发送到MPSC队列中,然后消息会被发送给目标actor线程。
我创建了一个简单的例子来测试,就是下面这个打乒乓球的程序:
public interface PlayerA (
void pong(long ball); //发完就忘的方法调用
}
public interface PlayerB {
void ping(PlayerA playerA, long ball); //发完就忘的方法调用
}
public class PlayerAImpl implements PlayerA {
@Override
public void pong(long ball) {
}
}
public class PlayerBImpl implements PlayerB {
@Override
public void ping(PlayerA playerA, long ball) {
playerA.pong(ball);
}
}
public class PingPongExample {
public void testPingPong() {
// 管理器隐藏了线程间通讯的复杂性
// 控制actor代理,actor实现和线程
ActorManager manager = new ActorManager();
// 在管理器内注册actor实现
manager.registerImpl(PlayerAImpl.class);
manager.registerImpl(PlayerBImpl.class);
//创建actor代理。代理会将方法调用转换成内部消息。
//会在线程间发给特定的actor实例。
PlayerA playerA = manager.createActor(PlayerA.class);
PlayerB playerB = manager.createActor(PlayerB.class);
for(int i = 0; i < 1000000; i++) {
playerB.ping(playerA, i);
}
}
经过测试,速度大约在每秒500,000 次乒/乓左右;还不错吧。然而跟单线程的运行速度比起来,我突然就感觉没那么好了。在 单线程 中运行的代码每秒速度能达到20亿 (2,681,850,373)!
居然差了5,000 多倍。太让我失望了。在大多数情况下,单线程代码的效果都比多线程代码更高效。
我开始找原因,想看看我的乒乓球运动员们为什么这么慢。经过一番调研和测试,我发现是阻塞队列的问题,我用来在actor间传递消息的队列影响了性能。
图 2: 只有一个生产者和一个消费者的SPSC队列
所以我发起了一场竞赛,要将它换成Java里最快的队列。我发现了Nitsan Wakart的 博客 。他发了几篇文章介绍单一生产者/单一消费者(SPSC)无锁队列的实现。这些文章受到了Martin Thompson的演讲 终极性能的无锁算法的启发。
跟基于私有锁的队列相比,无锁队列的性能更优。在基于锁的队列中,当一个线程得到锁时,其它线程就要等着锁被释放。而在无锁的算法中,某个生产者线程生产消息时不会阻塞其它生产者线程,消费者也不会被其它读取队列的消费者阻塞。
在Martin Thompson的演讲以及在Nitsan的博客中介绍的SPSC队列的性能简直令人难以置信—— 超过了100M ops/sec。比JDK的并发队列实现还要快10倍 (在4核的 Intel Core i7 上的性能大约在 8M ops/sec 左右)。
我怀着极大的期望,将所有actor上连接的链式阻塞队列都换成了无锁的SPSC队列。可惜,在吞吐量上的性能测试并没有像我预期的那样出现大幅提升。不过很快我就意识到,瓶颈并不在SPSC队列上,而是在多个生产者/单一消费者(MPSC)那里。
用SPSC队列做MPSC队列的任务并不那么简单;在做put操作时,多个生产者可能会覆盖掉彼此的值。SPSC 队列就没有控制多个生产者put操作的代码。所以即便换成最快的SPSC队列,也解决不了我的问题。
为了处理多个生产者/单一消费者的情况,我决定启用LMAX Disruptor ——一个基于环形缓冲区的高性能进程间消息库。
图3: 单一生产者和单一消费者的LMAX Disruptor
借助Disruptor,很容易实现低延迟、高吞吐量的线程间消息通讯。它还为生产者和消费者的不同组合提供了不同的用例。几个线程可以互不阻塞地读取环形缓冲中的消息:
图 4: 单一生产者和两个消费者的LMAX Disruptor
下面是有多个生产者写入环形缓冲区,多个消费者从中读取消息的场景。
图 5: 两个生产者和两个消费者的LMAX Disruptor
经过对性能测试的快速搜索,我找到了 三个发布者和一个消费者的吞吐量测试。 这个真是正合我意,它给出了下面这个结果:
LinkedBlockingQueue
Disruptor
Run 0
4,550,625 ops/sec
11,487,650 ops/sec
Run 1
4,651,162 ops/sec
11,049,723 ops/sec
Run 2
4,404,316 ops/sec
11,142,061 ops/sec
在3 个生产者/1个 消费者场景下, Disruptor要比LinkedBlockingQueue快两倍多。然而这跟我所期望的性能上提升10倍仍有很大差距。
这让我觉得很沮丧,并且我的大脑一直在搜寻解决方案。就像命中注定一样,我最近不在跟人拼车上下班,而是改乘地铁了。突然灵光一闪,我的大脑开始将车站跟生产者消费者对应起来。在一个车站里,既有生产者(车和下车的人),也有消费者(同一辆车和上车的人)。
我创建了 Railway类,并用AtomicLong追踪从一站到下一站的列车。我先从简单的场景开始,只有一辆车的铁轨。
public class RailWay {
private final Train train = new Train();
// stationNo追踪列车并定义哪个车站接收到了列车
private final AtomicInteger stationIndex = new AtomicInteger();
// 会有多个线程访问这个方法,并等待特定车站上的列车
public Train waitTrainOnStation(final int stationNo) {
while (stationIndex.get() % stationCount != stationNo) {
Thread.yield(); // 为保证高吞吐量的消息传递,这个是必须的。
//但在等待列车时它会消耗CPU周期
}
// 只有站号等于stationIndex.get() % stationCount时,这个忙循环才会返回
return train;
}
// 这个方法通过增加列车的站点索引将这辆列车移到下一站
public void sendTrain() {
stationIndex.getAndIncrement();
}
}
为了测试,我用的条件跟在Disruptor性能测试中用的一样,并且也是测的SPSC队列——测试在线程间传递long值。我创建了下面这个Train类,其中包含了一个long数组:
public class Train {
//
public static int CAPACITY = 2*1024;
private final long[] goodsArray; // 传输运输货物的数组
private int index;
public Train() {
goodsArray = new long[CAPACITY];
}
public int goodsCount() { //返回货物数量
return index;
}
public void addGoods(long i) { // 向列车中添加条目
goodsArray[index++] = i;
}
public long getGoods(int i) { //从列车中移走条目
index--;
return goodsArray[i];
}
}
然后我写了一个简单的测试 :两个线程通过列车互相传递long值。
图 6: 使用单辆列车的单一生产者和单一消费者Railway
public void testRailWay() {
final Railway railway = new Railway();
final long n = 20000000000l;
//启动一个消费者进程
new Thread() {
long lastValue = 0;
@Override
public void run() {
while (lastValue < n) {
Train train = railway.waitTrainOnStation(1); //在#1站等列车
int count = train.goodsCount();
for (int i = 0; i < count; i++) {
lastValue = train.getGoods(i); // 卸货
}
railway.sendTrain(); //将当前列车送到第一站
}
}
}.start();
final long start = System.nanoTime();
long i = 0;
while (i < n) {
Train train = railway.waitTrainOnStation(0); // 在#0站等列车
int capacity = train.getCapacity();
for (int j = 0; j < capacity; j++) {
train.addGoods((int)i++); // 将货物装到列车上
}
railway.sendTrain();
if (i % 100000000 == 0) { //每隔100M个条目测量一次性能
final long duration = System.nanoTime() - start;
final long ops = (i * 1000L * 1000L * 1000L) / duration;
System.out.format("ops/sec = %,d\n", ops);
System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);
System.out.format("latency nanos = %.3f%n\n",
duration / (float)(i) * (float)Train.CAPACITY);
}
}
}
在不同的列车容量下运行这个测试,结果惊着我了:
容量
吞吐量: ops/sec
延迟: ns
1
5,190,883
192.6
2
10,282,820
194.5
32
104,878,614
305.1
256
344,614,640
742. 9
2048
608,112,493
3,367.8
32768
767,028,751
42,720.7
在列车容量达到32,768时,两个线程传送消息的吞吐量达到了767,028,751 ops/sec。比Nitsan博客中的SPSC队列快了几倍。
继续按铁路列车这个思路思考,我想知道如果有两辆列车会怎么样?我觉得应该能提高吞吐量,同时还能降低延迟。每个车站都会有它自己的列车。当一辆列车在第一个车站装货时,第二辆列车会在第二个车站卸货,反之亦然。
图 7: 使用两辆列车的单一生产者和单一消费者Railway
下面是吞吐量的结果:
容量
吞吐量: ops/sec
延时: ns
1
7,492,684
133.5
2
14,754,786
135.5
32
174,227,656
183.7
256
613,555,475
417.2
2048
940,144,900
2,178.4
32768
797,806,764
41,072.6
结果是惊人的;比单辆列车的结果快了1.4倍多。列车容量为一时,延迟从192.6纳秒降低到133.5纳秒;这显然是一个令人鼓舞的迹象。
因此我的实验还没结束。列车容量为2048的两个线程传递消息的延迟为2,178.4 纳秒,这太高了。我在想如何降低它,创建一个有很多辆列车 的例子:
图 8: 使用多辆列车的单一生产者和单一消费者Railway
我还把列车容量降到了1个long值,开始玩起了列车数量。下面是测试结果:
列车数量
吞吐量: ops/sec
延迟: ns
2
10,917,951
91.6
32
31,233,310
32.0
256
42,791,962
23.4
1024
53,220,057
18.8
32768
71,812,166
13.9
用32,768 列车在线程间发送一个long值的延迟降低到了13.9 纳秒。通过调整列车数量和列车容量,当延时不那么高,吞吐量不那么低时,吞吐量和延时就达到了最佳平衡。
对于单一生产者和单一消费者(SPSC)而言,这些数值很棒;但我们怎么让它在有多个生产者和消费者时也能生效呢?答案很简单,添加更多的车站!
图 9:一个生产者和两个消费者的Railway
每个线程都等着下一趟列车,装货/卸货,然后把列车送到下一站。在生产者往列车上装货时,消费者在从列车上卸货。列车周而复始地从一个车站转到另一个车站。
为了测试单一生产者/多消费者(SPMC) 的情况,我创建了一个有8个车站的Railway测试。 一个车站属于一个生产者,而另外7个车站属于消费者。结果是:
列车数量 = 256 ,列车容量 = 32:
ops/sec = 116,604,397 延迟(纳秒) = 274.4
列车数量= 32,列车容量= 256:
ops/sec = 432,055,469 延迟(纳秒) = 592.5
如你所见,即便有8个工作线程,测试给出的结果也相当好-- 32辆容量为256个long的列车吞吐量为432,055,469 ops/sec。在测试期间,所有CPU内核的负载都是100%。
图 10:在测试有8个车站的Railway 期间的CPU 使用情况
在玩这个Railway算法时,我几乎忘了我最初的目标:提升多生产者/单消费者情况下的性能。
图 11:三个生产者和一个消费者的 Railway
我创建了3个生产者和1个消费者的新测试。每辆列车一站一站地转圈,而每个生产者只给每辆车装1/3容量的货。消费者取出每辆车上三个生产者给出的全部三项货物。性能测试给出的平均结果如下所示:
ops/sec = 162,597,109 列车/秒 = 54,199,036 延迟(纳秒) = 18.5
结果相当棒。生产者和消费者工作的速度超过了160M ops/sec。
为了填补差异,下面给出相同情况下的Disruptor结果- 3个生产者和1个消费者:
Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec
下面是另一个批量消息的Disruptor 3P:1C 测试 (10 条消息每批):
Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec;
最后是用带LinkedBlockingQueue 实现的Disruptor 在3P:1C场景下的测试结果:
Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec
如你所见,Railway方式的平均吞吐量是162,597,109 ops/sec,而Disruptor在同样的情况下的最好结果只有128,205,128 ops/sec。至于 LinkedBlockingQueue,最好的结果只有4,546,281 ops/sec。
Railway算法为事件批处理提供了一种可以显著增加吞吐量的简易办法。通过调整列车容量或列车数量,很容易达成想要的吞吐量/延迟。
另外, 当同一个线程可以用来消费消息,处理它们并向环中返回结果时,通过混合生产者和消费者,Railway也能用来处理复杂的情况:
图 12: 混合生产者和消费者的Railway
最后,我会提供一个经过优化的超高吞吐量 单生产者/单消费者测试:
图 13:单个生产者和单个消费者的Railway
它的平均结果为:吞吐量超过每秒15亿 (1,569,884,271)次操作,延迟为1.3 微秒。如你所见,本文开头描述的那个规模相同的单线程测试的结果是每秒2,681,850,373。
你自己想想结论是什么吧。
我希望将来再写一篇文章,阐明如何用Queue和 BlockingQueue接口支持Railway算法,用来处理不同的生产者和消费者组合。敬请关注。
关于作者
Aliaksei Papou 是Specific集团的首席软件工程师和架构师,那是一家位于奥地利维也纳的软件开发公司。Aliaksei有超过10年的小型和大型企业应用软件开发经验。他有一个坚定的信念:编写并发代码不应该这么难。
原文英文链接:Inter-thread communications in Java at the speed of light
感谢侯伯薇对本文的审校。
给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ)或者腾讯微博(@InfoQ)关注我们,并与我们的编辑和其他读者朋友交流。
只用普通的老式Java接口实现POJO的通讯。
它可能跟Akka的类型化actor类似,但作为一个必须超级轻量,并且要针对单台多核计算机进行优化的新框架,那个可能有点过了。
相关厂商内容
高德地图街景API发布,开放全部街景数据及功能(FREE)
《婚恋交友中的推荐系统应用》——世纪佳缘研发中心总监吴金龙确认QCon分享
《你应该更新的Java知识》——JavaOne Duke大奖得主郑晔北京QCon分享话题确认
2013~2014年度InfoQ读者深度调查火热进行中
《PM2.5的大数据分析》—— 英特尔中国研究院首席架构师姜小凡确认QCon分享
当actor跨越不同JVM实例(在同一台机器上,或分布在网络上的不同机器上)的进程边界时,Akka框架很善于处理进程间的通讯。
但对于那种只需要线程间通讯的小型项目而言,用Akka类型化actor可能有点儿像用牛刀杀鸡,不过类型化actor仍然是一种理想的实现方式。
我花了几天时间,用动态代理,阻塞队列和缓存线程池创建了一个解决方案。
图一是这个框架的高层次架构:
图一: 框架的高层次架构
SPSC队列是指单一生产者/单一消费者队列。MPSC队列是指多生产者/单一消费者队列。
派发线程负责接收Actor线程发送的消息,并把它们派发到对应的SPSC队列中去。
接收到消息的Actor线程用其中的数据调用相应的actor实例中的方法。借助其他actor的代理,actor实例可以将消息发送到MPSC队列中,然后消息会被发送给目标actor线程。
我创建了一个简单的例子来测试,就是下面这个打乒乓球的程序:
public interface PlayerA (
void pong(long ball); //发完就忘的方法调用
}
public interface PlayerB {
void ping(PlayerA playerA, long ball); //发完就忘的方法调用
}
public class PlayerAImpl implements PlayerA {
@Override
public void pong(long ball) {
}
}
public class PlayerBImpl implements PlayerB {
@Override
public void ping(PlayerA playerA, long ball) {
playerA.pong(ball);
}
}
public class PingPongExample {
public void testPingPong() {
// 管理器隐藏了线程间通讯的复杂性
// 控制actor代理,actor实现和线程
ActorManager manager = new ActorManager();
// 在管理器内注册actor实现
manager.registerImpl(PlayerAImpl.class);
manager.registerImpl(PlayerBImpl.class);
//创建actor代理。代理会将方法调用转换成内部消息。
//会在线程间发给特定的actor实例。
PlayerA playerA = manager.createActor(PlayerA.class);
PlayerB playerB = manager.createActor(PlayerB.class);
for(int i = 0; i < 1000000; i++) {
playerB.ping(playerA, i);
}
}
经过测试,速度大约在每秒500,000 次乒/乓左右;还不错吧。然而跟单线程的运行速度比起来,我突然就感觉没那么好了。在 单线程 中运行的代码每秒速度能达到20亿 (2,681,850,373)!
居然差了5,000 多倍。太让我失望了。在大多数情况下,单线程代码的效果都比多线程代码更高效。
我开始找原因,想看看我的乒乓球运动员们为什么这么慢。经过一番调研和测试,我发现是阻塞队列的问题,我用来在actor间传递消息的队列影响了性能。
图 2: 只有一个生产者和一个消费者的SPSC队列
所以我发起了一场竞赛,要将它换成Java里最快的队列。我发现了Nitsan Wakart的 博客 。他发了几篇文章介绍单一生产者/单一消费者(SPSC)无锁队列的实现。这些文章受到了Martin Thompson的演讲 终极性能的无锁算法的启发。
跟基于私有锁的队列相比,无锁队列的性能更优。在基于锁的队列中,当一个线程得到锁时,其它线程就要等着锁被释放。而在无锁的算法中,某个生产者线程生产消息时不会阻塞其它生产者线程,消费者也不会被其它读取队列的消费者阻塞。
在Martin Thompson的演讲以及在Nitsan的博客中介绍的SPSC队列的性能简直令人难以置信—— 超过了100M ops/sec。比JDK的并发队列实现还要快10倍 (在4核的 Intel Core i7 上的性能大约在 8M ops/sec 左右)。
我怀着极大的期望,将所有actor上连接的链式阻塞队列都换成了无锁的SPSC队列。可惜,在吞吐量上的性能测试并没有像我预期的那样出现大幅提升。不过很快我就意识到,瓶颈并不在SPSC队列上,而是在多个生产者/单一消费者(MPSC)那里。
用SPSC队列做MPSC队列的任务并不那么简单;在做put操作时,多个生产者可能会覆盖掉彼此的值。SPSC 队列就没有控制多个生产者put操作的代码。所以即便换成最快的SPSC队列,也解决不了我的问题。
为了处理多个生产者/单一消费者的情况,我决定启用LMAX Disruptor ——一个基于环形缓冲区的高性能进程间消息库。
图3: 单一生产者和单一消费者的LMAX Disruptor
借助Disruptor,很容易实现低延迟、高吞吐量的线程间消息通讯。它还为生产者和消费者的不同组合提供了不同的用例。几个线程可以互不阻塞地读取环形缓冲中的消息:
图 4: 单一生产者和两个消费者的LMAX Disruptor
下面是有多个生产者写入环形缓冲区,多个消费者从中读取消息的场景。
图 5: 两个生产者和两个消费者的LMAX Disruptor
经过对性能测试的快速搜索,我找到了 三个发布者和一个消费者的吞吐量测试。 这个真是正合我意,它给出了下面这个结果:
LinkedBlockingQueue
Disruptor
Run 0
4,550,625 ops/sec
11,487,650 ops/sec
Run 1
4,651,162 ops/sec
11,049,723 ops/sec
Run 2
4,404,316 ops/sec
11,142,061 ops/sec
在3 个生产者/1个 消费者场景下, Disruptor要比LinkedBlockingQueue快两倍多。然而这跟我所期望的性能上提升10倍仍有很大差距。
这让我觉得很沮丧,并且我的大脑一直在搜寻解决方案。就像命中注定一样,我最近不在跟人拼车上下班,而是改乘地铁了。突然灵光一闪,我的大脑开始将车站跟生产者消费者对应起来。在一个车站里,既有生产者(车和下车的人),也有消费者(同一辆车和上车的人)。
我创建了 Railway类,并用AtomicLong追踪从一站到下一站的列车。我先从简单的场景开始,只有一辆车的铁轨。
public class RailWay {
private final Train train = new Train();
// stationNo追踪列车并定义哪个车站接收到了列车
private final AtomicInteger stationIndex = new AtomicInteger();
// 会有多个线程访问这个方法,并等待特定车站上的列车
public Train waitTrainOnStation(final int stationNo) {
while (stationIndex.get() % stationCount != stationNo) {
Thread.yield(); // 为保证高吞吐量的消息传递,这个是必须的。
//但在等待列车时它会消耗CPU周期
}
// 只有站号等于stationIndex.get() % stationCount时,这个忙循环才会返回
return train;
}
// 这个方法通过增加列车的站点索引将这辆列车移到下一站
public void sendTrain() {
stationIndex.getAndIncrement();
}
}
为了测试,我用的条件跟在Disruptor性能测试中用的一样,并且也是测的SPSC队列——测试在线程间传递long值。我创建了下面这个Train类,其中包含了一个long数组:
public class Train {
//
public static int CAPACITY = 2*1024;
private final long[] goodsArray; // 传输运输货物的数组
private int index;
public Train() {
goodsArray = new long[CAPACITY];
}
public int goodsCount() { //返回货物数量
return index;
}
public void addGoods(long i) { // 向列车中添加条目
goodsArray[index++] = i;
}
public long getGoods(int i) { //从列车中移走条目
index--;
return goodsArray[i];
}
}
然后我写了一个简单的测试 :两个线程通过列车互相传递long值。
图 6: 使用单辆列车的单一生产者和单一消费者Railway
public void testRailWay() {
final Railway railway = new Railway();
final long n = 20000000000l;
//启动一个消费者进程
new Thread() {
long lastValue = 0;
@Override
public void run() {
while (lastValue < n) {
Train train = railway.waitTrainOnStation(1); //在#1站等列车
int count = train.goodsCount();
for (int i = 0; i < count; i++) {
lastValue = train.getGoods(i); // 卸货
}
railway.sendTrain(); //将当前列车送到第一站
}
}
}.start();
final long start = System.nanoTime();
long i = 0;
while (i < n) {
Train train = railway.waitTrainOnStation(0); // 在#0站等列车
int capacity = train.getCapacity();
for (int j = 0; j < capacity; j++) {
train.addGoods((int)i++); // 将货物装到列车上
}
railway.sendTrain();
if (i % 100000000 == 0) { //每隔100M个条目测量一次性能
final long duration = System.nanoTime() - start;
final long ops = (i * 1000L * 1000L * 1000L) / duration;
System.out.format("ops/sec = %,d\n", ops);
System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);
System.out.format("latency nanos = %.3f%n\n",
duration / (float)(i) * (float)Train.CAPACITY);
}
}
}
在不同的列车容量下运行这个测试,结果惊着我了:
容量
吞吐量: ops/sec
延迟: ns
1
5,190,883
192.6
2
10,282,820
194.5
32
104,878,614
305.1
256
344,614,640
742. 9
2048
608,112,493
3,367.8
32768
767,028,751
42,720.7
在列车容量达到32,768时,两个线程传送消息的吞吐量达到了767,028,751 ops/sec。比Nitsan博客中的SPSC队列快了几倍。
继续按铁路列车这个思路思考,我想知道如果有两辆列车会怎么样?我觉得应该能提高吞吐量,同时还能降低延迟。每个车站都会有它自己的列车。当一辆列车在第一个车站装货时,第二辆列车会在第二个车站卸货,反之亦然。
图 7: 使用两辆列车的单一生产者和单一消费者Railway
下面是吞吐量的结果:
容量
吞吐量: ops/sec
延时: ns
1
7,492,684
133.5
2
14,754,786
135.5
32
174,227,656
183.7
256
613,555,475
417.2
2048
940,144,900
2,178.4
32768
797,806,764
41,072.6
结果是惊人的;比单辆列车的结果快了1.4倍多。列车容量为一时,延迟从192.6纳秒降低到133.5纳秒;这显然是一个令人鼓舞的迹象。
因此我的实验还没结束。列车容量为2048的两个线程传递消息的延迟为2,178.4 纳秒,这太高了。我在想如何降低它,创建一个有很多辆列车 的例子:
图 8: 使用多辆列车的单一生产者和单一消费者Railway
我还把列车容量降到了1个long值,开始玩起了列车数量。下面是测试结果:
列车数量
吞吐量: ops/sec
延迟: ns
2
10,917,951
91.6
32
31,233,310
32.0
256
42,791,962
23.4
1024
53,220,057
18.8
32768
71,812,166
13.9
用32,768 列车在线程间发送一个long值的延迟降低到了13.9 纳秒。通过调整列车数量和列车容量,当延时不那么高,吞吐量不那么低时,吞吐量和延时就达到了最佳平衡。
对于单一生产者和单一消费者(SPSC)而言,这些数值很棒;但我们怎么让它在有多个生产者和消费者时也能生效呢?答案很简单,添加更多的车站!
图 9:一个生产者和两个消费者的Railway
每个线程都等着下一趟列车,装货/卸货,然后把列车送到下一站。在生产者往列车上装货时,消费者在从列车上卸货。列车周而复始地从一个车站转到另一个车站。
为了测试单一生产者/多消费者(SPMC) 的情况,我创建了一个有8个车站的Railway测试。 一个车站属于一个生产者,而另外7个车站属于消费者。结果是:
列车数量 = 256 ,列车容量 = 32:
ops/sec = 116,604,397 延迟(纳秒) = 274.4
列车数量= 32,列车容量= 256:
ops/sec = 432,055,469 延迟(纳秒) = 592.5
如你所见,即便有8个工作线程,测试给出的结果也相当好-- 32辆容量为256个long的列车吞吐量为432,055,469 ops/sec。在测试期间,所有CPU内核的负载都是100%。
图 10:在测试有8个车站的Railway 期间的CPU 使用情况
在玩这个Railway算法时,我几乎忘了我最初的目标:提升多生产者/单消费者情况下的性能。
图 11:三个生产者和一个消费者的 Railway
我创建了3个生产者和1个消费者的新测试。每辆列车一站一站地转圈,而每个生产者只给每辆车装1/3容量的货。消费者取出每辆车上三个生产者给出的全部三项货物。性能测试给出的平均结果如下所示:
ops/sec = 162,597,109 列车/秒 = 54,199,036 延迟(纳秒) = 18.5
结果相当棒。生产者和消费者工作的速度超过了160M ops/sec。
为了填补差异,下面给出相同情况下的Disruptor结果- 3个生产者和1个消费者:
Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec
下面是另一个批量消息的Disruptor 3P:1C 测试 (10 条消息每批):
Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec;
最后是用带LinkedBlockingQueue 实现的Disruptor 在3P:1C场景下的测试结果:
Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec
如你所见,Railway方式的平均吞吐量是162,597,109 ops/sec,而Disruptor在同样的情况下的最好结果只有128,205,128 ops/sec。至于 LinkedBlockingQueue,最好的结果只有4,546,281 ops/sec。
Railway算法为事件批处理提供了一种可以显著增加吞吐量的简易办法。通过调整列车容量或列车数量,很容易达成想要的吞吐量/延迟。
另外, 当同一个线程可以用来消费消息,处理它们并向环中返回结果时,通过混合生产者和消费者,Railway也能用来处理复杂的情况:
图 12: 混合生产者和消费者的Railway
最后,我会提供一个经过优化的超高吞吐量 单生产者/单消费者测试:
图 13:单个生产者和单个消费者的Railway
它的平均结果为:吞吐量超过每秒15亿 (1,569,884,271)次操作,延迟为1.3 微秒。如你所见,本文开头描述的那个规模相同的单线程测试的结果是每秒2,681,850,373。
你自己想想结论是什么吧。
我希望将来再写一篇文章,阐明如何用Queue和 BlockingQueue接口支持Railway算法,用来处理不同的生产者和消费者组合。敬请关注。
关于作者
Aliaksei Papou 是Specific集团的首席软件工程师和架构师,那是一家位于奥地利维也纳的软件开发公司。Aliaksei有超过10年的小型和大型企业应用软件开发经验。他有一个坚定的信念:编写并发代码不应该这么难。
原文英文链接:Inter-thread communications in Java at the speed of light
感谢侯伯薇对本文的审校。
给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ)或者腾讯微博(@InfoQ)关注我们,并与我们的编辑和其他读者朋友交流。
发表评论
-
将json格式的字符数组转为List对象
2015-08-10 15:18 864使用的是json-lib.jar包 将json格式的字符数组 ... -
用httpPost对JSON发送和接收的例子
2015-08-10 11:16 1057HTTPPost发送JSON: private static ... -
zookeeper适用场景:zookeeper解决了哪些问题
2015-07-31 18:01 708问题导读: 1.master挂机 ... -
java泛型
2015-07-29 10:48 730什么是泛型? 泛型(Ge ... -
Java线程Dump分析工具--jstack
2015-06-23 11:09 672jstack用于打印出给定的java进程ID或core fil ... -
什么是spark?
2015-04-10 09:37 432关于Spark: Spark是UC Berkeley AM ... -
dubbo 教程
2015-04-09 19:21 734先给出阿里巴巴dubbo的 ... -
jre/bin目录下面工具说明
2015-03-20 16:45 608jre/bin目录下面工具说明 ... -
JVM系列三:JVM参数设置、分析
2015-01-30 11:18 655不管是YGC还 ... -
jstat使用
2015-01-27 11:11 645jstat 1. jstat -gc pid ... -
查看java堆栈情况(cpu占用过高)
2015-01-27 11:10 6911. 确定占用cpu高的线程id: 方法一: 直接使用 ps ... -
慎用ArrayList的contains方法,使用HashSet的contains方法代替
2015-01-20 14:14 1104在启动一个应用的时候,发现其中有一处数据加载要数分钟,刚开始 ... -
Java虚拟机工作原理详解
2015-01-16 10:00 682一、类加载器首先来 ... -
jdk1.5-1.9新特性
2014-11-11 10:22 82321.51.自动装箱与拆箱:2.枚举(常用来设计单例模式 ... -
java动态代理(JDK和cglib)
2014-09-24 15:51 438JAVA的动态代理 代理模式 代理模式是常用的java设计 ... -
Java动态代理机制详解(JDK 和CGLIB,Javassist,ASM)
2014-09-24 15:45 657class文件简介及加载 Java编译器编译 ... -
怎么用github下载资源
2014-09-24 11:18 4131、下载github:到http://windows. ... -
maven项目时jar包没有到lib目录下
2014-09-01 20:05 2414在建项目时路径都设置好了,为什么在eclipse中运行mav ... -
使用并行计算大幅提升递归算法效率
2014-08-27 15:04 569前言: 无论什么样的 ... -
JAVA 实现FTP
2014-08-22 14:41 669一个JAVA 实现FTP功能的代码,包括了服务器的设置模块, ...
相关推荐
JAVA100例之实例64 JAVA线程间通讯
Java的多线程-线程间的通信.doc
彻底明白Java的多线程-线程间的通信.doc
Java线程Java线程Java线程Java线程Java线程Java线程
管道(pipe)流是一种特殊的流,用于在不同线程(threads)间直接传送数据。...通过使用管道,实现不同线程间的通讯。本文在简要介绍管道的基本概念及管道的创建与使用后,并以一个具体的实例pipeapp加以详细说明。
disruptor:高性能Java线程间通讯库
Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程Java线程
android 线程间通讯
Java线程讲解Java线程讲解Java线程讲解Java线程讲解Java线程讲解Java线程讲解Java线程讲解Java线程讲解Java线程讲解Java线程讲解
Java 多线程间的通讯.doc Java 多线程间的通讯.doc
jeromq-0.3.5.jar 线程间通讯
电子书相关:包含4个有关JAVA线程的电子书(几乎涵盖全部有关线程的书籍) OReilly.Java.Threads.3rd.Edition.Sep.2004.eBook-DDU Java Thread Programming (Sams) java线程第二版中英文 java线程第二版中英文 ...
Java多线程机制 9.1 Java中的线程 9.2 Thread的子类创建线程 9.3 使用Runable接口 9.4 线程的常用方法 9.5 GUI线程 9.6 线程同步 9.7 在同步方法中使用wait()、notify 和notifyAll()方法 9.8 挂起、恢复和终止线程 ...
Java的线程和Java AppletJava的线程和Java AppletJava的线程和Java AppletJava的线程和Java AppletJava的线程和Java Applet
Java线程:概念与原理 Java线程:创建与启动 Java线程:线程栈模型与线程的变量 Java线程:线程状态的转换 Java线程:线程的同步与锁 Java线程:线程的交互 Java线程:线程的调度-休眠 Java线程:线程的调度-优先级 ...
Java 模拟线程并发 Java, 模拟线程并发,线程,并发 Java, 模拟线程并发,线程,并发 Java, 模拟线程并发,线程,并发 Java, 模拟线程并发,线程,并发
单线程 单线程 单线程 单线程 单线程 单线程
java多线程PPT 多线程基本概念 创建线程的方式 线程的挂起与唤醒 多线程问题
java 线程java 线程java 线程java 线程java 线程java 线程java 线程java 线程java 线程
java多线程每个线程挨着打印ABC的4种实现方式,有4个线程t1、t2、t3、t4,t1打印A后t2打印A再t3打印A再t4打印A,然后从新回到t1打印B再t2打印B...t4打印B... 4个线程轮流打印abc... 一个线程可以理解为一个人,打印...