Hadoop编程调用HDFS

浏览: 2091

hadoop-hdfs-api

前言

HDFS 全称Hadoop分步文件系统(Hadoop Distributed File System),是Hadoop的核心部分之一。要实现MapReduce的分步式算法时,数据必需提前放在HDFS上。因此,对于HDFS的操作就变得非常重要。Hadoop的命令行,提供了一套完整命令接口,就像Linux命令一样方便使用。

不过,有时候我们还需要在程序中直接访问HDFS,我们可以通过API的方式进行HDFS操作。

目录

  1. 系统环境
  2. ls操作
  3. rmr操作
  4. mkdir操作
  5. copyFromLocal操作
  6. cat操作
  7. copyToLocal操作
  8. 创建一个新文件,并写入内容

1. 系统环境

Hadoop集群环境

Linux Ubuntu 64bit Server 12.04.2 LTS

Java 1.6.0_29

Hadoop 1.1.2

前言

HDFS 全称Hadoop分步文件系统(Hadoop Distributed File System),是Hadoop的核心部分之一。要实现MapReduce的分步式算法时,数据必需提前放在HDFS上。因此,对于HDFS的操作就变得非常重要。Hadoop的命令行,提供了一套完整命令接口,就像Linux命令一样方便使用。

不过,有时候我们还需要在程序中直接访问HDFS,我们可以通过API的方式进行HDFS操作。

目录

  1. 系统环境
  2. ls操作
  3. rmr操作
  4. mkdir操作
  5. copyFromLocal操作
  6. cat操作
  7. copyToLocal操作
  8. 创建一个新文件,并写入内容

1. 系统环境

Hadoop集群环境

Linux Ubuntu 64bit Server 12.04.2 LTS

Java 1.6.0_29

Hadoop 1.1.2

如何搭建Hadoop集群环境? 请参考文章:Hadoop历史版本安装

开发环境

Win7 64bit

Java 1.6.0_45

Maven 3

Hadoop 1.1.2

Eclipse Juno Service Release 2

如何用Maven搭建Win7的Hadoop开发环境? 请参考文章:用Maven构建Hadoop项目

注:hadoop-core-1.1.2.jar,已重新编译,已解决了Win远程调用Hadoop的问题,请参考文章:Hadoop历史版本安装

