这篇文章主要讲解了“DAG实现任务调度以及优化”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“DAG实现任务调度以及优化”吧!
github:https://github.com/smartxing/algorithm
1 有向图的构建
DAG dag = new DAG();
dag.addVertex("A");
dag.addVertex("B");
dag.addVertex("C");
dag.addVertex("D");
dag.addEdge("A", "B");
dag.addEdge("A", "C");
System.out.println(dag);
2 拓扑排序检测图中是否有环
public boolean isCircularity() {
Set<Object> set = inDegree.keySet();
//入度表
Map<Object, AtomicInteger> inDegree = set.stream().collect(Collectors
.toMap(k -> k, k -> new AtomicInteger(this.inDegree.get(k).size())));
//入度为0的节点
Set sources = getSources();
LinkedList<Object> queue = new LinkedList();
queue.addAll(sources);
while (!queue.isEmpty()) {
Object o = queue.removeFirst();
outDegree.get(o)
.forEach(so -> {
if (inDegree.get(so).decrementAndGet() == 0) {
queue.add(so);
}
});
}
return inDegree.values().stream().filter(x -> x.intValue() > 0).count() > 0;
}
3 stage优化
eg
如果任务存在如下的关系 , task1 执行完后执行 task2 ,task2 执行完后执行task3 ...
Task1 -> Task2 -> Task3 -> Task4
这些task 本来就要串行执行的 可以把这些task 打包在一块 减少线程上下文的切换
eg : 复杂一点的DAG:
/**
* H
* \
* G
* \
* A -> B
* \
* C- D -E - F-> J
*
*
*
* 优化后得 ==>
*
* (H,G)
* \
* A -> B
* \
* (C,D,E) - (F,J)
*
*/
详见chain方法: 关键代码如下
private void chain_(Set sources, final LinkedHashSetMultimap foutChain, final LinkedHashSetMultimap finChain) {
sources.forEach(sourceNode -> {
ArrayList<Object> maxStage = Lists.newArrayList();
findMaxStage(sourceNode, maxStage);
if (maxStage.size() > 1) { //存在需要合并的stage
addVertex(foutChain, finChain, maxStage);//添加一个新节点
Object o = maxStage.get(maxStage.size() - 1); //最后一个节点
reChain_(foutChain, finChain, maxStage, o);
}
if (maxStage.size() == 1) {
//不存在需要合并的stage
addVertex(foutChain, finChain, sourceNode);//添加一个新节点
Set subNodes = outDegree.get(sourceNode);
addSubNodeage(foutChain, finChain, sourceNode, subNodes);
}
});
}
4 测试DAG 执行
测试程序: 详见 DAGExecTest
1 新建一个task 只打印一句话
public static class Task implements Runnable {
private String taskName;
public Task(final String taskName) {
this.taskName = taskName;
}
@Override public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("i am running my name is " + taskName + " finish ThreadID: " + Thread.currentThread().getId());
}
public String getTaskName() {
return taskName;
}
@Override public String toString() {
return taskName;
}
}
2 构建DAG
DAG dag = DAG.create();
Task a = new Task("a");
Task b = new Task("b");
Task c = new Task("c");
Task d = new Task("d");
Task e = new Task("e");
Task f = new Task("f");
Task g = new Task("g");
Task h = new Task("h");
Task j = new Task("j");
dag.addVertex(a);
dag.addVertex(b);
dag.addVertex(c);
dag.addVertex(d);
dag.addVertex(e);
dag.addVertex(f);
dag.addVertex(g);
dag.addVertex(h);
dag.addVertex(j);
dag.addEdge(h, g);
dag.addEdge(g, b);
dag.addEdge(a, b);
dag.addEdge(b, f);
dag.addEdge(c, d);
dag.addEdge(d, e);
dag.addEdge(e, f);
dag.addEdge(f, j);
构建完成后如图
* H
* \
* G
* \
* A -> B
* \
* C- D -E - F-> J
3 stage 切分
DAG chain = dag.chain();
执行完图入下:
* (H,G)
* \
* A -> B
* \
* (C,D,E) - (F,J)
4 执行 DAG DAGExecTest 最终结果打印如下如下:
可以发现有3个Stage stage1 包含3个task task分别在不同的线程里面执行
其中c-d-e g-c f-j是经过优化的在同一个线程里面执行,减少了不必要的上下文切换
i am running my name is a finish ThreadID: 10
i am running my name is c finish ThreadID: 11
i am running my name is h finish ThreadID: 12
i am running my name is d finish ThreadID: 11
i am running my name is g finish ThreadID: 12
i am running my name is e finish ThreadID: 11
stage 结束 : task detached:a, task chain c-d-e task chain h-g
-----------------------------------------------
i am running my name is b finish ThreadID: 14
stage 结束 : task detached:b,
-----------------------------------------------
i am running my name is f finish ThreadID: 11
i am running my name is j finish ThreadID: 11
stage 结束 : task chain f-j
测试执行关键代码如下:
chain.execute(col -> {
Set set = (Set) col;
List<CompletableFuture> completableFutures = Lists.newArrayList();
StringBuilder sb = new StringBuilder();
set.stream().forEach(x -> {
if (x instanceof Task) {
CompletableFuture<Void> future = CompletableFuture.runAsync((Task) x, executorService);
completableFutures.add(future);
sb.append(" task detached:" + ((Task) x).getTaskName()).append(",");
}
if (x instanceof List) {
List<Task> taskList = (List) x;
CompletableFuture<Void> future = CompletableFuture.runAsync(()->
taskList.forEach(Task::run));
completableFutures.add(future);
sb.append(
" task chain " + Joiner.on("-").join(taskList.stream().map(Task::getTaskName).collect(Collectors.toList())));
}
});
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();
System.out.println("stage 结束 : " + sb.toString());
System.out.println("-----------------------------------------------");
});
感谢各位的阅读,以上就是“DAG实现任务调度以及优化”的内容了,经过本文的学习后,相信大家对DAG实现任务调度以及优化这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/xliangbo/blog/3101671