
本文共 3941 字,大约阅读时间需要 13 分钟。
早前的旧文中,我分享了使用 java.util.concurrent.Phaser
在处理大量异步任务场景下的使用。其中用到了phaser类的重要特性 可以灵活设置同步数量,在使用过程中注册新的同步对象。
但是在后续的使用过程中遇到的一些问题,主要有一下两点:
-
注册同步等待总量有上限
private static final int MAX_PARTIES = 0xffff;
-
功能复杂,API丰富但大部分用不到,在使用过程中经常调错API
今天终于无法忍受,特别是低一点,导致大量异步任务会丢数据。之前是按照非同步方式执行大量任务,但是今天遇到了不定任务量,一时没想到这茬,导致了半个小时数据构造化为泡影。
重写思路
一怒之下,决定自己重写一个加强版。总结了设计思路如下:
-
线程安全计数,用于统计完成任务数
-
线程安全状态技术,用于统计多少任务尚未完成
-
注册方法,用于增加任务统计技术
-
完成方法,用于减少未完成数量,增加完成任务数量
-
返回各类状态信息的方法
实现这样的功能,我们就得到了一个简单但加强功能的多线程同步类,用来替代 java.util.concurrent.Phaser
,我命名为 FunPhaser
。代码如下:
-
package com.funtester.frame
-
-
import java.util.concurrent.atomic.AtomicInteger
-
-
/**
-
* 自定义同步类,避免{@link java.util.concurrent.Phaser}的不足,总数量受限于65535
-
* 用于多线程任务同步,任务完成后,调用{@link #done()}方法,任务总数减少,当任务总数为0时,调用{@link #await()}方法,等待所有任务完成
-
*/
-
class FunPhaser extends SourceCode {
-
-
/**
-
* 任务总数索引,用于标记任务完成状态
-
* 注册增加,任务完成减少
-
*/
-
AtomicInteger index
-
-
/**
-
* 任务总数,用于记录任务完成数量
-
*/
-
AtomicInteger taskNum
-
-
FunPhaser() {
-
this.index = new AtomicInteger()
-
this.taskNum = new AtomicInteger()
-
}
-
-
/**
-
* 注册任务
-
* @return
-
*/
-
def register() {
-
this.index.getAndIncrement()
-
}
-
-
/**
-
* 任务完成
-
* @return
-
*/
-
def done() {
-
this.index.getAndDecrement()
-
this.taskNum.getAndIncrement()
-
}
-
-
/**
-
* 等待所有任务完成
-
* @return
-
*/
-
def await() {
-
waitFor {index.get() == 0}
-
}
-
-
/**
-
* 获取任务完成总数
-
* @return
-
*/
-
int queryTaskNum() {
-
return taskNum.get()
-
}
-
-
}
源码解读
这个自定义同步类 FunPhaser
用于多线程任务同步,它避免了 java.util.concurrent.Phaser
的不足,即总数量受限于 65535。
FunPhaser
类有以下几个成员变量:
-
index
:任务总数索引,用于标记任务完成状态。注册增加,任务完成减少。 -
taskNum
:任务总数,用于记录任务完成数量。
FunPhaser
类提供了以下几个方法:
-
index
和taskNum
是AtomicInteger
类型的属性,用于原子地操作整数值。 -
FunPhaser()
是该类的构造函数,初始化了index
和taskNum
属性。 -
register()
方法用于注册任务,每次调用会增加index
的值,表示新增一个任务。 -
done()
方法用于标记任务完成,每次调用会减少index
的值并增加taskNum
的值。 -
await()
方法用于等待所有任务完成。它调用了waitFor
方法,等待index
的值变为 0,表示所有任务已经完成。 -
queryTaskNum()
方法用于获取任务完成的总数,返回taskNum
的值。
演示Demo
FunPhaser
类的使用方法如下:
-
import com.funtester.frame.FunPhaser;
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.Executors;
-
public class FunPhaserDemo {
-
public static void main(String[] args) throws InterruptedException {
-
// 创建FunPhaser实例
-
FunPhaser phaser = new FunPhaser();
-
// 创建固定大小的线程池
-
ExecutorService executorService = Executors.newFixedThreadPool(5);
-
// 注册10个任务
-
for (int i = 0; i < 10; i++) {
-
executorService.submit(new Runnable() {
-
@Override
-
public void run() {
-
// 注册任务
-
phaser.register();
-
// 模拟耗时操作
-
try {
-
Thread.sleep(1000);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
// 标记任务完成
-
phaser.done();
-
}
-
});
-
}
-
// 等待所有任务完成
-
phaser.await();
-
// 输出已完成的任务数量
-
System.out.println("已完成的任务数量: " + phaser.queryTaskNum());
-
// 关闭线程池
-
executorService.shutdown();
-
}
-
}
在这个示例中,我们创建了一个FunPhaser
对象,并使用固定大小为5的线程池来执行10个异步任务。每个任务在开始前调用register()
方法注册,完成后调用done()
方法标记。主线程通过调用await()
方法等待所有任务完成,并最终输出已完成的任务数量。
自定义关键字
在自定义关键字中的使用如下:
-
/**
-
* 使用自定义同步器{@link FunPhaser}进行多线程同步
-
*
-
* @param f 代码块
-
* @param phaser 同步器
-
*/
-
public static void fun(Closure f, FunPhaser phaser) {
-
if (phaser != null) phaser.register();
-
ThreadPoolUtil.executeSync(() -> {
-
try {
-
f.call();
-
} finally {
-
if (phaser != null) {
-
phaser.done();
-
logger.info("async task {}", phaser.queryTaskNum());
-
}
-
}
-
});
-
}
作为对照旧的实现代码如下:
-
/**
-
* 异步执行代码块,使用{@link Phaser}进行多线程同步
-
*
-
* @param f 代码块
-
* @param phaser 同步器
-
*/
-
public static void fun(Closure f, Phaser phaser) {
-
if (phaser != null) phaser.register();
-
ThreadPoolUtil.executeSync(() -> {
-
try {
-
f.call();
-
} finally {
-
if (phaser != null) {
-
phaser.arrive();
-
logger.info("异步任务完成 {}", phaser.getArrivedParties());
-
}
-
}
-
});
-
}
这两个实现代码的功能都是相同的,都是使用同步器来进行多线程任务同步。
旧的实现代码使用的是 Phaser
类。Phaser
类是一个通用的同步器,可以用于各种多线程任务同步场景。在旧的实现代码中,我们使用 register()
方法来注册任务,使用 arrive()
方法来表示任务完成。
新的实现代码使用的是 FunPhaser
类。FunPhaser
类是一个自定义的同步器,它避免了 Phaser
类的不足,即总数量受限于 65535。在新的实现代码中,我们使用 register()
方法来注册任务,使用 done()
方法来表示任务完成。
两种实现代码的对比
指标 | 旧的实现代码 | 新的实现代码 |
---|---|---|
使用到的同步器 | Phaser | FunPhaser |
总数量是否受限 | 是 | 否 |
代码简洁程度 | 较好 | 更好 |
总体而言,新的实现代码比旧的实现代码更加简洁易用,并且避免了 Phaser
类的不足。
感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:
这些资料,对于【】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取
发表评论
最新留言
关于作者
