Spark Streaming: Using Checkpoints for Recovery

  1. Adding checkpointing to the Wordcount program
  2. Experiment log
    1. First start
    2. Restart after stopping with CTRL+C

Streaming programs usually have very high availability requirements, so Spark provides a checkpoint mechanism for fault-tolerant recovery of streaming jobs.

Adding checkpointing to the Wordcount program

import os
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# In Spark Streaming, Accumulators and Broadcast variables cannot be recovered
# from a checkpoint. If checkpointing is enabled and Accumulators or Broadcast
# variables are used, they must be created as lazily instantiated singletons
# so that they can be re-instantiated after the driver restarts on failure.

# Get or register a Broadcast variable
def getWordBlacklist(sparkContext):
    if 'wordBlacklist' not in globals():
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']


# Get or register an Accumulator
def getDroppedWordsCounter(sparkContext):
    if 'droppedWordsCounter' not in globals():
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']


def createContext(host, port, outputPath):
    # If you do not see this printed, the StreamingContext has been loaded
    # from an existing checkpoint
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 10)

    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: recoverable_network_wordcount.py <hostname> <port> "
              "<checkpoint-directory> <output-file>", file=sys.stderr)
        sys.exit(-1)
    host, port, checkpoint, output = sys.argv[1:]
    ssc = StreamingContext.getOrCreate(checkpoint, lambda: createContext(host, int(port), output))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.start()
    ssc.awaitTermination()
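The getWordBlacklist and getDroppedWordsCounter helpers above rely on a lazy-singleton pattern: because accumulators and broadcast variables cannot be restored from a checkpoint, the code caches them in globals() and re-creates them on demand after a driver restart. The pattern itself can be sketched in plain Python (a plain list stands in for the real broadcast/accumulator objects; illustration only, not Spark API):

```python
# Sketch of the lazy-singleton pattern used by getWordBlacklist /
# getDroppedWordsCounter, with a plain list standing in for the Spark
# broadcast/accumulator objects.

def get_singleton(name, factory):
    # Create the instance on first use and cache it in globals(); after a
    # driver restart globals() is empty again, so the object is simply
    # re-created instead of being (unsuccessfully) read from the checkpoint.
    if name not in globals():
        globals()[name] = factory()
    return globals()[name]

first = get_singleton('wordBlacklist', lambda: ["a", "b", "c"])
second = get_singleton('wordBlacklist', lambda: ["ignored"])  # cached, factory skipped
print(first is second)  # True
```

Calling the helper a second time returns the cached object without invoking the factory again, which is exactly why the Spark helpers are safe to call from inside every foreachRDD batch.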

Experiment log

Start a socket source first:

nc -lk 9999

Then submit the streaming job:

spark-submit project/daily-learning/learn-spark/python/base_demo/streaming/checkpoint_recover.py 127.0.0.1 9999 ~/checkpoint ~/out

First start
2018-06-09 08:07:25 WARN  Checkpoint:66 - Checkpoint directory /home/zj/checkpoint does not exist (no checkpoint on the first run, so a new context is created)
Creating new context
2018-06-09 08:07:25 INFO SparkContext:54 - Running Spark version 2.3.1
2018-06-09 08:07:26 INFO SparkContext:54 - Submitted application: PythonStreamingRecoverableNetworkWordCount
...

Counts at time 2018-06-09 08:07:30 []
Dropped 0 word(s) totally
Appending to /home/zj/out
Counts at time 2018-06-09 08:07:40 [('', 1), ('e', 1), ('d', 1), ('f', 1)]
Dropped 3 word(s) totally
Appending to /home/zj/out
Counts at time 2018-06-09 08:07:50 [('g', 1), ('e', 1), ('d', 1), ('f', 1)]
Dropped 6 word(s) totally
Appending to /home/zj/out
Counts at time 2018-06-09 08:08:00 []
Dropped 6 word(s) totally
Appending to /home/zj/out
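The "Dropped N word(s)" lines come from the blacklist filter inside echo(): the words a, b and c are removed from each batch and their occurrence counts are accumulated across batches. Isolated from Spark, the filtering logic behaves like this (a plain integer stands in for the accumulator, and the batch data is made up for illustration):

```python
# Standalone sketch of filterFunc from echo(): drop blacklisted words and
# tally how many occurrences were dropped (plain counter instead of a Spark
# accumulator).
blacklist = {"a", "b", "c"}
dropped = 0

def filter_func(word_count):
    global dropped
    word, count = word_count
    if word in blacklist:
        dropped += count   # count the dropped occurrences
        return False       # exclude from the batch result
    return True

batch = [("a", 2), ("d", 1), ("b", 1), ("e", 3)]
kept = [wc for wc in batch if filter_func(wc)]
print(kept)     # [('d', 1), ('e', 3)]
print(dropped)  # 3
```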
Restart after stopping with CTRL+C
(checkpoint files found)
2018-06-09 08:15:06 INFO CheckpointReader:54 - Checkpoint files found: hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000.bk,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773270000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773270000.bk,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773260000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773260000.bk,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773250000,hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773250000.bk
(attempting to load the checkpoint)
2018-06-09 08:15:06 INFO CheckpointReader:54 - Attempting to load checkpoint from file hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000
2018-06-09 08:15:06 INFO Checkpoint:54 - Checkpoint for time 1533773280000 ms validated
(checkpoint loaded successfully)
2018-06-09 08:15:06 INFO CheckpointReader:54 - Checkpoint successfully loaded from file hdfs://localhost:9000/home/zj/checkpoint/checkpoint-1533773280000
2018-06-09 08:15:06 INFO CheckpointReader:54 - Checkpoint was generated at time 1533773280000 ms
2018-06-09 08:15:06 INFO SparkContext:54 - Running Spark version 2.3.1
2018-06-09 08:15:06 INFO SparkContext:54 - Submitted application: PythonStreamingRecoverableNetworkWordCount
...

(execution resumes from where the previous run stopped: the batch at 08:08:00 is replayed, and the counter restarts at "Dropped 0" because the accumulator is re-created rather than recovered from the checkpoint)
Counts at time 2018-06-09 08:08:00 []
Dropped 0 word(s) totally
Appending to /home/zj/out
Counts at time 2018-06-09 08:08:10 []
Dropped 0 word(s) totally
Appending to /home/zj/out
Counts at time 2018-06-09 08:08:20 []
...

Author: Waterandair

Published: 2018-06-10, 11:20:47

Last updated: 2019-12-28, 14:03:59

Original link: https://waterandair.github.io/2018-06-10-spark-streaming-checkpoint.html

License: "Attribution-NonCommercial-ShareAlike 4.0". Please retain the original link and author when reposting.
