使用 transform 实现黑名单实时过滤操作
DStream 提供的 join 操作, 只能合并其他 DStream, 但是在一些场景下,需要 DStream 中 每个 batch RDD 和另外的 RDD 进行 join 操作, 比如在使用 spark Streaming 进行实时访问日志处理的时候,需要过滤掉已知的非法访问
实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| from pyspark.streaming import StreamingContext from pyspark import SparkContext
sc = SparkContext("local[2]", "transformBlacklist") ssc = StreamingContext(sc, 5)
blacklist = [ ("tom", True), ("jerry", True) ] # 模拟一个黑名单 (姓名, 是否启用) blacklistRdd = sc.parallelize(blacklist) # 接收实时访问日志, 这里简化为格式为 (date, username) logDStream = ssc.socketTextStream("127.0.0.1", 9999)
# 把 (data, username) 转为 (username, (date, username)) userLogDStream = logDStream.map(lambda row: (row.split(" ")[1], row))
def transform(rdd): joinedRdd = rdd.leftOuterJoin(blacklistRdd) filteredRdd = joinedRdd.filter(lambda row: row[1][1] is not True) validLogRdd = filteredRdd.map(lambda row: row[1]) return validLogRdd
validLogDStream = userLogDStream.transform(lambda rdd: transform(rdd))
validLogDStream.pprint()
ssc.start() ssc.awaitTermination()
|