Spark Streaming 热点搜索词滑动统计(pyspark)

  1. Window 滑动窗口操作
  2. 案例

使用滑动窗口操作,实现热点搜索词的实时统计

Window 滑动窗口操作

image

普通的 DStream 操作都是操作一个 batch, 而 Window 操作用于同时处理多个 batch.把合并在一起的多个 batch 称为一个 Window, Window 中的 batch 个数称为窗口长度(window length), 两次 Window 操作之间的间隔称为滑动间隔(sliding interval). 窗口长度和滑动间隔都要设置为 batch 间隔的整数倍.

比如一个Spark Streaming编写的 wordcount 程序 batch 间隔为 5 秒, 窗口长度为 30 秒, 滑动间隔为 10 秒. 就意味着每隔 10 秒, 处理一次之前 6 个 batch 的 wordcount 算子.

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "window_hotword")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("hdfs://127.0.0.1:9000/window_checkpoint")

# 接收实时的搜索词数据, 格式 "name word"
search_logs_dstream = ssc.socketTextStream("127.0.0.1", 9999)
# 将日志转换为只有一个搜索词
search_words_dstream = search_logs_dstream.map(lambda line: line.split(" ")[1])
# 将搜索词映射为 (word, 1) 的格式
search_word_pairs_dstream = search_words_dstream.map(lambda word: (word, 1))

# 对 search_word_pairs_dstream 进行窗口操作
"""
python 的 reduceByKeyAndWindow 和 scala/java 的略有不同,
scala/java 只传一个进行reduce的函数
python 需要传两个, 第一个表示对新进入 window 的 batch 进行的reduce 操作, 第二个表示对离开 window 的 batch 进行的 reduce 操作
第二个参数可以为None, 这样就表示要对window中所有的 batch 执行 reduce 操作,这样相对来说会降低性能,特别是在滑动间隔比较长的时候
"""
search_word_count_dstream = search_word_pairs_dstream.reduceByKeyAndWindow(lambda a, b: a+b, lambda a, b: a-b, 30, 10)


def transform(rdd):
# 将 rdd 转换为 (count, word) 格式
count_word_rdd = rdd.map(lambda row: (row[1], row[0]))
# 对 key 进行倒序排序
sorted_rdd = count_word_rdd.sortByKey(False)
# 将 rdd 转换为 (word, count) 格式
word_count_rdd = sorted_rdd.map(lambda row: (row[1], row[0]))
# 取前三
top3_word_count = word_count_rdd.take(3)
rdd = word_count_rdd.filter(lambda row: row in top3_word_count)
# for word_count in top3_word_count:
# print("***********" + str(word_count) + "*************")
return rdd


# 对 window 中的词频进行排序
final_dstream = search_word_count_dstream.transform(lambda rdd: transform(rdd))

final_dstream.pprint()

ssc.start()
ssc.awaitTermination()

文章标题:Spark Streaming 热点搜索词滑动统计(pyspark)

文章字数:539

本文作者:Waterandair

发布时间:2018-04-25, 11:20:47

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-04-25-spark-streaming-window.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github