这篇文章主要介绍“Storm中怎么定义Blots程序”,在日常操作中,相信很多人在Storm中怎么定义Blots程序问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm中怎么定义Blots程序”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
具体内容
在Storm程序开发过程之中有两个重要的核心概念:Spout、Blot。Spout会作为数据的发起点,这个数据可能来源于各种地方,例如:Kafka传递过来的消息内容,而每一个Spout接收到数据之后如果有需要则将数据传递给Blot,由多个Blot进行数据的操作处理。把每一个Blot想象为一个数据的过滤器,而最后一个Blot将作为数据的存储使用,而一般的存储设备往往是文件、Redis数据库。
在本次程序的处理里面,Spouts将使用随机数产生相应的年龄数据,而有会有三个Blot进行数据的处理,这些数据处理是有自己严格的语法要求的。如果要想实现Storm开发,则需要将Storm所有的相关库文件配置到ClassPath之中。如果要想开发Spout往往需要实现一个IRickSpout接口,但是这个接口里面的方法比较多,所以建议继承这个接口的子类:BaseRichSpout。范例:定义InfoCreateSpouts
package cn.mldn.info.spouts;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;importorg.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
@SuppressWarnings("serial")
publi cclass InfoCreateSpoutextendsBaseRichSpout{
private SpoutOutputCollector collector=null;
private String nameStr="aBcDefghIjklmnopQrStuvwxyz";//假设为用户名
private Random rand=new Random();
@SuppressWarnings("rawtypes")
@Override
public void open(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
//为Spout初始化方法,这个初始化方法只执行一次;
this.collector=collector;//接收初始化方法中的SpoutOutputCollector对象 }
@Override
publicvoidnextTuple()
{
//执行Spout程序时会自动找到此方法,此方法为发送消息
//从正常的开发角度而言,此处的数据应该由消息系统传递过来
String nameInfo=String.valueOf(this.nameStr.charAt(rand.nextInt(nameStr.length())));intageInfo=this.rand.nextInt(150);//随机生成一个年龄
//最终如果要进行数据的发送,结构:name、age,所有的Spouts的数据要交给Blot完成。
this.collector.emit(newValues(nameInfo,ageInfo));
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclare){
//需要定义每一个传输中的数据保存的名称
declare.declare(newFields("name","age"));
//向后发送Tuple的时候此为信息的文字标注}
}
在整个的代码里面,nextTuple()为系统自动调用,Spout产生Blot所需要的数据。
定义Blots程序
在本处理流程之中需要有三个Blot,作用分别如下:
·AgeJudementBlot:判断传入数据的年龄是成年人(ADULT)还是年轻人(YOUNG);·NameUpperBlot:因为姓名有大写和小写,为了统一管理,信息都变为大写;·FinalBlot:进行数据的保存处理。但是需要注意的是,此时定义的只是一个个独立的Blot,彼此之间的联系需要通过程序来完成。范例:进行年龄判断的Blot实现.
package cn.mldn.info.blots;import org.apache.storm.topology.BasicOutputCollector;importorg.apache.storm.topology.OutputFieldsDeclarer;importorg.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;@SuppressWarnings("serial")public class AgeJudementBlotextendsBaseBasicBolt{ @Override public void execute(Tupletuple,BasicOutputCollectorcollector){
范例:处理姓名大小写的操作
范例:增加一个输出到文件的Blot
这些定义的Blot一定要接收Spout传来的数据,但是这些Blot之间没有直接的联系,所有的关系都必须通过程序动态配置。
编写测试程序
Storm最大的好处是直接提供了本地的windows模拟测试,但是在配置过程里面需要配置这些bolts关系。范例:编写测试程序
到此,关于“Storm中怎么定义Blots程序”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。