// NOTE(review): removed scraped-page boilerplate text ("2019独角兽企业重金招聘Python工程师标准>>>")
// that preceded the package clause and was not valid Scala.
package com.hhb.spark.core

import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.control.NonFatal
/**
 * 标签生成器 — Tag generator.
 *
 * Reads a tab-separated file of (businessId, reviewJson), extracts the tag
 * strings found at extInfoList[0].values in the JSON, counts how often each
 * tag occurs per business, and prints the top-5 tags per business as
 * "tag1(n1),tag2(n2),...", with businesses ordered by the count of their
 * most frequent tag (descending).
 *
 * Created by dell on 2017/9/20.
 */
object TagGenerator {

  /**
   * Entry point.
   *
   * @param args optional; args(0) overrides the input path
   *             (defaults to the original hard-coded "c://temptags.txt")
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("TagGenerator").setMaster("local")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    try {
      // Input path: backward compatible — same hard-coded default as before,
      // but now overridable from the command line.
      val input = if (args.nonEmpty) args(0) else "c://temptags.txt"

      // Parse each raw line into zero or more (busId, tag) pairs. Malformed
      // lines (no tab, broken JSON, missing extInfoList/values) are skipped
      // instead of crashing the job — the original threw
      // ArrayIndexOutOfBoundsException on any line without a '\t'.
      val tagsByBusiness = sc.textFile(input).flatMap(parseLine)

      // Count each (busId, tag) occurrence. A tuple key replaces the original
      // "busId-tag" string key, which was re-split on '-' and therefore broke
      // for ids or tags containing a hyphen.
      val counts = tagsByBusiness
        .map { case (busId, tag) => ((busId, tag), 1) }
        .reduceByKey(_ + _)

      // Regroup per business and keep the top-5 tags by count (descending).
      val top5 = counts
        .map { case ((busId, tag), n) => (busId, (tag, n) :: Nil) }
        .reduceByKey(_ ++ _)
        .mapValues(_.sortBy(-_._2).take(5))

      // Order businesses by their best tag's count (descending), then render
      // the value list as "tag1(n1),tag2(n2),..." exactly like the original.
      val rendered = top5
        .sortBy(_._2.head._2, ascending = false, numPartitions = 1)
        .mapValues(_.map { case (tag, n) => s"$tag($n)" }.mkString(","))

      rendered.foreach(println)
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }

  /**
   * Parses one input line of the form "busId\tjson" into (busId, tag) pairs.
   *
   * Returns Nil when the line has no tab separator, the JSON is malformed, or
   * the review carries no extInfoList/values — so downstream stages need no
   * empty-string filtering (replaces the original "" sentinel + filter step).
   */
  private def parseLine(line: String): Seq[(String, String)] = {
    // limit = 2: only the first tab separates busId from the JSON payload.
    val parts = line.split("\\t", 2)
    if (parts.length < 2) Nil
    else {
      val busId = parts(0)
      try {
        val jo = JSON.parseObject(parts(1))
        val extInfoList = jo.getJSONArray("extInfoList")
        if (extInfoList == null || extInfoList.isEmpty) Nil
        else {
          val values = extInfoList.getJSONObject(0).getJSONArray("values")
          if (values == null || values.isEmpty) Nil
          else (0 until values.size()).map(i => (busId, values.getString(i)))
        }
      } catch {
        // Skip unparseable JSON; NonFatal keeps OOM/interrupts propagating.
        case NonFatal(_) => Nil
      }
    }
  }
}