Hadooop命令行:java FsShell


    ~ hadoop fs

    Usage: java FsShell
    [-ls ]
    [-lsr ]
    [-du ]
    [-dus ]
    [-count[-q] ]
    [-mv ]
    [-cp ]
    [-rm [-skipTrash] ]
    [-rmr [-skipTrash] ]
    [-expunge]
    [-put ... ]
    [-copyFromLocal ... ]
    [-moveFromLocal ... ]
    [-get [-ignoreCrc] [-crc] ]
    [-getmerge [addnl]]
    [-cat ]
    [-text ]
    [-copyToLocal [-ignoreCrc] [-crc] ]
    [-moveToLocal [-crc] ]
    [-mkdir ]
    [-setrep [-R] [-w] ]
    [-touchz ]
    [-test -[ezd] ]
    [-stat [format] ]
    [-tail [-f] ]
    [-chmod [-R] PATH...]
    [-chown [-R] [OWNER][:[GROUP]] PATH...]
    [-chgrp [-R] GROUP PATH...]
    [-help [cmd]]

    上面列出了30个命令,我只实现了一部分的HDFS的命令!

    新建文件:HdfsDAO.java,用来调用HDFS的API。


    public class HdfsDAO {

    //HDFS访问地址
    private static final String HDFS = "hdfs://192.168.1.210:9000/";

    public HdfsDAO(Configuration conf) {
    this(HDFS, conf);
    }

    public HdfsDAO(String hdfs, Configuration conf) {
    this.hdfsPath = hdfs;
    this.conf = conf;
    }

    //hdfs路径
    private String hdfsPath;
    //Hadoop系统配置
    private Configuration conf;

    //启动函数
    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.mkdirs("/tmp/new/two");
    hdfs.ls("/tmp/new");
    }

    //加载Hadoop配置文件
    public static JobConf config(){
    JobConf conf = new JobConf(HdfsDAO.class);
    conf.setJobName("HdfsDAO");
    conf.addResource("classpath:/hadoop/core-site.xml");
    conf.addResource("classpath:/hadoop/hdfs-site.xml");
    conf.addResource("classpath:/hadoop/mapred-site.xml");
    return conf;
    }

    //API实现
    public void cat(String remoteFile) throws IOException {...}
    public void mkdirs(String folder) throws IOException {...}

    ...
    }

    2. ls操作

    说明:查看目录文件

    对应Hadoop命令:


    ~ hadoop fs -ls /
    Found 3 items
    drwxr-xr-x - conan supergroup 0 2013-10-03 05:03 /home
    drwxr-xr-x - Administrator supergroup 0 2013-10-03 13:49 /tmp
    drwxr-xr-x - conan supergroup 0 2013-10-03 09:11 /user

    Java程序:


    public void ls(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    FileStatus[] list = fs.listStatus(path);
    System.out.println("ls: " + folder);
    System.out.println("==========================================================");
    for (FileStatus f : list) {
    System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
    }
    System.out.println("==========================================================");
    fs.close();
    }

    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.ls("/");
    }

    控制台输出:

    ls: /
    ==========================================================
    name: hdfs://192.168.1.210:9000/home, folder: true, size: 0
    name: hdfs://192.168.1.210:9000/tmp, folder: true, size: 0
    name: hdfs://192.168.1.210:9000/user, folder: true, size: 0
    ==========================================================

    3. mkdir操作

    说明:创建目录,可以创建多级目录

    对应Hadoop命令:


    ~ hadoop fs -mkdir /tmp/new/one
    ~ hadoop fs -ls /tmp/new
    Found 1 items
    drwxr-xr-x - conan supergroup 0 2013-10-03 15:35 /tmp/new/one

    Java程序:


    public void mkdirs(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    if (!fs.exists(path)) {
    fs.mkdirs(path);
    System.out.println("Create: " + folder);
    }
    fs.close();
    }

    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.mkdirs("/tmp/new/two");
    hdfs.ls("/tmp/new");
    }

    控制台输出:


    Create: /tmp/new/two
    ls: /tmp/new
    ==========================================================
    name: hdfs://192.168.1.210:9000/tmp/new/one, folder: true, size: 0
    name: hdfs://192.168.1.210:9000/tmp/new/two, folder: true, size: 0
    ==========================================================

    4. rmr操作

    说明:删除目录和文件

    对应Hadoop命令:


    ~ hadoop fs -rmr /tmp/new/one
    Deleted hdfs://master:9000/tmp/new/one

    ~ hadoop fs -ls /tmp/new
    Found 1 items
    drwxr-xr-x - Administrator supergroup 0 2013-10-03 15:38 /tmp/new/two

    Java程序:


    public void rmr(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    fs.deleteOnExit(path);
    System.out.println("Delete: " + folder);
    fs.close();
    }

    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.rmr("/tmp/new/two");
    hdfs.ls("/tmp/new");
    }

    控制台输出:


    Delete: /tmp/new/two
    ls: /tmp/new
    ==========================================================
    ==========================================================

    5. copyFromLocal操作

    说明:复制本地文件系统到HDFS

    对应Hadoop命令:


    ~ hadoop fs -copyFromLocal /home/conan/datafiles/item.csv /tmp/new/

    ~ hadoop fs -ls /tmp/new/
    Found 1 items
    -rw-r--r-- 1 conan supergroup 210 2013-10-03 16:07 /tmp/new/item.csv

    Java程序:


    public void copyFile(String local, String remote) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    fs.copyFromLocalFile(new Path(local), new Path(remote));
    System.out.println("copy from: " + local + " to " + remote);
    fs.close();
    }

    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.copyFile("datafile/randomData.csv", "/tmp/new");
    hdfs.ls("/tmp/new");
    }

    控制台输出:


    copy from: datafile/randomData.csv to /tmp/new
    ls: /tmp/new
    ==========================================================
    name: hdfs://192.168.1.210:9000/tmp/new/item.csv, folder: false, size: 210
    name: hdfs://192.168.1.210:9000/tmp/new/randomData.csv, folder: false, size: 36655
    ==========================================================

    6. cat操作

    说明:查看文件内容

    对应Hadoop命令:


    ~ hadoop fs -cat /tmp/new/item.csv
    1,101,5.0
    1,102,3.0
    1,103,2.5
    2,101,2.0
    2,102,2.5
    2,103,5.0
    2,104,2.0
    3,101,2.5
    3,104,4.0
    3,105,4.5
    3,107,5.0
    4,101,5.0
    4,103,3.0
    4,104,4.5
    4,106,4.0
    5,101,4.0
    5,102,3.0
    5,103,2.0
    5,104,4.0
    5,105,3.5
    5,106,4.0

    Java程序:


    public void cat(String remoteFile) throws IOException {
    Path path = new Path(remoteFile);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    FSDataInputStream fsdis = null;
    System.out.println("cat: " + remoteFile);
    try {
    fsdis =fs.open(path);
    IOUtils.copyBytes(fsdis, System.out, 4096, false);
    } finally {
    IOUtils.closeStream(fsdis);
    fs.close();
    }
    }

    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.cat("/tmp/new/item.csv");
    }

    控制台输出:


    cat: /tmp/new/item.csv
    1,101,5.0
    1,102,3.0
    1,103,2.5
    2,101,2.0
    2,102,2.5
    2,103,5.0
    2,104,2.0
    3,101,2.5
    3,104,4.0
    3,105,4.5
    3,107,5.0
    4,101,5.0
    4,103,3.0
    4,104,4.5
    4,106,4.0
    5,101,4.0
    5,102,3.0
    5,103,2.0
    5,104,4.0
    5,105,3.5
    5,106,4.0

    7. copyToLocal操作

    说明:从HDFS复制文件在本地操作系

    对应Hadoop命令:


    ~ hadoop fs -copyToLocal /tmp/new/item.csv /home/conan/datafiles/tmp/

    ~ ls -l /home/conan/datafiles/tmp/
    -rw-rw-r-- 1 conan conan 210 Oct 3 16:16 item.csv

    Java程序:


    public void download(String remote, String local) throws IOException {
    Path path = new Path(remote);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    fs.copyToLocalFile(path, new Path(local));
    System.out.println("download: from" + remote + " to " + local);
    fs.close();
    }

    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.download("/tmp/new/item.csv", "datafile/download");

    File f = new File("datafile/download/item.csv");
    System.out.println(f.getAbsolutePath());
    }

    控制台输出:


    2013-10-12 17:17:32 org.apache.hadoop.util.NativeCodeLoader
    警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    download: from/tmp/new/item.csv to datafile/download
    D:\workspace\java\myMahout\datafile\download\item.csv

    8. 创建一个新文件,并写入内容

    说明:创建一个新文件,并写入内容。

    • touchz:可以用来创建一个新文件,或者修改文件的时间戳。
    • 写入内容没有对应命令。

    对应Hadoop命令:


    ~ hadoop fs -touchz /tmp/new/empty

    ~ hadoop fs -ls /tmp/new
    Found 3 items
    -rw-r--r-- 1 conan supergroup 0 2013-10-03 16:24 /tmp/new/empty
    -rw-r--r-- 1 conan supergroup 210 2013-10-03 16:07 /tmp/new/item.csv
    -rw-r--r-- 3 Administrator supergroup 36655 2013-10-03 16:09 /tmp/new/randomData.csv

    ~ hadoop fs -cat /tmp/new/empty

    Java程序:


    public void createFile(String file, String content) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    byte[] buff = content.getBytes();
    FSDataOutputStream os = null;
    try {
    os = fs.create(new Path(file));
    os.write(buff, 0, buff.length);
    System.out.println("Create: " + file);
    } finally {
    if (os != null)
    os.close();
    }
    fs.close();
    }

    public static void main(String[] args) throws IOException {
    JobConf conf = config();
    HdfsDAO hdfs = new HdfsDAO(conf);
    hdfs.createFile("/tmp/new/text", "Hello world!!");
    hdfs.cat("/tmp/new/text");
    }

    控制台输出:


    Create: /tmp/new/text
    cat: /tmp/new/text
    Hello world!!

作者介绍:

张丹,R语言中文社区专栏特邀作者,《R的极客理想》系列图书作者,民生银行大数据中心数据分析师,前况客创始人兼CTO。

10年IT编程背景,精通R ,Java, Nodejs 编程,获得10项SUN及IBM技术认证。丰富的互联网应用开发架构经验,金融大数据专家。个人博客 http://fens.me, Alexa全球排名70k。

著有《R的极客理想-工具篇》、《R的极客理想-高级开发篇》,合著《数据实践之美》,新书《R的极客理想-量化投资篇》(即将出版)。

Clipboard Image.png

《R的极客理想-工具篇》京东购买快速通道:https://item.jd.com/11524750.html

《R的极客理想-高级开发篇》京东购买快速通道:https://item.jd.com/11731967.html

《数据实践之美》京东购买快速通道:https://item.jd.com/12106224.html

推荐 2
本文由 张丹 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册