本文分享如何使用 CountDownLatch 实现一个简单的并发执行框架。小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获。话不多说,跟着小编一起来看看吧。
目录结构
package com.**.**.base.support.executor;

import lombok.NoArgsConstructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * Runs a batch of {@link ExecuteUnit}s on a thread pool and blocks the caller on a
 * {@link CountDownLatch} until either all units or the first unit has finished.
 *
 * <p>Typical use: {@code YjExecutor.build().add(unit).addAll(units).allReturn()}.
 *
 * <p>The unit list and the latch are plain fields with no synchronization, so a single
 * instance is meant to be driven from one thread at a time.
 *
 * @Author lbm
 * @Date 2019/10/9 10:32 AM
 * @Description YJ executor
 **/
@NoArgsConstructor
public class YjExecutor {

    private static final Log log = LogFactory.getLog(YjExecutor.class);

    /** Latch count used when the caller only waits for the first finished unit. */
    private static final int FIRST_RETURN = 1;

    /** Number of worker threads created for one execution. */
    private static final int POOL_SIZE = 10;

    private CountDownLatch latch;

    private List<ExecuteUnit> executeUnits = new ArrayList<>();

    /**
     * Creates an empty executor.
     *
     * @return a new executor with no units registered
     */
    public static YjExecutor build() {
        return new YjExecutor();
    }

    /**
     * Registers one unit.
     *
     * @param paramUnit unit to run; neither the unit nor its function may be null
     * @return this executor, for chaining
     * @throws IllegalArgumentException if the unit or its function is null
     */
    public YjExecutor add(ExecuteUnit paramUnit) {
        paramCheck(paramUnit);
        executeUnits.add(paramUnit);
        return this;
    }

    /**
     * Registers several units, keeping their order.
     *
     * @param paramUnits units to run; the list, its elements and their functions may not be null
     * @return this executor, for chaining
     * @throws IllegalArgumentException if the list, an element or an element's function is null
     */
    public YjExecutor addAll(List<ExecuteUnit> paramUnits) {
        paramCheck(paramUnits);
        this.executeUnits.addAll(paramUnits);
        return this;
    }

    /**
     * Starts every unit and returns as soon as the first one has counted the latch down.
     *
     * <p>After the wait, the units are scanned in registration order and the first result
     * that carries a non-null value is returned. Units that are still running at that point
     * keep running in the background. Because a unit counts the latch down when it fails as
     * well, the wait can end on a failed unit, in which case {@code null} may be returned.
     *
     * @return the first available result with a non-null value, or {@code null} if there is
     *         none (including when no units were registered)
     */
    public ReturnT firstPriorityReturn() {
        this.latch = new CountDownLatch(FIRST_RETURN);
        List<ReturnT> results = this.submit();
        return results.stream()
                .filter(res -> res != null && res.getRes() != null)
                .findFirst()
                .orElse(null);
    }

    /**
     * Starts every unit and returns once all of them have finished.
     *
     * @return the results in registration order; empty if no units were registered
     */
    public List<ReturnT> allReturn() {
        this.latch = new CountDownLatch(executeUnits.size());
        return submit();
    }

    /**
     * Validates a list of units.
     *
     * @param paramUnits units to validate
     */
    private void paramCheck(List<ExecuteUnit> paramUnits) {
        Assert.notNull(paramUnits, "Argument can not be null !!");
        for (ExecuteUnit paramUnit : paramUnits) {
            paramCheck(paramUnit);
        }
    }

    /**
     * Validates a single unit.
     *
     * @param paramUnit unit to validate
     */
    private void paramCheck(ExecuteUnit paramUnit) {
        Assert.notNull(paramUnit, "Argument can not be null !!");
        Assert.notNull(paramUnit.getFunc(), "Argument function can not be null !!");
    }

