测试思路:
首先,创建网络数据源数据发送器(程序一);
其次,创建spark接收数据程序(程序二);
接着,将程序一打包,放在服务器上执行。这里有三个参数分别是:所要发送的数据文件,通过哪个端口号发送,每隔多少毫秒发送一次数据;
最后,运行spark程序,这里每隔5秒处理一次数据。有两个参数:监听的端口号,每隔多少毫秒接收一次数据。
观察效果。
程序一:
sparkStreaming
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object SalaSimulation {
(length: ) = {
java.util.Random
rdm = Random
rdm.nextInt(length)
}
(args: Array[]){
(args.length != ){
System..println()
System.()
}
filename = args()
lines = Source.(filename).getLines.toList
filerow = lines.length
listener = ServerSocket(args().toInt)
(){
socket = listener.accept()
Thread(){
= {
(+socket.getInetAddress)
out = PrintWriter(socket.getOutputStream())
(){
Thread.(args().toLong)
content = lines((filerow))
(content)
out.write(content +)
out.flush()
}
socket.close()
}
}.start()
}
}
}
程序二:
sparkStreaming
import org.apache.log4j.{LoggerLevel}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{SecondsStreamingContext}
import org.apache.spark.{SparkContextSparkConf}
import org.apache.spark.streaming.StreamingContext._
object NetworkWordCount {
def main(args: Array[]){
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
conf = SparkConf().setAppName().setMaster()
sc = SparkContext(conf)
ssc = StreamingContext(sc())
lines = ssc.socketTextStream(args()args().toIntStorageLevel.)
words = lines.flatMap(_.split())
wordCounts = words.map(x=>(x)).reduceByKey(_+_)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。