本文给出 storm-kafka-client 的使用示例分析。许多缺乏经验的读者对此不知从何下手,因此本文总结了问题出现的原因和解决方法,希望这篇文章能帮助你解决这个问题。
package hgs.core.sk;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
//参考如下
//https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html
//https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52
/**
 * Builds a two-component topology (a Kafka spout feeding a printing bolt)
 * and submits it to a Storm cluster under the name {@code storm-kafka-clients}.
 */
public class StormKafkaMainTest {

    /**
     * Assembles the spout configuration, wires spout and bolt together and
     * submits the resulting topology. Any exception raised by the submission
     * is printed to standard error and otherwise ignored.
     *
     * @param args command-line arguments; not read
     */
    public static void main(String[] args) {
        // Topology-wide settings: two worker processes, zero acker executors.
        Config topologyConf = new Config();
        topologyConf.setNumWorkers(2);
        topologyConf.setNumAckers(0);

        // Translator that converts each incoming Kafka record into a Storm tuple
        // holding (record value, topic name).
        ByTopicRecordTranslator<String, String> translator = new ByTopicRecordTranslator<>(
                rec -> new Values(rec.value(), rec.topic()),
                new Fields("values", "test7"));
        // Register the same mapping explicitly for the topic being consumed, test7.
        translator.forTopic(
                "test7",
                rec -> new Values(rec.value(), rec.topic()),
                new Fields("values", "test7"));

        // Spout configuration; the counterpart of SpoutConfig in the older storm-kafka module.
        KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
                // Bootstrap servers and the topic to subscribe to (test7).
                .builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7")
                // Consumer group.id.
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")
                // Position from which consumption starts.
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                // Interval, in milliseconds, between offset commits.
                .setOffsetCommitPeriodMs(10_000)
                // Record-to-tuple translator defined above.
                .setRecordTranslator(translator)
                .build();

        // Wiring: 2 spout executors, 4 bolt executors, shuffle-grouped on the spout.
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("kafkaspout", new KafkaSpout<>(spoutConf), 2);
        topology.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout");

        try {
            StormSubmitter.submitTopology(
                    "storm-kafka-clients", topologyConf, topology.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Local-mode alternative kept from the original for reference:
        // LocalCluster cu = new LocalCluster();
        // cu.submitTopology("test", topologyConf, topology.createTopology());
    }
}
class MyboltO extends BaseRichBolt{
private static final long serialVersionUID = 1L;
OutputCollector collector = null;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
//这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容
String out = input.getString(0);
System.out.println(out);
//collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
pom.xml文件
<!--
Maven build for hgs:core.sk. It depends on storm-kafka-client, storm-core
(scope "provided"), kafka_2.11, kafka-clients and clojure, and uses
maven-assembly-plugin during the "package" phase to build a
jar-with-dependencies whose manifest main class is
hgs.core.sk.StormKafkaMainTest.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hgs</groupId>
<artifactId>core.sk</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>core.sk</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!--
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.3</version>
</dependency>
-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-monitor</artifactId>
<version>1.2.2</version>
</dependency> -->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency> -->
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<!-- After many attempts the following error kept appearing:
java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)
It went away once kafka-clients was changed to the version matching the Kafka installation,
so the version mismatch was presumably the cause.
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<configuration>
<archive>
<manifest>
<!-- Main class to run when this jar is executed -->
<mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>
<!-- Must be written exactly like this -->
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
//以下为lambda表达式,因为在上面用到了,所以在这儿记录一下,以免以后看不懂
import java.util.UUID;
import org.junit.jupiter.api.Test;
/**
 * Scratch tests: one prints a randomly chosen sample sentence, the other
 * demonstrates passing a lambda where a {@code testinter} is expected.
 */
public class TEst {

    /** Prints one sentence picked at random from a fixed list of three. */
    @Test
    public void sysConfig() {
        String[] ags = {"his is my first storm program so i hope it will success",
                "i love bascketball",
                "the day of my birthday i was alone"};
        // Random is referenced by its fully qualified name because the file's
        // import list does not include java.util.Random.
        String nexttuple = ags[new java.util.Random().nextInt(ags.length)];
        System.out.println(nexttuple);
    }

    /**
     * Passes the lambda {@code (a) -> 10*a} to {@link #printPerson}; the lambda
     * stands in for an instance of {@code testinter<Integer>}.
     */
    @Test
    public void lambdaTest() {
        // The lambda returns 10 * a.
        printPerson((a) -> 10 * a);
    }

    /**
     * Invokes the given function with the argument 100 and prints the result.
     *
     * @param t implementation of the interface's single method
     *          {@code sysoutitems(int a)}
     */
    void printPerson(testinter<Integer> t) {
        System.out.println(t.sysoutitems(100));
    }
}
//定义接口,在lambda表达式运用中,必须为借口,并且借口只能有一个方法
interface testinter<T>{
T sysoutitems(int a );
//void aAndb(int a, int b );
}
看完上述内容,你们掌握storm-kafka-client使用的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:http://blog.itpub.net/31506529/viewspace-2215095/