Storm中有一类特殊的Executor叫acker,它们负责跟踪Spout发出的每一个Tuple所对应的Tuple树。当acker发现一棵Tuple树已经全部处理完成时,会通知框架回调Spout的ack();如果Tuple树处理失败或超时,则回调Spout的fail()。
Acker的跟踪算法是Storm的主要突破之一:无论一棵Tuple树有多大,它都只需要固定的约20字节空间就可以完成跟踪。
我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple。但很遗憾的是,框架不会自动重新发送,需要我们自己手工编码实现。后续给大家实战案例!
什么是Tuple树?Spout发出的一个Tuple,加上下游各个Bolt以它为锚点(anchor)继续发出的所有Tuple,共同构成一棵以该Spout Tuple为根的树,这就是Tuple树。
Spout类代码如下:
package les19.Ack_Fail;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Reliable spout that reads tab-separated order records from {@code order.log} and emits
 * one {@code (date, orderAmt)} tuple per record, tagged with a random UUID message id.
 *
 * <p>Every emitted value is cached in {@code _pending} until it is acked, so that a failed
 * tuple can be re-emitted from {@link #fail(Object)}. A tuple is given up on (and evicted
 * from the cache) once it has failed {@code MAX_FAILS} times.
 */
public class AckSpout implements IRichSpout{
    private static final long serialVersionUID = 1L;

    /** Number of failures after which a tuple is dropped instead of being re-emitted. */
    private static final int MAX_FAILS = 3;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;
    /** Values of emitted tuples that have not been acked yet, keyed by message id. */
    private ConcurrentHashMap<Object, Values> _pending;
    /** Failure count per message id for tuples that have failed at least once. */
    private ConcurrentHashMap<Object, Integer> fail_pending;
    SpoutOutputCollector collector = null;
    String str = null;

    /**
     * Emits at most one tuple per call.
     *
     * <p>Storm invokes {@code nextTuple}, {@code ack} and {@code fail} from the same thread,
     * so this method reads a single line and returns instead of draining the whole file;
     * otherwise acks and fails could not be processed until the file was exhausted.
     * Lines that do not have three tab-separated columns, or whose third column is shorter
     * than the 10-character date prefix, are reported and skipped.
     */
    @Override
    public void nextTuple() {
        if (br == null) {
            // open() could not open the input file (or reading failed earlier): nothing to emit.
            return;
        }
        try {
            str = br.readLine();
            if (str == null) {
                // End of file: no more records.
                return;
            }
            String[] arr = str.split("\t");
            if (arr.length < 3 || arr[2].length() < 10) {
                System.err.println("skip malformed line: " + str);
                return;
            }
            UUID msgId = UUID.randomUUID();
            String date = arr[2].substring(0, 10);
            String orderAmt = arr[1];
            Values val = new Values(date, orderAmt);
            // Cache before emitting so the value is available if fail() is called for msgId.
            _pending.put(msgId, val);
            collector.emit(val, msgId);
            System.out.println("_pending.size()="+_pending.size());
        } catch (java.io.IOException e) {
            // Report the error once and stop reading rather than failing on every call.
            e.printStackTrace();
            close();
            br = null;
        }
    }

    /** Closes the input file; safe to call when the file was never opened. */
    @Override
    public void close() {
        try {
            if (br != null) {
                br.close();
            }
            if (isr != null) {
                isr.close();
            }
            if (fis != null) {
                fis.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Initializes the spout: stores the collector, creates the tracking maps and opens
     * {@code order.log} (UTF-8) for reading.
     *
     * <p>The maps are created before the file is opened so that {@code ack}/{@code fail}
     * never see a null map even if the file cannot be opened.
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
        _pending = new ConcurrentHashMap<Object, Values>();
        fail_pending = new ConcurrentHashMap<Object, Integer>();
        try {
            this.fis = new FileInputStream("order.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Declares the two output fields {@code date} and {@code orderAmt}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date","orderAmt"));
    }

    /** Returns {@code null}: this spout supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Called when the tuple identified by {@code msgId} has been acked: forgets its cached
     * value and any failure counter recorded for it.
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("_pending size 共有:"+_pending.size());
        System.out.println("spout ack:"+msgId.toString()+"---"+msgId.getClass());
        this._pending.remove(msgId);
        // A tuple that failed earlier and then succeeded must not keep a stale retry counter.
        this.fail_pending.remove(msgId);
        System.out.println("_pending size 剩余:"+_pending.size());
    }

    @Override
    public void activate() {
        // Nothing to do.
    }

    @Override
    public void deactivate() {
        // Nothing to do.
    }

    /**
     * Called when the tuple identified by {@code msgId} has failed. Re-emits the cached
     * value under the same message id until it has failed {@code MAX_FAILS} times, then
     * drops it from both tracking maps.
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:"+msgId.toString());
        Values val = this._pending.get(msgId);
        if (val == null) {
            // Nothing cached for this id, so there is nothing to re-emit.
            fail_pending.remove(msgId);
            return;
        }
        Integer previous = fail_pending.get(msgId);
        int failCount = (previous == null ? 0 : previous) + 1;
        if (failCount >= MAX_FAILS) {
            // Retries exhausted: stop tracking the tuple so neither map grows without bound.
            fail_pending.remove(msgId);
            this._pending.remove(msgId);
        } else {
            fail_pending.put(msgId, failCount);
            this.collector.emit(val, msgId);
        }
    }
}
Bolt如下:
package les19.Ack_Fail;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that reads the {@code date} and {@code orderAmt} string fields of each input tuple,
 * parses the amount as a double and emits {@code (date, orderAmt)} anchored to the input.
 * The input is acked after the emit, or failed if anything throws.
 */
public class AckBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    TopologyContext context = null;
    int num = 0;
    String url = null;
    String session_id = null;
    String date = null;
    String province_id = null;

    /** Stores the context and collector handed over by the framework. */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    /**
     * Processes one tuple: emits the parsed values anchored to {@code input} and acks it.
     * On any exception the input is failed and the stack trace is printed.
     */
    @Override
    public void execute(Tuple input) {
        try {
            this.date = input.getStringByField("date");
            String rawAmount = input.getStringByField("orderAmt");
            Double orderAmt = Double.parseDouble(rawAmount);
            Values out = new Values(this.date, orderAmt);
            // Passing the input tuple as the first argument anchors the new tuple to it.
            this.collector.emit(input, out);
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
            e.printStackTrace();
        }
    }

    /** Declares the two output fields {@code date} and {@code orderAmt}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("date","orderAmt");
        declarer.declare(outputFields);
    }

    /** Returns {@code null}: this bolt supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {
        // Nothing to release.
    }
}
TOPO类如下:
package les19.Ack_Fail;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Entry point that wires one {@code AckSpout} to one {@code AckBolt} (shuffle grouping) and
 * runs the topology either on a cluster or in an in-process local cluster.
 */
public class Ack_FailTopo {
    /**
     * Builds and submits the topology.
     *
     * @param args if non-empty, {@code args[0]} is the topology name used for submission via
     *             {@code StormSubmitter}; if empty, the topology runs in a {@code LocalCluster}
     *             under the name {@code mytopology}
     */
    public static void main(String[] args) {
        Config conf = new Config();
        //conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
        conf.setDebug(false);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new AckSpout(), 1);
        builder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

        if (args.length == 0) {
            // No topology name given: run locally.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("mytopology", conf, builder.createTopology());
            return;
        }

        String topologyName = args[0];
        try {
            StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
想了解更多,请见我在51CTO上的Storm视频教程:http://edu.51cto.com/course/course_id-9041.html ,本节内容来自第18-19讲。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。