HDFS 是一个分布式的文件系统,它由三部分组成:NameNode(管理元数据)、DataNode(存储文件数据块)、Secondary NameNode(维护快照)。本文介绍HDFS的原理及基本使用。
原理 HDFS 写数据流程
客户端向 namenode 发出上传文件的请求,namenode 检查目标文件是否已存在,父目录是否存在,向客户端返回是否可以上传。
如果可以上传,客户端向 namenode 询问第一个 block 上传到哪几个datanode服务器。
namenode 返回 n 个datanode节点,假设为 dn1、dn2、dn3。
客户端向 dn1 请求上传 block,dn1 收到请求会向 dn2 发起请求,然后 dn2 向dn3 发起请求,将这个通信管道建立完成
dn1、dn2、dn3 逐级应答客户端
客户端开始向 dn1 上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位,dn1收到一个packet就会传给dn2,dn2传给dn3,dn1每传一个 packet 会放入一个应答队列等待应答
当一个 block 传输完成之后,客户端再次向 namenode 请求上传第二个 block 的服务器, 然后重复之前的操作直到所有 block 上传完成。
HDFS 读数据流程
客户端向 namenode 请求下载文件,namenode 通过查询元数据,找到文件块所在的 datanode 地址。
挑选一台datanode(就近原则,然后随机)服务器,请求读取数据。
datanode开始传输数据给客户端(从磁盘里面读取数据放入流,以packet为单位来做校验)。
客户端以 packet 为单位接收,先在本地缓存,然后写入目标文件。
NameNode namenode 工作流程
如果是第一次启动, namenode 进行格式化并创建 fsimage 和 edits 文件。如果不是第一次启动,直接加载镜像文件(fsimage)和编辑日志(edits)到内存。
客户端对元数据进行增删改的请求
namenode记录操作日志,更新滚动日志。
namenode在内存中对数据进行增删改查
镜像文件和编辑日志 namenode 被格式化之后,将在配置项hadoop.tmp.dir
指定的目录中产生如下文件:
1 2 3 4 5 6 7 8 9 10 11 12 name ├── current │ ├── edits_0000000000000000001-0000000000000000002 │ ├── edits_0000000000000000003-0000000000000000004 │ ├── edits_inprogress_0000000000000000005 │ ├── fsimage_0000000000000000002 │ ├── fsimage_0000000000000000002.md5 │ ├── fsimage_0000000000000000004 │ ├── fsimage_0000000000000000004.md5 │ ├── seen_txid │ └── VERSION └── in_use.lock
fsimage 文件: 元数据镜像文件,包含所有目录和inode的序列化信息
edits 文件: 编辑日志,记录所有写操作
seen_txid: 保存一个数字,是最后一个 edits 文件的序号
namenode 第一次启动会进行格式化,创建 fsimage 和 edits 等文件,之后每一次启动都会先 fsimage 文件读入内存,并依次执行 edits 文件中的更新操作,保证了内存中元数据信息是最新的同步的。
查看镜像文件: hdfs oiv -p 文件类型 -i镜像文件 -o 转换后文件输出路径
查看编辑日志: hdfs oev -p 文件类型 -i编辑日志 -o 转换后文件输出路径
fsimage 和 edits 文件合并的过程又称为滚动日志,可以在启动HDFS集群后使用命令 hdfs dfsadmin -rollEdits
进行手动滚动日志。
namenode 版本号 在配置项 hadoop.tmp.dir
指定的目录中的 ./name/currrent 目录中,还有一个 VERSION
文件,记录了 namenode 的版本号,内容形如:
1 2 3 4 5 6 namespaceID=1443494756 # HDFS 集群中 namenode 的唯一标识 clusterID=CID-0e913bcb-85ea-41cf-a758-4151dc74b7db # 集群 id cTime=1533941647598 # namenode 的创建时间 storageType=NAME_NODE # 说明该存储目录包含的是namenode的数据结构 blockpoolID=BP-1989204408-127.0.1.1-1533941647598 # 一个 block pool 的唯一标识 layoutVersion=-64
namenode 多目录配置 namenode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性
1 2 3 4 5 [hdfs-site.xml] <property> <name>dfs.namenode.name.dir</name> <value>file:///${hadoop.tmp.dir}/dfs/name1,file:///${hadoop.tmp.dir}/dfs/name2</value> </property>
SecondaryNameNode 介绍 Secondary NameNode用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。 在配置项 hadoop.tmp.dir
指定的目录中有如下文件,类似与 namenode 的本地目录:
1 2 3 4 5 6 7 8 9 10 namesecondary/ ├── current │ ├── edits_0000000000000000001-0000000000000000002 │ ├── edits_0000000000000000003-0000000000000000004 │ ├── fsimage_0000000000000000002 │ ├── fsimage_0000000000000000002.md5 │ ├── fsimage_0000000000000000004 │ ├── fsimage_0000000000000000004.md5 │ └── VERSION └── in_use.lock
在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。
Secondary NameNode 工作流程
Secondary NameNode 询问 namenode 是否需要 checkpoint
Secondary NameNode 请求执行 checkpoint。
namenode 滚动正在写的 edits 日志
将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode
Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
生成新的镜像文件fsimage.chkpoint
拷贝 fsimage.chkpoint 到 namenode
namenode 将 fsimage.chkpoint 重新命名成fsimage
chkpoint检查时间参数设置:
SecondaryNameNode 每隔一小时执行一次。1 2 3 4 5 [hdfs-default.xml] <property> <name>dfs.namenode.checkpoint.period</name> <value>3600</value> </property>
一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode 执行一次。
1 2 3 4 5 6 7 8 9 10 11 12 [hdfs-default.xml] <property> <name>dfs.namenode.checkpoint.txns</name> <value>1000000</value> <description>操作动作次数</description> </property> <property> <name>dfs.namenode.checkpoint.check.period</name> <value>60</value> <description> 1分钟检查一次操作次数</description> </property>
DataNode DataNode 工作流程
一个数据块在 datanode 上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
DataNode 启动后向 namenode 注册,通过后周期性(1小时)的向 namenode 上报所有的块信息。
心跳是每3秒一次,心跳返回结果带有 namenode 给该 datanode 的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个 datanode 的心跳,则认为该节点不可用。
集群运行中可以安全加入和退出一些机器
DataNode 本地目录 在配置项 hadoop.tmp.dir
指定的目录中:
1 2 3 4 5 6 7 8 9 10 11 data ├── current │ ├── BP-1989204408-127.0.1.1-1533941647598 │ │ ├── current │ │ │ ├── finalized │ │ │ ├── rbw │ │ │ └── VERSION │ │ ├── scanner.cursor │ │ └── tmp │ └── VERSION └── in_use.lock
./data/current
下的版本号1 2 3 4 5 6 storageID=DS-1b998a1d-71a3-43d5-82dc-c0ff3294921b # 存储id号 clusterID=CID-1f2bf8d1-5ad2-4202-af1c-6713ab381175 # 集群id cTime=0 # datanode存储系统的创建时间 datanodeUuid=970b2daf-63b8-4e17-a514-d81741392165 # datanode的唯一标识 storageType=DATA_NODE # 存储类型 layoutVersion=-56
./data/current/BP-1989204408-127.0.1.1-1533941647598
下的版本号1 2 3 4 namespaceID=1933630176 # datanode 从 namenode 处获取的 storageID 对每个 datanode 来说是唯一的,namenode 用这个属性来区分不同 datanode。 cTime=0 # datanode存储系统的创建时间 blockpoolID=BP-97847618-192.168.10.102-1493726072779 # block pool id标识一个block pool layoutVersion=-56
数据完成性检验
DataNode 读取 block 的时计算 checksum
如果计算后的 checksum,与 block 创建时的值不一样,说明 block 已经损坏。
client 读取其他 DataNode 上的 block.
datanode 在其文件创建后周期验证 checksum
宕机超时参数 datanode 进程和所在节点挂掉后,namenode 不会立即认为改节点失效,而是经过要给超时时间后才认为 datanode 宕机。
超时时间计算方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 timeout = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval。 # dfs.namenode.heartbeat.recheck-interval 默认为 5 分钟 # dfs.heartbeat.interval 默认为 3 秒 这两个参数在 hdfs-site.xml 中配置: <property> <name>dfs.namenode.heartbeat.recheck-interval</name> <value>300000</value> </property> <property> <name> dfs.heartbeat.interval </name> <value>3</value> </property>
增加新节点 假设已存在 big-data-01
, big-data-02
, big-data-03
三个 datanode 节点,现在加入一个新节点: big-data-04
创建 dfs.hosts 文件 1 2 3 4 big-data-01 big-data-02 big-data-03 big-data-04
在 namenode 的 hdfs-site.xml 配置文件中增加 dfs.hosts 属性 1 2 3 4 <property> <name>dfs.hosts</name> <value>/usr/local/hadoop/etc/hadoop/dfs.hosts</value> </property>
刷新 namenode和resourcemanager 1 2 shell> hdfs dfsadmin -refreshNodes shell> yarn rmadmin -refreshNodes
在 namenode 的 slaves 文件中增加新主机名称 1 2 3 4 big-data-01 big-data-02 big-data-03 big-data-04
单独命令启动新的节点 1 2 shell> sbin/hadoop-daemon.sh start datanode shell> sbin/yarn-daemon.sh start nodemanager
解决数据不均衡 1 shell> ./sbin/start-balancer.sh
删除一个节点 已有节点 big-data-01
, big-data-02
, big-data-03
,big-data-04
4 个节点,删除节点big-data-04
创建 dfs.hosts.exclude 文件 在 namenode 节点 hadoop 的配置文件目录创建 dfs.hosts.exclude
文件,添加要删除的节点
在 namenode 的h dfs-site.xml 配置文件中增加 dfs.hosts.exclude
属性 1 2 3 4 <property> <name>dfs.hosts.exclude</name> <value>/usr/local/hadoop/etc/hadoop/dfs.hosts.exclude</value> </property>
刷新 namenode和resourcemanager 1 2 shell> hdfs dfsadmin -refreshNodes shell> yarn rmadmin -refreshNodes
复制数据到其他节点 web 端查看,如果操作正确,被删除的节点状态为 decommission in progress
, 此时该节点正在复制块到其他节点。
复制完成后,该节点的状态更新为 decommissioned
,停止datanode和nodemanager:
1 2 sbin/hadoop-daemon.sh stop datanode sbin/yarn-daemon.sh stop nodemanager
注意: 如果当前运行的节点不能小于副本数,否则不能删除节点,需要修改副本数后才能退役
从 include 文件中删除退役节点,再运行刷新节点的命令 更新 dfs.hosts
文件,删除 big-data-04
:
1 2 3 big-data-01 big-data-02 big-data-03
刷新 namenode 和 resourcemanager:
1 2 hdfs dfsadmin -refreshNodes yarn rmadmin -refreshNodes
从 namenode 的 slave 文件中删除退役节点 1 2 3 big-data-01 big-data-02 big-data-03
解决数据不均衡 1 shell> ./sbin/start-balancer.sh
使用 常用命令行操作 bin/hadoop fs 具体命令
or bin/hdfs dfs 具体命令
-help
输出这个命令参数1 zj@pc:~$ hadoop fs -help rm
-ls
显示目录信息1 zj@pc:~$ hadoop fs -ls /
-mkdir
在hdfs上创建目录1 zj@pc:~$ hadoop fs -mkdir -p /nba/lakers
-moveFromLocal
从本地剪切粘贴到hdfs1 2 3 4 # 会删除本地磁盘文件 zj@pc:~$ hadoop fs -moveFromLocal /home/zj/kobe.txt /nba/lakers # `-moveToLocal` 从hdfs剪切粘贴到本地(尚未实现)
-cat
显示文件内容1 2 zj@pc:~$ hadoop fs -cat /nba/lakers/kobe.txt 24
--appendToFile
追加一个文件到已经存在的文件末尾1 2 3 4 5 6 zj@pc:~$ cat kobe2.txt 8 zj@pc:~$ hadoop fs -appendToFile /home/zj/kobe2.txt /nba/lakers/kobe.txt zj@pc:~$ hadoop fs -cat /nba/lakers/kobe.txt 24 8
-tail
显示一个文件的末尾1 2 3 zj@pc:~$ hadoop fs -tail /nba/lakers/kobe.txt 24 8
-chgrp 、-chmod、-chown
修改文件所属权限(linux文件系统中的用法一样)1 2 zj@pc:~$ hadoop fs -chown zj:zj /nba/lakers/kobe.txt zj@pc:~$ hadoop fs -chmod 666 /nba/lakers/kobe.txt
-copyFromLocal
从本地文件系统中拷贝文件到hdfs路径去(等同于-put
)1 zj@pc:~$ hadoop fs -copyFromLocal ./james.txt /nba/lakers
-copyToLocal
从hdfs拷贝到本地(等同于 -get
)1 zj@pc:~$ hadoop fs -copyToLocal /nba/lakers/james.txt ./james2.txt
-cp
从hdfs的一个路径拷贝到hdfs的另一个路径1 2 zj@pc:~$ hadoop fs -mkdir /nba/heat zj@pc:~$ hadoop fs -cp /nba/lakers/james.txt /nba/heat
-mv
在hdfs目录中移动文件(修改名称)1 2 zj@pc:~$ hadoop fs -mv /nba/heat /nba/MIA zj@pc:~$ hadoop fs -mv /nba/lakers /nba/LAL
-getmerge
合并下载多个文件,所有文件内容都集中到一个文件中1 2 3 4 5 zj@pc:~$ hadoop fs -getmerge /nba/LAL/* ./lakers.txt zj@pc:~$ cat lakers.txt 23 24 8
-rm
删除文件或文件夹1 2 zj@pc:~$ hadoop fs -rm -r /nba/MIA/james.txt Deleted /nba/MIA/james.txt
-rmdir
删除空目录1 zj@pc:~$ hadoop fs -rmdir /nba/MIA/
-df
统计文件系统的可用空间信息1 zj@pc:~$ hadoop fs -df -h /
-du
统计文件夹的大小信息1 2 3 4 5 6 7 zj@pc:~$ hadoop fs -du -s -h / 127.5 K 127.5 K / zj@pc:~$ hadoop fs -du -h / 127.4 K 127.4 K /home 57 57 /nba 0 0 /tmp 0 0 /user
-count
统计一个指定目录下的文件节点数量1 2 3 zj@pc:~$ hadoop fs -count / 18 14 130512 / 嵌套文件层级 包含文件的总数
-setrep
设置hdfs中文件的副本数量1 2 zj@pc:~$ hadoop fs -setrep 3 /nba/LAL/kobe.txt Replication 3 set: /nba/LAL/kobe.txt
python 客户端操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 from hdfs import InsecureClient from hdfs.client import Client # 读取hdfs文件内容,将每行存入数组返回 def read_hdfs_file(client, filename): lines = [] with client.read(filename, encoding='utf-8', delimiter='\n') as reader: for line in reader: lines.append(line.strip()) return lines def write_hdfs_file(client, filename, content): with client.write(filename) as writer: writer.write(content) if __name__ == '__main__': client = InsecureClient('http://localhost:9870', user='zj') # 读取文件内容 # print(read_hdfs_file(client, "/nba/LAL/kobe.txt")) # 写文件,不可以有重复文件 # write_hdfs_file(client, "/nba/LAL/kobe2.txt", b"content") # 追加数据到hdfs文件 # client.write("/nba/LAL/kobe.txt", "append", overwrite=False, append=True) # 覆盖数据写到hdfs文件 # client.write("/nba/LAL/kobe.txt", "overwrite", overwrite=True, append=False) # 获取目录下所有文件和文件夹 # print(client.list("/nba/LAL")) # 获取文件或文件夹的元信息 # print(client.content("/nba/LAL")) # 获取文件或文件夹的元信息 # print(client.status("/nba/LAL")) # 移动或修改文件或文件夹 # client.rename("/nba", "/NBA") # 创建目录 # client.makedirs("/new/dir") # 删除文件或目录 # client.delete('/NBA', recursive=True) # 上传文件 # client.upload("/new", "/etc/hosts") # 下载文件或文件夹到本地 # client.download('/NBA', '/home/zj/', n_threads=5) # 获取文件夹下所有的目录以及目录下的文件, 可以指定深度 # files = client.walk("/", 3) # for dpath, _, fnames in files: # print(dpath, fnames) # 使用 content() 或 status() 判断文件是否存在, 加入参数 strict=False,如果问价不存在返回 None print(client.content("/not/exist/file", strict=False)) # None print(client.status("/not/exist/file", strict=False)) # None
其他功能 集群间数据拷贝 linux 中的 scp
命令可以在主机之间复制数据:
1 2 3 4 5 6 # 向主机发送文件 scp -r test.txt root@host:/path/test.txt # 向主机拉取文件 scp -r root@host:/path/test.txt test.txt # 两个远程主机之间复制文件 scp -r root@host1:/path/test.txt root@host2:/path/test.txt
hadoop 也提供了相似的功能用于实现 hadoop 集群之间的递归数据复制
1 shell> hadoop distcp hdfs://host1:9000/path/test.txt hdfs://host2:9000/path/test.txt
hadoop 存档 Hadoop存档文件或HAR文件,是一个更高效的文件存档工具,它将文件存入HDFS块,在减少namenode内存使用的同时允许对文件进行透明的访问。Hadoop存档文件可以用作MapReduce的输入。
将目录 /path/data
下的文件归档成一个 data.har
文件存入/path/har
:
1 2 3 4 5 6 7 8 9 # 启动 yarn 进程 shell> start-yarn.sh # 归档文件: shell> bin/hadoop archive -archiveName data.har -p /path/data /path/har # 查看归档 shell> hadoop fs -ls -R /path/har/data.har shell> hadoop fs -ls -R har:///path/har/data.har # 接归档文件 shell> hadoop fs -cp har:///path/har/data.har/* /path/data