This article shares example code for using Spark Streaming with Flume 1.6.0. I hope you get something out of it by the end; let's dig in!
package hgs.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
/* Add the following dependency to pom.xml:
 * <dependency>
 *   <groupId>org.apache.spark</groupId>
 *   <artifactId>spark-streaming-flume_2.11</artifactId>
 *   <version>2.1.0</version>
 * </dependency>
 */
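For sbt-based builds, a minimal equivalent of the Maven dependency above (same coordinates, assuming scalaVersion is set to 2.11.x so that %% resolves the _2.11 suffix) would be:

// build.sbt: sbt equivalent of the pom.xml dependency above
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.1.0"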
/* Flume agent configuration (push mode: Flume's avro sink pushes events to the Spark receiver)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.9
a1.sinks.k1.port = 8888
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# the command to start an agent:
# bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
# e.g., for this agent (assuming the config above is saved as conf/flume-push.conf):
# bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf
*/
object SparkStreamingFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // updateStateByKey needs a checkpoint directory to persist state across batches
    ssc.checkpoint("d:\\checkpoint")
    // For each key: add this batch's counts to the previous state, if any
    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map((it._1, _)))    // variant 1
      //iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map((x, _)) } // variant 2
      iter.flatMap(it => Some((it._1, it._2.sum + it._3.getOrElse(0))))             // variant 3
    }
    // There are two ways to get data from Flume: push and poll. This is push mode,
    // i.e. Flume pushes data to Spark, so the IP and port here are the Spark
    // receiver's address and port.
    val rds = FlumeUtils.createStream(ssc, "192.168.1.9", 8888, StorageLevel.MEMORY_ONLY)
    val result = rds.flatMap(x => new String(x.event.getBody.array()).split(" "))
      .map(x => (x, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
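The iterator-based updateFunc above processes a whole partition's keys at once. As a minimal sketch, the same word count can use the simpler per-key updateStateByKey overload, which Spark calls with each key's new batch values and previous state:

// Equivalent per-key form of the update function
val simpleUpdate = (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
// Usage, in place of the iterator-based call above:
// .updateStateByKey(simpleUpdate)

The second listing below shows poll mode, where Spark pulls data from a SparkSink running inside the Flume agent.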
package hgs.spark.streaming
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import java.net.InetAddress
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
// Spark supports Flume version 1.6.0
/* Add the following dependency to pom.xml:
 * <dependency>
 *   <groupId>org.apache.spark</groupId>
 *   <artifactId>spark-streaming-flume_2.11</artifactId>
 *   <version>2.1.0</version>
 * </dependency>
 */
/*
Flume agent configuration (poll mode: Spark pulls events from the SparkSink)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.6.129
a1.sinks.k1.port = 8888
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# the command to start an agent:
# bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
*/
//同时需要如下三个包 将三个包放到flume的classpath下面
/* groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_2.11
version = 2.1.0
groupId = org.scala-lang
artifactId = scala-library
version = 2.11.7
groupId = org.apache.commons
artifactId = commons-lang3
version = 3.5*/
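For reference, here is a sketch of the same three artifacts in sbt notation (coordinates taken from the list above), which can be used to pull the jars into a local cache before copying them into Flume's lib directory:

// build.sbt: the three jars the Flume-side SparkSink needs
libraryDependencies ++= Seq(
  "org.apache.spark"   % "spark-streaming-flume-sink_2.11" % "2.1.0",
  "org.scala-lang"     % "scala-library"                   % "2.11.7",
  "org.apache.commons" % "commons-lang3"                   % "3.5"
)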
object SparkStreamingFlumePoll {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flume-poll").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("d:\\checkpoint")
    // Address(es) of the Flume agent(s) running the SparkSink
    val ipSeq = Seq(new InetSocketAddress("192.168.6.129", 8888))
    // Poll mode: Spark pulls the data from Flume
    val rds = FlumeUtils.createPollingStream(ssc, ipSeq, StorageLevel.MEMORY_AND_DISK)
    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map((it._1, _)))    // variant 1
      //iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map((x, _)) } // variant 2
      iter.flatMap(it => Some((it._1, it._2.sum + it._3.getOrElse(0))))             // variant 3
    }
    val result = rds.flatMap(x => new String(x.event.getBody.array()).split(" "))
      .map(x => (x, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
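Because the spooldir source sets fileHeader = true, each event also carries the path of the originating file in its Avro headers. A minimal sketch of reading it (assuming the source's default header key "file"; header keys may arrive as Avro Utf8 rather than String, hence the toString comparison):

import scala.collection.JavaConverters._
// Hedged sketch: pair each event body with the file it came from
val bodiesWithOrigin = rds.map { sfe =>
  val origin = sfe.event.getHeaders.asScala
    .collectFirst { case (k, v) if k.toString == "file" => v.toString }
    .getOrElse("unknown")
  (origin, new String(sfe.event.getBody.array()))
}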
// Error encountered: the scala-library jar already ships in Flume's lib directory, so adding
// another copy created a duplicate-jar conflict; deleting one of them resolves it.
/*18 Oct 2018 20:58:32,123 WARN [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) - Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18 Oct 2018 20:58:32,128 WARN [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59) - Spark was unable to successfully process the events. Transaction is being rolled back.
18 Oct 2018 20:58:32,128 WARN [New I/O worker #1] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59) - Received an error batch - no events were received from channel! */
Having read this article, you should now have a working understanding of the Spark and Flume 1.6.0 example code. Thanks for reading!
Original article: http://blog.itpub.net/31506529/viewspace-2216840/