Real-Time Top-3 Product Click Counting with Spark Streaming and Spark SQL

  1. Overview
  2. Example

Combining Spark Streaming with Spark SQL: every 10 seconds, count the clicks each product in each category received over the last 60 seconds, then report the top-3 hottest products in each category.

Overview

One of Spark Streaming's greatest strengths is that it integrates with Spark Core and Spark SQL. Through operators such as transform and foreachRDD, the RDDs inside a DStream can be processed with ordinary Spark Core batch operations, and an RDD can also be registered as a temporary table, which is what lets Spark Streaming and Spark SQL work together.
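For instance, transform applies an arbitrary Spark Core RDD-to-RDD function to each micro-batch and returns a new DStream. A minimal sketch (the DStream `lines` here is a hypothetical socket source, not part of the example below):

# transform: run any Spark Core batch operation on each micro-batch RDD;
# `lines` is a placeholder, e.g. lines = ssc.socketTextStream("127.0.0.1", 9999)
deduped_dstream = lines.transform(lambda rdd: rdd.distinct())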

Example

Every 10 seconds, count the clicks each product in each category received over the last 60 seconds, then report the top-3 hottest products in each category. The complete script is below.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.types import Row
from pyspark.sql import SparkSession

"""
Example: real-time top-3 hot products per category
"""
sc = SparkContext("local[2]", "streaming_sql_top3")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("hdfs://127.0.0.1:9000/checkpoint/streaming_sql_top3/")

# Receive click records of the form "username product category"
product_click_logs_dstream = ssc.socketTextStream("127.0.0.1", 9999)

# Map each record to (category_product, 1) so a window operation can
# count clicks per product within each category
category_pro_dstream = product_click_logs_dstream.map(
    lambda row: (row.split(" ")[2] + "_" + row.split(" ")[1], 1))

# Window operation: every 10 seconds, count each product's clicks over
# the last 60 seconds; the inverse function subtracts the batch that
# just slid out of the window instead of recomputing the whole window
category_pro_counts_dstream = category_pro_dstream.reduceByKeyAndWindow(
    lambda a, b: a + b, lambda a, b: a - b, 60, 10)


def top3(rdd):
    # Skip empty batches: schema inference fails on an empty RDD
    if rdd.isEmpty():
        return
    # Infer the schema by reflection from Row objects
    rdd = rdd.map(lambda row: Row(category=row[0].split("_")[0],
                                  product=row[0].split("_")[1],
                                  click_count=row[1]))
    spark = SparkSession \
        .builder \
        .appName("streaming_sql_top3") \
        .getOrCreate()
    # Convert the RDD to a DataFrame
    category_pro_counts_df = spark.createDataFrame(rdd)
    # Register it as a temporary view
    category_pro_counts_df.createOrReplaceTempView("product_click_log")
    # Top 3 per category via a ranking window function
    top3_product_df = spark.sql(
        "SELECT * FROM ("
        "SELECT *, row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "
        "FROM product_click_log ) tmp "
        "WHERE rank <= 3")
    top3_product_df.show()


category_pro_counts_dstream.foreachRDD(top3)

ssc.start()
ssc.awaitTermination()

"""
Test data (send via: nc -lk 9999)
zj iphone1 phone
zj iphone2 phone
zj iphone2 phone
zj iphone3 phone
zj iphone3 phone
zj iphone3 phone
zj iphone4 phone
zj iphone4 phone
zj iphone4 phone
zj iphone4 phone
zj car1 car
zj car2 car
zj car2 car
zj car3 car
zj car3 car
zj car3 car
zj car4 car
zj car4 car
zj car4 car
zj car4 car
zj shoes1 shoes
zj shoes2 shoes
zj shoes2 shoes
zj shoes3 shoes
zj shoes3 shoes
zj shoes3 shoes
zj shoes4 shoes
zj shoes4 shoes
zj shoes4 shoes
zj shoes4 shoes
"""

"""
Expected output:
+--------+-----------+-------+----+
|category|click_count|product|rank|
+--------+-----------+-------+----+
| shoes| 4| shoes4| 1|
| shoes| 3| shoes3| 2|
| shoes| 2| shoes2| 3|
| phone| 4|iphone4| 1|
| phone| 3|iphone3| 2|
| phone| 2|iphone2| 3|
| car| 4| car4| 1|
| car| 3| car3| 2|
| car| 2| car2| 3|
+--------+-----------+-------+----+
"""


Author: Waterandair

Published: 2018-05-02, 11:20:47

Last updated: 2019-12-28, 14:03:59

Original link: https://waterandair.github.io/2018-05-02-spark-streaming-sql-top3.html

License: CC BY-NC-SA 4.0. When reposting, please keep the original link and attribution.
