Reading and Writing Data with Spark (Python)

An introduction to reading from multiple data sources and saving the results back out with Spark Core, Spark SQL, and Spark Streaming.

Reading and writing data with Spark RDDs

Creating an RDD from a collection (for testing)

words = ["hadoop", "spark", "hive"]   # avoid shadowing the built-in name `list`
rdd = sc.parallelize(words)
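
To quickly check the result in the pyspark shell (collect() pulls everything back to the driver, so only use it on small test data):

rdd.collect()
# ['hadoop', 'spark', 'hive']
rdd.count()
# 3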

Reading and writing the local file system

# Read a file or directory
rdd = sc.textFile("file:///home/zj/word.txt")
# Save to a file
rdd.saveAsTextFile("file:///home/zj/word_bak.txt")

Note that when saving, word_bak.txt is not a file but a directory.

shell> ls /home/zj/word_bak.txt
part-00000
_SUCCESS

part-00000 is the file that actually contains the data.
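
The saved directory can be loaded back as a single RDD by pointing textFile at it; all part files inside are read:

rdd2 = sc.textFile("file:///home/zj/word_bak.txt")
rdd2.collect()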

Reading and writing data on HDFS

# Upload a file to HDFS
shell> hdfs dfs -put /home/zj/word.txt /user/zj/word.txt
# Read a file or directory
rdd = sc.textFile("hdfs://localhost:9000/user/zj/word.txt")
rdd = sc.textFile("/user/zj/word.txt")
# Save to a file (a relative path resolves under the HDFS home directory, e.g. /user/zj/)
rdd.saveAsTextFile("word_bak.txt")

Again, the word_bak.txt saved here is also a directory.

shell> hdfs dfs -ls /user/zj/word_bak.txt
part-00000
_SUCCESS
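
Each partition produces one part-XXXXX file. If a single output file is preferred, the RDD can be collapsed to one partition before saving (a small sketch; word_bak_single.txt is just an illustrative name, and coalescing funnels all data through a single task, so keep this to small datasets):

rdd.coalesce(1).saveAsTextFile("word_bak_single.txt")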

Reading JSON files

The lines read from a JSON file need to be converted with a map operation.

# /home/zj/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
from pyspark import SparkContext
import json
sc = SparkContext('local','json')
inputFile = "file:///home/zj/people.json"
jsonStrs = sc.textFile(inputFile)
result = jsonStrs.map(lambda s : json.loads(s))
result.foreach(print)
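
The resulting RDD holds Python dicts, so individual fields can be pulled out with a further map (a small follow-up sketch; dict.get returns None for records that lack the key):

names = result.map(lambda d: d.get("name"))
names.foreach(print)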

Reading and writing data with Spark SQL

Reading a local JSON file

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
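
A quick look at the loaded DataFrame:

df.printSchema()
df.show()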

Converting an RDD to a DataFrame

Inferring the schema with reflection
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

spark = SparkSession\
    .builder\
    .appName("reflect rdd to dataFrame")\
    .getOrCreate()
sc = spark.sparkContext

# people.txt contents:
# Michael, 29
# Andy, 30
# Justin, 19
lines = sc.textFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
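
The same query can also be written with the DataFrame API instead of SQL (an equivalent sketch):

teenagers = schemaPeople.filter("age >= 13 AND age <= 19").select("name")
teenagers.show()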
Specifying the schema programmatically
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType

spark = SparkSession\
    .builder\
    .appName("coding_rdd")\
    .getOrCreate()

sc = spark.sparkContext

lines = sc.textFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# Define the schema
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

schemaPeople = spark.createDataFrame(people, schema)
# Must be registered as a temp view before it can be queried with SQL below
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name FROM people")
results.show()
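
Here both columns end up as strings. If age should be numeric, the field can be declared with IntegerType and converted in the map step instead (a variant sketch; typedSchema, typedPeople and typedDF are illustrative names):

from pyspark.sql.types import IntegerType

typedSchema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
typedPeople = parts.map(lambda p: (p[0], int(p[1].strip())))
typedDF = spark.createDataFrame(typedPeople, typedSchema)
typedDF.printSchema()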

Saving a DataFrame to formatted files

Save directly to a formatted file
peopleDF = spark.read.format("json").load("file:///usr/local/spark-2.2.1/examples/src/main/resources/people.json")
# Save as a CSV file
peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.csv")
Convert to an RDD first, then save to a file
peopleDF.rdd.saveAsTextFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.txt")
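
Note that saveAsTextFile writes the string form of each Row (e.g. Row(age=..., name=...)). To get plain delimited lines instead, map the rows to strings first (a sketch; newpeople_plain.txt is an illustrative path):

peopleDF.rdd\
    .map(lambda row: "%s,%s" % (row.name, row.age))\
    .saveAsTextFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople_plain.txt")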

Reading and writing Parquet files

Reading a Parquet file

Parquet is a columnar storage format supported by many other data processing systems.

parquetFileDF = spark.read.parquet("file:///usr/local/spark-2.2.1/examples/src/main/resources/users.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
namesDF = spark.sql("SELECT * FROM parquetFile")
namesDF.rdd.foreach(lambda person: print(person.name))
Writing a Parquet file
peopleDF.write.parquet("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.parquet")
Reloading the saved Parquet file
peopleDF = spark.read.parquet("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.parquet")
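
A quick check that the reloaded DataFrame contains the expected data:

peopleDF.printSchema()
peopleDF.show()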

Reading and writing MySQL via JDBC

Environment setup

Connecting to MySQL via JDBC requires the corresponding JAR, e.g. mysql-connector-java-5.1.46.jar:

  1. Download it from https://dev.mysql.com/downloads/connector/j/
  2. Extract the download into a directory such as /usr/local/spark-2.2.1/jars/
  3. Edit conf/spark-env.sh under the Spark root directory and add the target JAR to the SPARK_DIST_CLASSPATH environment variable:
    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):/usr/local/spark-2.2.1/jars/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar

Preparing an example MySQL database
MySQL [none]> create database spark;
MySQL [none]> use spark;
MySQL [spark]> create table people(id int unsigned auto_increment primary key, name char(20), age int(4));
MySQL [spark]> insert into people (name, age) values ("Mchael", 29),("Andy", 30),("Justn", 19);
MySQL [spark]> select * from people;
+----+--------+------+
| id | name   | age  |
+----+--------+------+
|  1 | Mchael |   29 |
|  2 | Andy   |   30 |
|  3 | Justn  |   19 |
+----+--------+------+
3 rows in set (0.00 sec)
Reading data from MySQL with Spark SQL
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("datasource_jdbc")\
    .getOrCreate()

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "people") \
    .option("user", "root") \
    .option("password", "000000") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()

jdbcDF.show()

Run: python3 datasource_jdbc.py

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|Mchael| 29|
|  2|  Andy| 30|
|  3| Justn| 19|
+---+------+---+

If you prefer not to set the environment variable in conf/spark-env.sh, you can instead pass the JAR location to spark-submit:

spark-submit --driver-class-path /usr/local/spark-2.2.1/jars/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar datasource_jdbc.py
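
The dbtable option can also be given a subquery, which pushes filtering down to MySQL instead of pulling the whole table into Spark (a hedged sketch; the alias after the subquery is required by MySQL):

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "(SELECT name, age FROM people WHERE age > 20) AS t") \
    .option("user", "root") \
    .option("password", "000000") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()
jdbcDF.show()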
Writing data to MySQL with Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

spark = SparkSession\
    .builder\
    .appName("write_DF_to_mysql")\
    .getOrCreate()
sc = spark.sparkContext

# The id column of the MySQL table is AUTO_INCREMENT, so it does not need to be filled in here
peoplelist = [
    "zj 25",
    "kobe 41"
]
peopleRDD = sc.parallelize(peoplelist)
people = peopleRDD\
    .map(lambda l: l.split())\
    .map(lambda p: Row(name=p[0], age=int(p[1])))

peopleDF = spark.createDataFrame(people)

peopleDF.write\
    .format("jdbc")\
    .option("url", "jdbc:mysql://localhost:3306/spark")\
    .option("dbtable", "people") \
    .option("user", "root") \
    .option("password", "000000") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .mode("append")\
    .save()

# Alternative: pass the connection properties as a dict
# prop = {
#     'user': 'root',
#     'password': '000000',
#     'driver': 'com.mysql.jdbc.Driver'
# }
# peopleDF.write\
#     .jdbc("jdbc:mysql://localhost:3306/spark", 'people', 'append', prop)
MySQL [spark]> select * from people;
+----+--------+------+
| id | name   | age  |
+----+--------+------+
|  1 | Mchael |   29 |
|  2 | Andy   |   30 |
|  3 | Justn  |   19 |
|  4 | zj     |   25 |
|  5 | kobe   |   41 |
+----+--------+------+

Reading data with Spark Streaming

Basic steps of Spark Streaming

  1. Define the input source.
  2. Define the streaming computation by applying transformation and output operations to the DStream.
  3. Call streamingContext.start() to begin receiving and processing data.
  4. Call streamingContext.awaitTermination() to wait for processing to end (either stopped manually or terminated by an error).
  5. Call streamingContext.stop() to stop the streaming computation manually.
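
A bare skeleton that maps onto these five steps (file:///tmp/input and the word-count logic are only placeholders; the concrete examples below fill them in):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "skeleton")
ssc = StreamingContext(sc, 5)                    # batch interval of 5 seconds

lines = ssc.textFileStream("file:///tmp/input")  # 1. define the input source
counts = lines.flatMap(lambda l: l.split())\
    .map(lambda w: (w, 1))\
    .reduceByKey(lambda a, b: a + b)             # 2. define the computation ...
counts.pprint()                                  # ... and an output operation

ssc.start()                                      # 3. start receiving and processing
ssc.awaitTermination()                           # 4. wait for processing to end
# ssc.stop()                                     # 5. or stop it manually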

Reading data from a file stream

Spark can read data incrementally from a file or from new files appearing in a directory.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# In local mode, the parallelism must be greater than 1
sc = SparkContext("local[2]", "streaming")
# Set the batch interval for each round of computation (in seconds)
ssc = StreamingContext(sc, 5)
lines = ssc.textFileStream('file:///home/zj/logs')
words = lines.flatMap(lambda l: l.split())
wordsPair = words.map(lambda x: (x, 1))
wordscount = wordsPair.reduceByKey(lambda a, b: a + b)
wordscount.pprint()

ssc.start()
ssc.awaitTermination()

After the job starts, add a new file with some content to /home/zj/logs; Spark will pick up the new data in real time and process it.
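
For example (an illustrative file name):

shell> echo "hadoop spark spark hive" > /home/zj/logs/new.log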

Reading data from a socket stream

Spark Streaming can listen on a socket port, receive data from it, and process it accordingly.

First run nc -lk 9999, then run the code below.

from pyspark.streaming import StreamingContext
from pyspark import SparkContext

sc = SparkContext("local[2]", "streaming_socket")
ssc = StreamingContext(sc, 10)

lines = ssc.socketTextStream("localhost", 9999)
wordcount = lines\
    .flatMap(lambda l: l.split())\
    .map(lambda w: (w, 1))\
    .reduceByKey(lambda a, b: a + b)

wordcount.pprint()

ssc.start()
ssc.awaitTermination()

Creating an input stream from a queue of RDDs (for testing)

import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[4]", "streaming_rdds")
ssc = StreamingContext(sc, 1)

queue = []

for i in range(10):
    queue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]

wordcount = ssc.queueStream(queue).map(lambda x: (x % 10, 1)).reduceByKey(lambda a, b: a + b)
wordcount.pprint()

ssc.start()
time.sleep(10)

# stopGraceFully: wait for all received data to be processed before stopping
ssc.stop(stopSparkContext=True, stopGraceFully=True)

Kafka as a data source

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[4]", "streaming-kafka")
ssc = StreamingContext(sc, batchDuration=3)

zkQuorum = "172.17.0.2:2181"
group_id = "group-5"
# topic name -> number of receiver threads
topics = {
    "test": 1
}

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group_id, topics)

# Each element is a (key, value) pair; keep only the message value
word_counts = kafkaStream\
    .map(lambda x: x[1])\
    .flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)

word_counts.pprint()

ssc.start()
ssc.awaitTermination()
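
Running this script requires the Spark Streaming Kafka connector on the classpath. One common way is spark-submit --packages; the coordinates below assume Spark 2.2.1 built against Scala 2.11, and streaming_kafka.py is just an illustrative file name:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 streaming_kafka.py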

Author: Waterandair

Published: 2018-01-01, 07:34:19

Last updated: 2019-12-28, 14:03:59

Original link: https://waterandair.github.io/2018-01-01-pyspark-read-write.html

License: CC BY-NC-SA 4.0 (Attribution-NonCommercial-ShareAlike). Please keep the original link and author attribution when reposting.
