import os

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# In Spark Streaming, Accumulators and Broadcast variables cannot be recovered
# from a checkpoint. If checkpointing is enabled and Accumulators or Broadcast
# variables are used, lazily instantiated singleton instances must be created
# for them so that they can be re-instantiated after the driver restarts on
# failure.


# Get or register a Broadcast variable
def getWordBlacklist(sparkContext):
    """Return the shared word-blacklist Broadcast, creating it on first use.

    The Broadcast is cached in this module's globals under the key
    ``'wordBlacklist'``; later calls return the cached object and do not
    touch ``sparkContext`` again.

    Args:
        sparkContext: Object whose ``broadcast(value)`` method is used to
            create the Broadcast when none is cached yet.

    Returns:
        Whatever ``sparkContext.broadcast(["a", "b", "c"])`` returned on the
        first call.
    """
    if 'wordBlacklist' not in globals():
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']
# Get or register an Accumulator
def getDroppedWordsCounter(sparkContext):
    """Return the shared dropped-words Accumulator, creating it on first use.

    The Accumulator is cached in this module's globals under the key
    ``'droppedWordsCounter'``; later calls return the cached object and do
    not touch ``sparkContext`` again.

    Args:
        sparkContext: Object whose ``accumulator(initial)`` method is used to
            create the Accumulator when none is cached yet.

    Returns:
        Whatever ``sparkContext.accumulator(0)`` returned on the first call.
    """
    if 'droppedWordsCounter' not in globals():
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']
def createContext(host, port, outputPath):
    """Build a new StreamingContext that counts words read from a socket.

    Each batch's counts are filtered against the word blacklist, printed, and
    appended to ``outputPath``.

    Args:
        host: Host name of the text source to connect to.
        port: Port of the text source.
        outputPath: Local file that receives one line of counts per batch.
            An existing file at this path is removed first.

    Returns:
        The configured StreamingContext; this function does not start it.
    """
    # If you do not see this printed, that means the StreamingContext has been
    # loaded from the checkpoint rather than created here.
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 10)  # batch interval of 10 seconds

    # Read space-separated words from host:port and count them per batch.
    # Without this wiring `host`/`port` were unused and `echo` never ran.
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    def echo(time, rdd):
        """Print and persist the non-blacklisted counts of one batch."""
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            """Return False for blacklisted words, adding their count to the accumulator."""
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    # Run `echo` on every batch, and hand the context back so a caller can
    # start it (e.g. via a StreamingContext.getOrCreate factory).
    wordCounts.foreachRDD(echo)
    return ssc
(发现 checkpoint) 2018-06-09 08:15:06 INFO CheckpointReader:54 - Checkpoint files found: hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000.bk,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773270000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773270000.bk,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773260000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773260000.bk,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773250000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773250000.bk (尝试加载 checkpoint) 2018-06-09 08:15:06 INFO CheckpointReader:54 - Attempting to load checkpoint from file hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000 2018-06-09 08:15:06 INFO Checkpoint:54 - Checkpoint for time 1533773280000 ms validated (成功读取 checkpoint) 2018-06-09 08:15:06 INFO CheckpointReader:54 - Checkpoint successfully loaded from file hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000 2018-06-09 08:15:06 INFO CheckpointReader:54 - Checkpoint was generated at time 1533773280000 ms 2018-06-09 08:15:06 INFO SparkContext:54 - Running Spark version 2.3.1 2018-06-09 08:15:06 INFO SparkContext:54 - Submitted application: PythonStreamingRecoverableNetworkWordCount ...
(继续接着上一次停止的位置开始执行) Counts at time 2018-06-09 08:08:00 [] Dropped 0 word(s) totally Appending to /home/zj/out Counts at time 2018-06-09 08:08:10 [] Dropped 0 word(s) totally Appending to /home/zj/out Counts at time 2018-06-09 08:08:20 [] ...