使用Python操作hdfs

使用python的hdfs库操作hdfs

官网

安装

1
pip install hdfs

导入

1
from hdfs.client import Client

建立连接

1
client = Client("http://127.0.0.1:50070", root="/", timeout=100, session=False)

参数说明:
classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)
url:ip:端口
root:指定的hdfs根目录
proxy:制定登陆的用户身份
timeout:设置的超时时间
seesion:requests.Session instance, used to emit all requests.

常见错误

这里使用的是root用户登录,能看查看文件列表,但上传删除文件时会有以下错误

1
hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x

解决办法是:在配置文件hdfs-site.xml中加入

1
2
3
4
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>

重启集群即可

如果报连接不上错误
改一下本机的hosts文件,把集群的主机名和ip都写入hosts文件中
如我的集群信息是master,slave1,salve2
在hosts文件添加如下信息

1
2
3
10.4.20.100 master
10.4.20.101 slave1
10.4.20.102 slave2

window的hosts文件在c:\windows\system32\drivers\etc文件夹下
linux的hosts文件在/etc/hosts

常用操作

获取文件信息

1
client.status("/")

参数:status(hdfs_path, strict=True)
hdfs_path:就是hdfs路径
strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None

list——获取指定路径的子目录信息

1
client.list("/")

参数:list(hdfs_path, status=False)

makedirs——创建目录

1
client.makedirs("/test")

参数:makedirs(hdfs_path, permission=None)
permission:设置权限

rename—重命名

1
client.rename("/test","/new_name")

delete—删除

1
client.delete("/new_name")

参数:delete(hdfs_path, recursive=False)
recursive:删除文件和其子目录,设置为False如果不存在或者删除一个不为空的文件,则会抛出异常,默认为False

upload——上传数据

1
client.upload("/test","/opt/bigdata/hadoop/NOTICE.txt")

参数:upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None, chunk_size=65536,progress=None, cleanup=True, **kwargs)
overwrite:是否是覆盖性上传文件
n_threads:启动的线程数目
temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完之后进行交换
chunk_size:文件上传的大小区间
progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将作为第二个参数
cleanup:如果在上传任何文件时发生错误,则删除该文件

download——下载

1
client.download("/test/NOTICE.txt","/home")

参数:download(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None, **kwargs)
参考上传 upload

read——读取文件

1
2
with client.read("/test/NOTICE.txt") as reader:
print(reader)

参数:read(*args, **kwds)
hdfs_path:hdfs路径
offset:设置开始的字节位置
length:读取的长度(字节为单位)
buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。
encoding:制定编码
chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象
delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。
progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1作为第二个参数。

简单封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#读取hdfs文件内容,将每行存入数组返回
def read_hdfs_file(client,filename):
#with client.read('samples.csv', encoding='utf-8', delimiter='\n') as reader:
# for line in reader:
#pass
lines = []
with client.read(filename, encoding='utf-8', delimiter='\n') as reader:
for line in reader:
#pass
#print line.strip()
lines.append(line.strip())
return lines

#创建目录
def mkdirs(client,hdfs_path) :
client.makedirs(hdfs_path)

#删除hdfs文件
def delete_hdfs_file(client,hdfs_path):
client.delete(hdfs_path)

#上传文件到hdfs
def put_to_hdfs(client,local_path,hdfs_path):
client.upload(hdfs_path, local_path,cleanup=True)

#从hdfs获取文件到本地
def get_from_hdfs(client,hdfs_path,local_path):
download(hdfs_path, local_path, overwrite=False)

#追加数据到hdfs文件
def append_to_hdfs(client,hdfs_path,data):
client.write(hdfs_path, data,overwrite=False,append=True)

#覆盖数据写到hdfs文件
def write_to_hdfs(client,hdfs_path,data):
client.write(hdfs_path, data,overwrite=True,append=False)

#移动或者修改文件
def move_or_rename(client,hdfs_src_path, hdfs_dst_path):
client.rename(hdfs_src_path, hdfs_dst_path)

#返回目录下的文件
def list(client,hdfs_path):
return client.list(hdfs_path, status=False)
打赏

请我喝杯咖啡吧~

支付宝
微信