这篇文章主要讲解了“Storm的Transactional Topology怎么配置”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Storm的Transactional Topology怎么配置”吧!
○ 是一个每个tuple仅被处理一次的框架
○ 由Storm0.7引入,于Storm0.9被弃用,被triden取而代之
○ 底层依靠spout\bolt\topology\stream抽象的一个特性
基于Storm处理tuple失败时会重发(replay),如何确保replay的记录不被重复记录,换句话说就是如何保证tuple仅被处理一次,这就依赖于一个称作强顺序性的思想。
强顺序性:每个tuple与一个transaction id相关联,transaction id实际就是一个数字,每一个tuple都有一个按照顺序的transaction id(例如:tuple1的transaction id 为 1,tuple2的transaction id 为 2,...以此类推),只有当前的tuple处理并存储完毕,下一个tuple(处于等待状态)才能进行存储,tuple被存储时连同transaction id一并存储,此时考虑两种情况:
tuple处理失败时:重新发送一个和原来一模一样的transaction id
tuple处理成功时:发送的transaction id会和存储的transaction id对比,如果不存在transaction id,表示第一次记录,直接存储;如果发现存在,则忽略该tuple。
这一思想是由Kafka开发者提出来的。
基于上面的一个优化,将一批tuple直接打包成一个batch,然后分配一个transaction id ,让batch与batch之间保证强顺序性,且batch内部的tuples可以并行。
两个步骤:
1、并行计算batch中的tuple数量
2、batch强顺序性存储
在batch强顺序性存储的同时让其他等待存储的batch内部进行并行运算,不必等到下一个batch存储时才进行内部运算。
在Storm上面的两个步骤表现为processing阶段和commit阶段。
使用Transactional Topology时,storm提供如下操作:
将需要处理的状态如:transaction id 、batch meta等状态信息放在zookeeper
指定某个时间段执行processing操作和commit操作
storm使用acking框架自动检测batch被成功或失败处理,然后相应的重发(replay)
通过对普通的bolt进行包装,提供一套对batch处理的API、协调工作(即某个时刻处理某个processing或者commit),并且storm会自动清除中间结果
Transactional Topology是可以完全重发一个特定batch的消息队列系统,在 Kakfa中正是有这样的需求,为此Storm在storm-contrib里面的Storm-Kafka中为Kafka实现了一个事务性的spout。
计算来自输入流中tuple的个数
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3); builder.setBolt("partial-count", new BatchCount(), 5) .shuffleGrouping("spout"); builder.setBolt("sum", new UpdateGlobalCount()) .globalGrouping("partial-count");
○ 通过TransactionalTopologyBuilder类构建Transactional
参数:
Transaction ID:transactional topology的ID,在zookeeper中用于保存进度状态,重启topology时可以直接从执行的进度开始执行而不用重头到尾又执行一遍
Spout ID:位于整个Topology的Spout的ID
Spout Object:Transactional中的Spout对象
Spout:Trasactional中的Spout的并行数
○ MemoryTransactionalSpout用于从一个内存变量中读取数据
DATA:数据
tuple fields:字段
tupleNum:在batch中最大的tuple数
○ Bolts
第一个Bolt采用随机分组的方式随机分发到各个task
public static class BatchCount extends BaseBatchBolt { Object _id; BatchOutputCollector _collector; int _count = 0; @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _count++; } @Override public void finishBatch() { _collector.emit(new Values(_id, _count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "count")); } }
BatchBolt对象运行在BatchBoltExecutor中,BatchBoltExecutor负责BatchBolt对象的创建和清理
BatchBolt的ID在context对象中,该ID是一个TransactionAttempt对象.
BatchBolt在DRPC中也可以使用,只是txid类型不一样,如果在Transactional Topology中使用BatchBolt,可以继承BaseTransactionalBolt.
在Tranasctional Topology中所有的Tuple都必须以TransactionAttempt作为第一个field,然后storm才能根据该field判断Tuple所属的BatchBolt,所以在发射Tuple必须满足此条件。
TransactionAttempt对象中有两个属性:
transaction id:强顺序性,无论重发多少次都是一样的数字
attempt id:对每一个Batch标识的ID,每次重发都其值不一致,通过该ID可以区分每次重发的Tuple的不同版本
第二个Bolt使用GlobalGrouping汇总batch中的tuple数
public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter { TransactionAttempt _attempt; BatchOutputCollector _collector; int _sum = 0; @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) { _collector = collector; _attempt = attempt; } @Override public void execute(Tuple tuple) { _sum+=tuple.getInteger(1); } @Override public void finishBatch() { Value val = DATABASE.get(GLOBAL_COUNT_KEY); Value newval; if(val == null || !val.txid.equals(_attempt.getTransactionId())) { newval = new Value(); newval.txid = _attempt.getTransactionId(); if(val==null) { newval.count = _sum; } else { newval.count = _sum + val.count; } DATABASE.put(GLOBAL_COUNT_KEY, newval); } else { newval = val; } _collector.emit(new Values(_attempt, newval.count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "sum")); } }
ICommitter接口:实现该接口的Bolt会在commit阶段调用finishBatch方法,该方法的调用会按照强顺序性,此外还可以使用TransactionalTopologyBuilder的setCommiterBolt来添加Bolt实现和该接口一样的功能。
executor方法:在processing阶段和commit阶段都可以执行。
关于更多的transactional topology例子可以看看storm-starter中的TransactionalWords类,该例子会在一个事务中更新多个数据库
BaiscBolt:该Bolt不跟batch中的tuples交互,仅基于单个传来的tuple和产生新的tuple
BatchBolt:该Bolt处理batch中的tuples,对于每一个tuple调用executor方法,整个batch完成时调用finishBatch方法
被Committer标记的Bolt:在commit阶段才调用finishBatch方法,commit具有强顺序性,标记Bolt为commit阶段执行finishBatch的方法有两种:1、实现ICommiter接口。2、TransactionalTopologyBuilder的setCommiterBolt来添加Bolt。
红色轮廓的Bolt被标记过为commit
Spout向Bolt A发送整个Batch
Bolt A处理完整个Batch之后调用finishBatch方法分别向Bolt B 和 Bolt C发送Batch
Bolt B接收到Bolt A传递过来的tuple进行处理(此时还尚未处理完毕)不会调用finishBatch方法
Bolt C接口Bolt A传递的tuple,尽管处理完Bolt A传递来的tuple,但是由于Bolt B还尚未commit,所以Bolt C处于等待Bolt B commit的状态,不会调用finishBatch方法
Bolt D接收来自Bolt C调用executor方法时发送的所有tuple
此时一旦Bolt B进行commit进行finishBatch操作,那么Bolt C就会确认接收到所有Bolt B的tuple,Bolt C也调用finishBatch方法,最终Bolt D也接收到所有来自Bolt C的batch。
在这里尽管Bolt D是一个committer,它在接收到整个batch的tuple之后不需要等待第二个commit信号。因为它是在commit阶段接收到的整个batch,它会调用finishBatch来完成整个事务。
注意,当使用transactional topology的时候你不需要显式地去做任何的acking或者anchoring,storm在背后都做掉了。(storm对transactional topolgies里面的acking机制进行了高度的优化)
在使用普通bolt的时候, 你可以通过调用OutputCollector的fail方法来fail这个tuple所在的tuple树。Transactional Topology对用户隐藏了acking框架, 它提供一个不同的机制来fail一个batch(从而使得这个batch被replay):只要抛出一个FailedException就可以了。跟普通的异常不一样, 这个异常只会导致当前的batch被replay, 而不会使整个进程崩溃掉。
TransactionalSpout接口跟普通的Spout接口完全不一样。一个TransactionalSpout的实现会发送一批一批(batch)的tuple, 而且必须保证同一批次tuples的transaction id始终一样。
在transactional topology运行的时候, transactional spout看起来是这样的一个结构:
coordinator是一个普通的storm的spout——它一直为事务的batch发射tuple。
Emitter则像一个普通的storm bolt,它负责为每个batch实际发射tuple,emitter以all grouping的方式订阅coordinator的”batch emit”流。
关于如何实现一个TransactionalSpout的细节可以参见Javadoc
一种常见的TransactionalSpout是那种从多个queue broker读取数据然后再发射的tuple。比如TransactionalKafkaSpout就是这样工作的。IPartitionedTransactionalSpout把这些管理每个分区的状态以保证可以replay的幂等性的工作都自动化掉了。更多可以参考Javadoc。
Transactional Topologies有两个重要的配置:
Zookeeper:默认情况下,transactional topology会把状态信息保存在一个zookeeper里面(协调集群的那个)。你可以通过这两个配置来指定其它的zookeeper:”transactional.zookeeper.servers” 和 “transactional.zookeeper.port“。
同时活跃的batch数量:你必须设置同时处理的batch数量,你可以通过”topology.max.spout.pending” 来指定, 如果你不指定,默认是1。
Transactional Topologies的实现是非常优雅的。管理提交协议,检测失败并且串行提交看起来很复杂,但是使用storm的原语来进行抽象是非常简单的。
1、transactional spout是一个子topology, 它由一个coordinator spout和一个emitter bolt组成。
2、coordinator是一个普通的spout,并行度为1;emitter是一个bolt,并行度为P,使用all分组方式连接到coordinator的“batch”流上。
3、coordinator使用一个acking框架决定什么时候一个batch被成功执行(process)完成,然后去决定一个batch什么时候被成功提交(commit)。
感谢各位的阅读,以上就是“Storm的Transactional Topology怎么配置”的内容了,经过本文的学习后,相信大家对Storm的Transactional Topology怎么配置这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。