This article explains how to implement contact-book (phone book) similarity calculation with Spark. This kind of job trips people up in practice, so the steps below walk through a complete working example: the problem statement, the Maven setup, the Spark/Scala job, and the submit command.
A Hive table stores two fields, UserPhone and LinkPhone (one contact from UserPhone's phone book per row). The task is to use Spark SQL to find every pair of UserPhones whose contact-book similarity is >= 80%.
Similarity = (size of the intersection of A's and B's contact books) / (size of A's contact book).
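For example, if A's contact book is {p1, p2, p3, p4, p5} and B's is {p1, p2, p3, p4, p6, p7}, the intersection has 4 numbers, so similarity(A→B) = 4/5 = 80% while similarity(B→A) = 4/6 ≈ 67%; the measure is asymmetric. A minimal sketch of the formula in Scala (a hypothetical helper for illustration only, not part of the job below):

// similarity of A towards B: shared contacts divided by the size of A's own contact book
def similarity(a: Set[String], b: Set[String]): Double =
  if (a.isEmpty) 0.0 else a.intersect(b).size.toDouble / a.size

// similarity(Set("p1","p2","p3","p4","p5"), Set("p1","p2","p3","p4","p6","p7")) == 0.8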
Pay attention to compatibility between the dependencies and pick matching versions. You will also usually copy Hive's conf/hive-site.xml into the IDEA project (typically under src/main/resources) so that Spark can find the Hive metastore.
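Assuming the standard Maven layout, the project would look roughly like this (the hive-site.xml copy is the only non-default piece):

PhoneBookSimilaryCal/
├── pom.xml
└── src/main/
    ├── resources/hive-site.xml
    └── scala/com/sowhat/PhoneBookSimilaryCal.scala

The pom.xml is as follows: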
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sowhat.demo</groupId>
<artifactId>PhoneBookSimilaryCal</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11.8</scala.compat.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.7.2</hadoop.version>
<hbase.version>1.0</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
<build>
<finalName>PhoneBookSimilaryCal</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<!-- With this plugin there is no need to manually add the Scala framework to the project in IDEA; *.scala sources are picked up and compiled automatically -->
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.sowhat.PhoneBookSimilaryCal</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- the id can be anything -->
<phase>package</phase>
<!-- bind to the package lifecycle phase -->
<goals>
<goal>single</goal>
<!-- run the assembly only once -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
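Because the assembly plugin is bound to the package phase, running mvn clean package should produce a fat jar such as target/PhoneBookSimilaryCal-jar-with-dependencies.jar; the submit command at the end of this article references what is presumably a renamed copy of that jar (PhoneBookSimilaryCal1.jar). The Scala job itself: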
package com.sowhat
/**
* @author sowhat
* @create 2020-07-02 16:30
*/
import java.security.MessageDigest
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}
object PhoneBookSimilaryCal {
def MD5(input: String): String = {
var md5: MessageDigest = null
try {
md5 = MessageDigest.getInstance("MD5")
} catch {
case e: Exception => {
e.printStackTrace
println(e.getMessage)
}
}
val byteArray: Array[Byte] = input.getBytes
val md5Bytes: Array[Byte] = md5.digest(byteArray)
var hexValue: String = ""
for (i <- 0 to md5Bytes.length - 1) {
val str: Int = (md5Bytes(i).toInt) & 0xff
if (str < 16) {
hexValue = hexValue + "0"
}
hexValue = hexValue + Integer.toHexString(str)
}
hexValue
}
def Yesterday = {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val cal: Calendar = Calendar.getInstance()
cal.add(Calendar.DATE, -1)
dateFormat.format(cal.getTime)
}
def OneYearBefore = {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var cal: Calendar = Calendar.getInstance()
cal.add(Calendar.YEAR, -1)
dateFormat.format(cal.getTime())
}
def SixMonthBefore = {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var cal: Calendar = Calendar.getInstance()
cal.add(Calendar.MONTH, -6)
dateFormat.format(cal.getTime)
}
def ThreeMonthBefore = {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var cal: Calendar = Calendar.getInstance()
cal.add(Calendar.MONTH, -3)
dateFormat.format(cal.getTime)
}
def OneMonthBefore = {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var cal: Calendar = Calendar.getInstance()
cal.add(Calendar.MONTH, -1)
dateFormat.format(cal.getTime)
}
private val logger: Logger = LoggerFactory.getLogger(PhoneBookSimilaryCal.getClass)
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "yjy_research") // Spark SQL reads and writes HDFS as this user, so make sure it has the necessary permissions
val spark: SparkSession = SparkSession.builder().appName("phoneBookSimilaryCal")
.config("spark.sql.shuffle.partitions", "1000")
.config("spark.default.parallelism", "3000")
.config("spark.driver.maxResultSize", "40g")
//.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.shuffle.io.maxRetries", "20")
.config("spark.shuffle.io.retryWait", "10s")
.config("spark.storage.memoryFraction", "0.5")
.config("spark.shuffle.memoryFraction", "0.5")
.config("spark.executor.cores", "5")
.config("spark.executor.instances", "20")
.config("spark.executor.memory", "40g")
.config("spark.driver.memory", "40g")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport().getOrCreate() // enable Hive support so Hive tables can be read and written
spark.sql("use dm_kg")
val sqlText: String = "select user_phone,phone from user_phone_with_phone_message where user_phone not in( '59400a197e9bf5fbb2fbee0456b66cd6','f7e82e195810a01688db2eeecb8e56c9') and etl_date>'" + SixMonthBefore + "'"
println(sqlText)
val df: DataFrame = spark.sql(sqlText)
val rdd: RDD[Row] = df.rdd
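// mapPartitions helper: turn each partition's Rows into (user_phone, contact_phone) tuples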
def getUserPhoneAndPhone(iter: Iterator[Row]) = {
var res: List[(String, String)] = List[(String, String)]()
while (iter.hasNext) {
val row: Row = iter.next()
res = res.::(row.getString(0), row.getString(1))
}
res.iterator
}
val userPhone_Phone: RDD[(String, String)] = rdd.mapPartitions(getUserPhoneAndPhone)
userPhone_Phone.persist(StorageLevel.MEMORY_AND_DISK_SER)
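// contact-book size per user: (user_phone, number of contacts)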
val userPhone_num: RDD[(String, Long)] = userPhone_Phone.map(x => (x._1, 1L)).reduceByKey(_ + _, 3000)
def dealUserPhoneNum(iter: Iterator[(String, Long)]) = {
var res: List[(String, String)] = List[(String, String)]()
while (iter.hasNext) {
val row: (String, Long) = iter.next()
res.::=(row._1, row._1.concat("_").concat(row._2.toString))
}
res.iterator
}
val userPhone_userPhoneNum: RDD[(String, String)] = userPhone_num.mapPartitions(dealUserPhoneNum)
val userPhone_Phone_userPhoneNum: RDD[(String, (String, String))] = userPhone_Phone.join(userPhone_userPhoneNum, 3000)
val userPhone_Phone_userPhoneNum_filter: RDD[(String, (String, String))] = userPhone_Phone_userPhoneNum.filter(x => x._2._2.split("_")(1).toLong != 1)
def getSecondTuple(iter: Iterator[(String, (String, String))]) = {
var res = List[(String, String)]()
while (iter.hasNext) {
val tuple: (String, (String, String)) = iter.next()
res.::=(tuple._2)
}
res.iterator
}
val phone_userPhoneNum: RDD[(String, String)] = userPhone_Phone_userPhoneNum_filter.mapPartitions(getSecondTuple)
val phone_userPhoneListWithSize: RDD[(String, (List[String], Int))] = phone_userPhoneNum.combineByKey(
(x: String) => (List(x), 1),
(old: (List[String], Int), x: String) => (x :: old._1, old._2 + 1),
(par1: (List[String], Int), par2: (List[String], Int)) => (par1._1 ::: par2._1, par1._2 + par2._2)
) // result: (contact_phone, (List of user phones that have this contact, size of that List))
val userPhoneList: RDD[List[String]] = phone_userPhoneListWithSize.filter(x => (x._2._2 < 1500 && x._2._2 > 1)).map(_._2._1)
// keep only contacts shared by more than 1 and fewer than 1,500 user phones; very popular numbers would blow up the pair generation below
val userPhone_userPhone: RDD[List[String]] = userPhoneList.flatMap(_.sorted.combinations(2))
// https://blog.csdn.net/aomao4913/article/details/101274895
val userPhone_userPhone_Num: RDD[((String, String), Int)] = userPhone_userPhone.map(x => ((x(0), x(1)), 1)).reduceByKey(_ + _, 3000)
// yields ((UserPhone1, UserPhone2), number of contacts the two share)
def dealData(iter: Iterator[((String, String), Int)]) = {
var res = List[(String, String, Int)]()
while (iter.hasNext) {
val row: ((String, String), Int) = iter.next()
val line = row._1.toString.split(",") // (userPhone_num,userPhone_num)
res.::=(line(0).replace("(", ""), line(1).replace(")", ""), row._2)
}
res.iterator
}
val userPhone_num_with_userPhone_num_with_commonNum: RDD[(String, String, Int)] = userPhone_userPhone_Num.mapPartitions(dealData)
def FirstToSecond(iter: Iterator[(String, String, Int)]) = {
var res = List[(String, String, Long, Int)]()
while (iter.hasNext) {
val cur: (String, String, Int) = iter.next
val itemList1: Array[String] = cur._1.toString.split("_")
val itemList2: Array[String] = cur._2.toString.split("_")
res.::=(itemList1(0), itemList2(0), itemList1(1).toLong, cur._3)
}
res.iterator
} // userPhone1,userPhone2,userPhone1BookNum,CommonNum
def SecondToFirst(iter: Iterator[(String, String, Int)]) = {
var res = List[(String, String, Long, Int)]()
while (iter.hasNext) {
val cur: (String, String, Int) = iter.next
val itemList1: Array[String] = cur._1.toString.split("_")
val itemList2: Array[String] = cur._2.toString.split("_")
res.::=(itemList2(0), itemList1(0), itemList2(1).toLong, cur._3)
}
res.iterator
} // userPhone2,userPhone1,userPhone2BookNum,CommonNum
val userPhone1_userPhone2_userPhone1BookNum_CommonNum_1: RDD[(String, String, Long, Int)] = userPhone_num_with_userPhone_num_with_commonNum.mapPartitions(FirstToSecond).filter(_._3 > 1)
val userPhone2_userPhone1_userPhone2BookNum_CommonNum_2: RDD[(String, String, Long, Int)] = userPhone_num_with_userPhone_num_with_commonNum.mapPartitions(SecondToFirst).filter(_._3 > 1)
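// similarity is asymmetric (the denominator is each user's own contact-book size), so both directions are produced and then unioned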
val userPhone1_userPhone2_userPhone1BookNum_CommonNum: RDD[(String, String, Long, Int)] = userPhone2_userPhone1_userPhone2BookNum_CommonNum_2.union(userPhone1_userPhone2_userPhone1BookNum_CommonNum_1)
def finalDeal(iter: Iterator[(String, String, Long, Int)]) = {
var res = List[(String, Long, String, String, Long, String)]()
while (iter.hasNext) {
val cur: (String, String, Long, Int) = iter.next()
res.::=(cur._1.toString, cur._4 * 100 / cur._3, cur._2.toString, "Similar_phoneBook", cur._3, Yesterday)
}
res.iterator
} // user_phone1,percent,user_phone2,label,userPhone1BookNum,CalDate
val userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate: RDD[(String, Long, String, String, Long, String)] = userPhone1_userPhone2_userPhone1BookNum_CommonNum.mapPartitions(finalDeal)
val userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate_Filter: RDD[(String, Long, String, String, Long, String)] = userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate.filter(_._2 >= 80)
import spark.implicits._
val finalResult: DataFrame = userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate_Filter.toDF()
println("result count: " + userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate_Filter.count())
spark.sql("drop table if exists sowhat_similar_phonebook_result")
spark.sql("CREATE TABLE IF NOT EXISTS sowhat_similar_phonebook_result" +
"(startId string comment '起始节点ID'," +
"similar_percent string comment '相似度'," +
"endId string comment '终止节点ID'," +
"type string comment '边的类型'," +
"telbook_num long comment '通讯录个数'," +
"etl_date Date comment 'etl日期') " +
"row format delimited fields terminated by ',' ")
logger.info("created table similar_phonebook_result")
finalResult.createOrReplaceTempView("resultMessage")
spark.sql("insert into sowhat_similar_phonebook_result select * from resultMessage")
spark.sql("select count(1) from sowhat_similar_phonebook_result").show()
spark.stop()
}
}
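The core idea above is to avoid comparing every pair of users directly. The data is inverted: group user phones by shared contact, emit every pair of users inside each contact's group with sorted.combinations(2), and count how many times each pair appears; that count is exactly the size of the intersection of the two contact books, which is then divided by each user's own contact-book size. A minimal, self-contained sketch of that idea on toy data (the object name and records are illustrative only, not taken from the job above):

import org.apache.spark.sql.SparkSession

object PairCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pairCountSketch").getOrCreate()
    val sc = spark.sparkContext

    // toy (user_phone, contact_phone) records, not from the real Hive table
    val records = sc.parallelize(Seq(
      ("A", "p1"), ("A", "p2"), ("A", "p3"),
      ("B", "p1"), ("B", "p2"), ("B", "p3"), ("B", "p4"),
      ("C", "p4")))

    val bookSize = records.map { case (u, _) => (u, 1L) }.reduceByKey(_ + _)   // contact-book size per user
    val usersByContact = records.map { case (u, p) => (p, u) }.groupByKey()    // which users share each contact

    // every pair of users that share a contact, counted once per shared contact
    val commonCount = usersByContact
      .flatMap { case (_, users) => users.toList.sorted.combinations(2).map(pair => ((pair(0), pair(1)), 1)) }
      .reduceByKey(_ + _)

    // similarity(u1 -> u2) = shared contacts / |u1's contact book|, as a percentage
    val similarity = commonCount
      .map { case ((u1, u2), common) => (u1, (u2, common)) }
      .join(bookSize)
      .map { case (u1, ((u2, common), size1)) => (u1, u2, common * 100 / size1) }

    similarity.collect().foreach(println)  // prints (A,B,100) and (B,C,25) for the toy data
    spark.stop()
  }
}

The sketch only emits one direction per sorted pair; the real job additionally generates both directions (FirstToSecond / SecondToFirst) because the denominator differs per direction.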
The submit command for the Spark cluster (root.kg is the YARN queue on the Hadoop cluster):
time sshpass -p passpwrd ssh user@ip " nohup
spark-submit --name "sowhatJob" --master yarn --deploy-mode client \
--conf spark.cleaner.periodicGC.interval=120 --conf spark.executor.memory=20g \
--conf spark.executor.instances=20 --conf spark.driver.memory=20g --conf spark.sql.shuffle.partitions=1500 \
--conf spark.network.timeout=100000000 --queue root.kg \
--class com.sowhat.PhoneBookSimilaryCal PhoneBookSimilaryCal1.jar "
That is all for how to implement contact-book similarity calculation with Spark. Thanks for reading.
Original post: https://my.oschina.net/u/4511602/blog/4826084