Reading and Saving Data with Spark (Python)
Created: 2018-01-01 07:34
An overview of reading from multiple data sources and saving data with Spark Core, Spark SQL, and Spark Streaming.
Reading and saving data with Spark RDDs

Create an RDD from a collection (for testing):

```python
# sc is the SparkContext, e.g. the one provided by the pyspark shell
list = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(list)
```
Reading and writing on the local file system

```python
# Read a file/directory
rdd = sc.textFile("file:///home/zj/word.txt")
# Save to a file
rdd.saveAsTextFile("file:///home/zj/word_bak.txt")
```
Note that when saving, word_bak.txt is not a file but a directory:

```
shell> ls /home/zj/word_bak.txt
part-00000  _SUCCESS
```

The data itself is stored in the part-00000 file.
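Because the output is a directory of part files, it can be read back simply by pointing textFile at that directory, and the number of part files can be controlled by repartitioning before saving. A minimal sketch, reusing the sc and rdd from above (the word_bak_single.txt path is illustrative):

```python
# Read the whole output directory back into an RDD (Spark reads every part file in it)
rdd_back = sc.textFile("file:///home/zj/word_bak.txt")

# Write a single part file by collapsing to one partition first
# (the output path below is made up for this example)
rdd.coalesce(1).saveAsTextFile("file:///home/zj/word_bak_single.txt")
```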
Reading and writing data on HDFS

First upload a file to HDFS:

```
shell> hdfs dfs -put /home/zj/word.txt /user/zj/word.txt
```

```python
# Read a file/directory (the two forms below point to the same HDFS path)
rdd = sc.textFile("hdfs://localhost:9000/user/zj/word.txt")
rdd = sc.textFile("/user/zj/word.txt")
# Save to HDFS (a relative path resolves under the user's HDFS home directory, here /user/zj)
rdd.saveAsTextFile("word_bak.txt")
```
Likewise, the word_bak.txt saved here is also a directory:

```
shell> hdfs dfs -ls /user/zj/word_bak.txt
part-00000  _SUCCESS
```
Reading JSON files

Use a map operation to parse the contents of the JSON file after reading it.

```
# /home/zj/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
```python
from pyspark import SparkContext
import json

sc = SparkContext('local', 'json')
inputFile = "file:///home/zj/people.json"
jsonStrs = sc.textFile(inputFile)
result = jsonStrs.map(lambda s: json.loads(s))
result.foreach(print)
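Going the other way, the parsed records can be serialized back to JSON strings and saved with saveAsTextFile. A minimal sketch, reusing the result RDD above (the output path is illustrative):

```python
# Serialize each parsed record back to a JSON string, one object per line
# (the output path is made up for this example)
result.map(json.dumps).saveAsTextFile("file:///home/zj/people_bak.json")
```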
Reading and saving data with Spark SQL

Read a local JSON file:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
```
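Spark infers the schema from the JSON records. A quick way to check what was loaded, assuming the df created above:

```python
# Inspect the inferred schema and the loaded rows
df.printSchema()
df.show()
```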
Converting an RDD to a DataFrame

Inferring the schema by reflection:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

spark = SparkSession\
    .builder\
    .appName("reflect rdd to dataFrame")\
    .getOrCreate()
sc = spark.sparkContext

# people.txt contents:
# Michael, 29
# Andy, 30
# Justin, 19
lines = sc.textFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
```
Specifying the schema programmatically:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType

spark = SparkSession\
    .builder\
    .appName("coding_rdd")\
    .getOrCreate()
sc = spark.sparkContext

lines = sc.textFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# Define the schema
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

schemaPeople = spark.createDataFrame(people, schema)
# Must be registered as a temporary view before it can be queried below
schemaPeople.createOrReplaceTempView("people")

results = spark.sql("SELECT name FROM people")
results.show()
```
Saving a DataFrame as a formatted file

Write to a formatted file directly:

```python
peopleDF = spark.read.format("json").load("file:///usr/local/spark-2.2.1/examples/src/main/resources/people.json")
# Save as a CSV file
peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.csv")
```
Or convert to an RDD first and then save as a text file:

```python
peopleDF.rdd.saveAsTextFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.txt")
```
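Note that the lines written this way are the string form of Row objects, e.g. Row(age=..., name=...). If a plainer format is wanted, each row can be mapped to a string before saving; a minimal sketch reusing the peopleDF above (the output path is illustrative):

```python
# Format each Row as "name,age" before writing plain text
# (the output path is made up for this example)
peopleDF.rdd\
    .map(lambda row: "%s,%s" % (row.name, row.age))\
    .saveAsTextFile("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople_plain.txt")
```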
Reading and writing Parquet files

Reading a Parquet file: Parquet is a columnar storage format supported by many other data processing systems.
```python
parquetFileDF = spark.read.parquet("file:///usr/local/spark-2.2.1/examples/src/main/resources/users.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
namesDF = spark.sql("SELECT * FROM parquetFile")
namesDF.rdd.foreach(lambda person: print(person.name))
```
Saving to a Parquet file:

```python
peopleDF.write.parquet("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.parquet")
```
Reloading the saved Parquet file:

```python
peopleDF = spark.read.parquet("file:///usr/local/spark-2.2.1/examples/src/main/resources/newpeople.parquet")
```
Reading and writing MySQL via JDBC

Environment setup: to connect to MySQL over JDBC, the required jar must be available, e.g. mysql-connector-java-5.1.46.jar.

Download: https://dev.mysql.com/downloads/connector/j/

After downloading, extract it to a directory such as /usr/local/spark-2.2.1/jars/.

Then edit /conf/spark-env.sh under the Spark root directory and append the jar to the SPARK_DIST_CLASSPATH environment variable:

```
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):/usr/local/spark-2.2.1/jars/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar
```
Prepare a sample MySQL database:

```
MySQL [none]> create database spark;
MySQL [none]> use spark;
MySQL [spark]> create table people(id int unsigned auto_increment primary key, name char(20), age int(4));
MySQL [spark]> insert into people (name, age) values ("Mchael", 29),("Andy", 30),("Justn", 19);
MySQL [spark]> select * from people;
+----+--------+------+
| id | name   | age  |
+----+--------+------+
|  1 | Mchael |   29 |
|  2 | Andy   |   30 |
|  3 | Justn  |   19 |
+----+--------+------+
3 rows in set (0.00 sec)
```
Reading data from MySQL with Spark SQL:

```python
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("coding_rdd")\
    .getOrCreate()

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "people") \
    .option("user", "root") \
    .option("password", "000000") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()

jdbcDF.show()
```
Run it with: python3 datasource_jdbc.py

```
+---+------+---+
| id|  name|age|
+---+------+---+
|  1|Mchael| 29|
|  2|  Andy| 30|
|  3| Justn| 19|
+---+------+---+
```
If you would rather not set the environment variable in /conf/spark-env.sh, you can instead pass the jar path to spark-submit:

```
spark-submit --driver-class-path /usr/local/spark-2.2.1/jars/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar datasource_jdbc.py
```
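The JDBC source can also push work down to MySQL by passing a subquery in place of a table name. A minimal sketch, reusing the spark session and connection options above (the subquery, the alias t, and the age filter are illustrative):

```python
# Let MySQL evaluate the filter by wrapping it in a subquery passed as "dbtable"
adultsDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "(SELECT name, age FROM people WHERE age >= 25) AS t") \
    .option("user", "root") \
    .option("password", "000000") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()
adultsDF.show()
```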
Writing data to MySQL with Spark SQL:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

spark = SparkSession\
    .builder\
    .appName("write_DF_to_mysql")\
    .getOrCreate()
sc = spark.sparkContext

# The MySQL table's id column is AUTO_INCREMENT, so the id field can be omitted here
peoplelist = [
    "zj 25",
    "kobe 41"
]
peopleRDD = sc.parallelize(peoplelist)
people = peopleRDD\
    .map(lambda l: l.split())\
    .map(lambda p: Row(name=p[0], age=p[1]))
peopleDF = spark.createDataFrame(people)

peopleDF.write\
    .format("jdbc")\
    .option("url", "jdbc:mysql://localhost:3306/spark")\
    .option("dbtable", "people")\
    .option("user", "root")\
    .option("password", "000000")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .mode("append")\
    .save()

# Alternative: pass the connection properties to DataFrameWriter.jdbc()
# prop = {
#     'user': 'root',
#     'password': '000000',
#     'driver': 'com.mysql.jdbc.Driver'
# }
# peopleDF.write\
#     .jdbc("jdbc:mysql://localhost:3306/spark", 'people', 'append', prop)
```
```
MySQL [spark]> select * from people;
+----+--------+------+
| id | name   | age  |
+----+--------+------+
|  1 | Mchael |   29 |
|  2 | Andy   |   30 |
|  3 | Justn  |   19 |
|  4 | zj     |   25 |
|  5 | kobe   |   41 |
+----+--------+------+
```
Reading data with Spark Streaming

The basic steps for Spark Streaming are:

1. Define the input source(s).
2. Define the streaming computation by applying transformations and output operations to the DStream.
3. Call streamingContext.start() to start receiving and processing data.
4. Call streamingContext.awaitTermination() to wait for processing to finish (stopped manually or ended by an error).
5. The streaming computation can be stopped manually with streamingContext.stop().
Reading from a file stream

Spark can read newly added data under a file path or directory.
```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# local[n]: the parallelism must be set greater than 1
sc = SparkContext("local[2]", "streaming")
# Set the batch interval (in seconds)
ssc = StreamingContext(sc, 5)

lines = ssc.textFileStream('file:///home/zj/logs')
words = lines.flatMap(lambda l: l.split())
wordsPair = words.map(lambda x: (x, 1))
wordscount = wordsPair.reduceByKey(lambda a, b: a + b)
wordscount.pprint()

ssc.start()
ssc.awaitTermination()
```
After starting the job, add a new file containing some text to /home/zj/logs; Spark picks up the new data in near real time and runs the computation on it. Note that the file source only detects files newly created in the monitored directory, not data appended to existing files.
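For example, while the job is running, a new file could be dropped into the directory from another shell or script; a tiny sketch (the file name and contents are made up):

```python
# Run this from a separate process while the streaming job is active;
# the file name and contents are made up for this example
with open("/home/zj/logs/batch-001.txt", "w") as f:
    f.write("hadoop spark spark streaming\n")
```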
Reading from a socket stream

Spark Streaming can listen on a socket port, receive data from it, and process it. First run nc -lk 9999, then run the code below.
```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "streaming_socket")
ssc = StreamingContext(sc, 10)

lines = ssc.socketTextStream("localhost", 9999)
wordcount = lines\
    .flatMap(lambda l: l.split())\
    .map(lambda w: (w, 1))\
    .reduceByKey(lambda a, b: a + b)
wordcount.pprint()

ssc.start()
ssc.awaitTermination()
```
Creating an input stream from a list of RDDs (for testing):

```python
import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[4]", "streaming_rdds")
ssc = StreamingContext(sc, 1)

# Build a queue of ten RDDs, each holding the numbers 1..1000 in 10 partitions
queue = []
for i in range(10):
    queue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]

wordcount = ssc.queueStream(queue).map(lambda x: (x % 10, 1)).reduceByKey(lambda a, b: a + b)
wordcount.pprint()

ssc.start()
time.sleep(10)
# Stop gracefully: wait for all received data to be processed
ssc.stop(stopSparkContext=True, stopGraceFully=True)
```
Using Kafka as the data source

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[4]", "streaming-kafka")
ssc = StreamingContext(sc, batchDuration=3)

zkQuorum = "172.17.0.2:2181"   # ZooKeeper quorum
group_id = "group-5"           # Kafka consumer group
topics = {"test": 1}           # topics to consume: {topic_name: numPartitions}

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group_id, topics)
word_counts = kafkaStream\
    .map(lambda x: x[1])\
    .flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)
word_counts.pprint()

ssc.start()
ssc.awaitTermination()
```

Note: running this requires the Spark Streaming Kafka integration jar (spark-streaming-kafka-0-8 for Spark 2.2) on the classpath; it can be added with spark-submit --jars or --packages, just like the MySQL connector above.