    /**
     * Hands every prepared runner to a fresh thread pool and waits on the latch.
     *
     * <p>The pool is created per call and always shut down afterwards; {@code shutdown()}
     * lets already-submitted runners finish, so the worker threads end once the work is
     * done instead of staying alive for the lifetime of the JVM.
     */
    private void execute() {
        // With nothing to run nobody would ever count the latch down; for the
        // "first return" mode (count 1) waiting here would block forever.
        if (executeUnits.isEmpty()) {
            return;
        }
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        try {
            for (ExecuteUnit executeUnit : executeUnits) {
                pool.execute(executeUnit.getYjRunner());
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interrupt.
            Thread.currentThread().interrupt();
            log.error("execute interrupted", e);
        } catch (RuntimeException e) {
            log.error("execute error", e);
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Prepares the runners, executes them and collects whatever results are set afterwards.
     *
     * @return one entry per unit in registration order; an entry is {@code null} when its
     *         unit has not stored a result yet
     */
    private List<ReturnT> submit() {
        buildExecuteUnit();
        execute();
        return executeUnits.stream()
                .map(ExecuteUnit::getResult)
                .collect(Collectors.toList());
    }

    /**
     * Gives every unit a runner bound to the current latch.
     */
    private void buildExecuteUnit() {
        for (ExecuteUnit executeUnit : executeUnits) {
            executeUnit.setYjRunner(executeUnit.yjRunnerBuild(latch));
        }
    }
}
package com.**.**.base.support.executor;

import lombok.Builder;
import lombok.Data;

/**
 * Result holder for one executed unit: a status code together with the produced value.
 *
 * @param <R> type of the carried value
 * @Author lbm
 * @Date 2019/10/9 10:10 AM
 * @Description execution result
 **/
@Data
@Builder
public class ReturnT<R> {

    /** Status of the execution. */
    private ExecuteEnum resCode;

    /** Value carried by this result; may be {@code null}. */
    private R res;
}
package com.**.**.base.support.executor;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

/**
 * One unit of work: a function, the argument to apply it to, and the slot its result is
 * stored in once the unit has run.
 *
 * @param <T> argument type of the function
 * @param <R> result type of the function
 * @Author lbm
 * @Date 2019/10/9 7:33 PM
 * @Description execution unit
 **/
@Data
@Builder
public class ExecuteUnit<T, R> {

    private static final Logger log = LoggerFactory.getLogger(ExecuteUnit.class);

    /** Function to run. */
    private Function<T, R> func;

    /** Argument passed to {@link #func}. */
    private T param;

    /**
     * Outcome of the run; {@code null} until the runner has finished.
     *
     * <p>Volatile because it is written by the worker thread and may be read by a caller
     * that did not wait for this particular unit (YjExecutor#firstPriorityReturn reads the
     * results of all units after only the first one has counted the latch down).
     */
    private volatile ReturnT<R> result;

    /** Runner most recently created for this unit. */
    private YjRunner yjRunner;

    /**
     * Creates a runner that executes this unit and counts the given latch down when done.
     *
     * @param latch latch to count down after the unit has finished
     * @return a new runner bound to this unit
     */
    public YjRunner yjRunnerBuild(CountDownLatch latch) {
        return new YjRunner(latch, this);
    }

    /**
     * Runnable that applies the unit's function, stores the outcome on the unit and then
     * counts the latch down, whether the function succeeded or not.
     */
    // Raw types are kept on purpose: the runner is a static nested class shared by units of
    // any <T, R>, and its constructor signature is part of the public surface.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @AllArgsConstructor
    public static class YjRunner implements Runnable {

        private final CountDownLatch latch;

        private final ExecuteUnit executeUnit;

        @Override
        public void run() {
            // Start pessimistic: the code only becomes SUCCESS after the function has
            // returned, so a Throwable that bypasses the catch below (an Error) can no
            // longer be reported as a success with a null value.
            ReturnT returnT = ReturnT.builder()
                    .resCode(ExecuteEnum.FAIL)
                    .build();
            try {
                Object res = executeUnit.getFunc().apply(executeUnit.getParam());
                returnT.setRes(res);
                returnT.setResCode(ExecuteEnum.SUCCESS);
            } catch (Exception e) {
                // One placeholder for the parameter; the exception goes last without a
                // placeholder so SLF4J logs it with its stack trace.
                log.error("param: {} execute error", executeUnit.getParam(), e);
            } finally {
                // Publish the result before releasing the latch so a waiter always sees it.
                executeUnit.setResult(returnT);
                this.latch.countDown();
            }
        }
    }
}
package com.**.**.base.support.executor;

/**
 * Status codes attached to an execution result.
 *
 * @Author lbm
 * @Date 2019/10/9 2:54 PM
 * @Description execution result
 **/
public enum ExecuteEnum {

    /** The execution completed. */
    SUCCESS,

    /** The execution failed. */
    FAIL
}
package com.**.**.**.serviceTest;

import com.alibaba.fastjson.JSONObject;
import com.**.**.base.support.executor.ExecuteUnit;
import com.**.**.base.support.executor.ReturnT;
import com.**.**.base.support.executor.YjExecutor;
import com.**.**.api.domain.SysUser;
import com.**.**.server.BaseTest;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Exercises {@link YjExecutor} in both modes: waiting for every unit and waiting for the
 * first finished unit.
 *
 * @Author lbm
 * @Date 2019/10/9 10:45 AM
 * @Description yj executor test
 **/
public class ExecutorTest extends BaseTest {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Returns immediately: wraps the given id in a new user. */
    private Function<Object, SysUser> func1 = (a) -> {
        SysUser user = new SysUser();
        user.setId((Long) a);
        return user;
    };

    /** Slow task: sleeps 5 seconds, then squares its argument. */
    private Function<Long, Long> func2 = (a) -> {
        pause(5000L);
        return (a * a);
    };

    /** Slower task: sleeps 6 seconds, then returns zero. */
    private Function<Long, Long> func3 = (a) -> {
        pause(6000L);
        return a - a;
    };

    /**
     * Sleeps for the given time; on interruption the flag is restored and the error logged
     * so the task still returns its value.
     *
     * @param millis time to sleep in milliseconds
     */
    private void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("error {}", e.getMessage(), e);
        }
    }

    /**
     * All three units must have finished, and the results must come back in the order the
     * units were registered.
     */
    @Test
    public void allReturnTest() {
        List<ExecuteUnit> executeUnits = new ArrayList<>();
        ExecuteUnit<Long, Long> unit1 = ExecuteUnit.<Long, Long>builder()
                .func(func2)
                .param(20L)
                .build();
        executeUnits.add(unit1);
        executeUnits.add(ExecuteUnit.<Object, SysUser>builder()
                .func(func1)
                .param(1L)
                .build());
        executeUnits.add(ExecuteUnit.<Long, Long>builder()
                .func(func3)
                .param(22L)
                .build());

        List<ReturnT> results = YjExecutor.build()
                .addAll(executeUnits)
                .allReturn();

        for (ReturnT res : results) {
            logger.info(JSONObject.toJSONString(res));
        }

        Assert.assertEquals(3, results.size());
        Assert.assertEquals(Long.valueOf(400L), results.get(0).getRes());
        Assert.assertTrue(results.get(1).getRes() instanceof SysUser);
        Assert.assertEquals(Long.valueOf(0L), results.get(2).getRes());
    }

    /**
     * The instant unit (func1) finishes long before the two sleeping units, so its result
     * is the one that must be returned.
     */
    @Test
    public void firstPriorityReturnTest() {
        List<ExecuteUnit> executeUnits = new ArrayList<>();
        executeUnits.add(ExecuteUnit.<Object, SysUser>builder()
                .func(func1)
                .param(1L)
                .build());
        executeUnits.add(ExecuteUnit.<Long, Long>builder()
                .func(func3)
                .param(22L)
                .build());

        ReturnT result = YjExecutor.build()
                .add(ExecuteUnit.<Long, Long>builder()
                        .func(func2)
                        .param(20L)
                        .build())
                .addAll(executeUnits)
                .firstPriorityReturn();

        logger.info(JSONObject.toJSONString(result));

        Assert.assertNotNull(result);
        Assert.assertTrue(result.getRes() instanceof SysUser);
    }
}
以上就是使用 CountDownLatch 实现一个并发框架的方法,小编相信其中部分知识点是我们在日常工作中可能会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4227761/blog/3120812