Spark Streaming 热点搜索词滑动统计(pyspark)
创建时间:
字数:539
使用滑动窗口操作,实现热点搜索词的实时统计
Window 滑动窗口操作
普通的 DStream 操作都是操作一个 batch, 而 Window 操作用于同时处理多个 batch.把合并在一起的多个 batch 称为一个 Window, Window 中的 batch 个数称为窗口长度(window length), 两次 Window 操作之间的间隔称为滑动间隔(sliding interval). 窗口长度和滑动间隔都要设置为 batch 间隔的整数倍.
比如一个Spark Streaming编写的 wordcount 程序 batch 间隔为 5 秒, 窗口长度为 30 秒, 滑动间隔为 10 秒. 就意味着每隔 10 秒, 处理一次之前 6 个 batch 的 wordcount 算子.
案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| from pyspark import SparkContext from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "window_hotword") ssc = StreamingContext(sc, 5) ssc.checkpoint("hdfs://127.0.0.1:9000/window_checkpoint")
# 接收实时的搜索词数据, 格式 "name word" search_logs_dstream = ssc.socketTextStream("127.0.0.1", 9999) # 将日志转换为只有一个搜索词 search_words_dstream = search_logs_dstream.map(lambda line: line.split(" ")[1]) # 将搜索词映射为 (word, 1) 的格式 search_word_pairs_dstream = search_words_dstream.map(lambda word: (word, 1))
# 对 search_word_pairs_dstream 进行窗口操作 """ python 的 reduceByKeyAndWindow 和 scala/java 的略有不同, scala/java 只传一个进行reduce的函数 python 需要传两个, 第一个表示对新进入 window 的 batch 进行的reduce 操作, 第二个表示对离开 window 的 batch 进行的 reduce 操作 第二个参数可以为None, 这样就表示要对window中所有的 batch 执行 reduce 操作,这样相对来说会降低性能,特别是在滑动间隔比较长的时候 """ search_word_count_dstream = search_word_pairs_dstream.reduceByKeyAndWindow(lambda a, b: a+b, lambda a, b: a-b, 30, 10)
def transform(rdd): # 将 rdd 转换为 (count, word) 格式 count_word_rdd = rdd.map(lambda row: (row[1], row[0])) # 对 key 进行倒序排序 sorted_rdd = count_word_rdd.sortByKey(False) # 将 rdd 转换为 (word, count) 格式 word_count_rdd = sorted_rdd.map(lambda row: (row[1], row[0])) # 取前三 top3_word_count = word_count_rdd.take(3) rdd = word_count_rdd.filter(lambda row: row in top3_word_count) # for word_count in top3_word_count: # print("***********" + str(word_count) + "*************") return rdd
# 对 window 中的词频进行排序 final_dstream = search_word_count_dstream.transform(lambda rdd: transform(rdd))
final_dstream.pprint()
ssc.start() ssc.awaitTermination()
|
文章标题:Spark Streaming 热点搜索词滑动统计(pyspark)
文章字数:539
本文作者:Waterandair
发布时间:2018-04-25, 11:20:47
最后更新:2019-12-28, 14:03:59
原始链接:https://waterandair.github.io/2018-04-25-spark-streaming-window.html
版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。