您的位置:首页 > 编程语言 > Java开发

java对HDFS中文件的操作

2018-01-11 11:36 204 查看
1 概述:

1   1     Hadoop 提供一类api可以通过java工程操作hdfs中的文件,包括:文件打开,读写,删除等、

           目录的创建,删除,读取目录中所有文件等。

2   2    需要处理hadoop的文件程序需要引入下面jar包,可以hadoop-1.2.1\lib找到。

     hadoop-core和common-log,commons-configuration-1.6.jar,commons-lang-2.4.jar。

 

3  3 想要运行写好的程序操作hdfs中文件两种方法:1 安装elipse的hadoop插件(之前有介绍)  2、将写好成程序打成jar包,在服务器上用hadoop命令运行

 

4  4  程序处理步骤: 1) 得到Configuration对象2)得到FileSystem对象 3)进行文件操作。

 

    5 以下是操作文件的事例程序,版本 hadoop-1.2.1

 

Java代码  


package org.tony.hdfs;  

  

import java.io.IOException;  

  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.fs.FSDataInputStream;  

import org.apache.hadoop.fs.FSDataOutputStream;  

import org.apache.hadoop.fs.FileStatus;  

import org.apache.hadoop.fs.FileSystem;  

import org.apache.hadoop.fs.Path;  

  

  

public class HDFSTest {  

    public static void main(String[] args) throws Exception  

    {  

        //uploadLocalFile2HDFS("E:/1.txt","/tmp/1.txt");//E盘下文件传到hdfs上  

        //createNewHDFSFile("/tmp/create2", "hello");  

        //String str = new String(readHDFSFile("/tmp/create2"));  

        //System.out.println(str);  

          

        //mkdir("/tmp/testdir");  

        //deleteDir("/tmp/testdir");  

        //listAll("/tmp/");  

                getDateNodeHost();  

    }  

    //获取HDFS集群上所有节点名称信息  

    public static void getDateNodeHost() throws IOException{  

          

        Configuration conf = getConf();  

  

               FileSystem fs=FileSystem.get(conf);  

               DistributedFileSystem hdfs = (DistributedFileSystem)fs;  

               DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();  

               for(int i=0;i<dataNodeStats.length;i++){  

                  System.out.println("DataNode_"+i+"_Name:"+dataNodeStats[i].getHostName());  

               }  

    }  

    /* 

     * upload the local file to the hds  

     * 路径是全路径 

     */  

    public static void uploadLocalFile2HDFS(String s, String d)   

        throws IOException  

    {  

        Configuration conf = getConf();  

        FileSystem hdfs = FileSystem.get(conf);  

        Path src = new Path(s);  

        Path dst = new Path(d);  

        hdfs.copyFromLocalFile(src, dst);  

        hdfs.close();  

    }  

      

    /* 

     * create a new file in the hdfs. 

     * notice that the toCreateFilePath is the full path 

     * and write the content to the hdfs file. 

     */  

    public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOException  

    {  

        Configuration conf = getConf();  

        FileSystem hdfs = FileSystem.get(conf);  

          

        FSDataOutputStream os = hdfs.create(new Path(toCreateFilePath));  

        os.write(content.getBytes("UTF-8"));  

        os.close();  

        hdfs.close();  

    }  

      

    /* 

     * delete the hdfs file  

     * notice that the dst is the full path name 

     */  

    public static boolean deleteHDFSFile(String dst) throws IOException  

    {  

        Configuration conf = getConf();  

        FileSystem hdfs = FileSystem.get(conf);  

          

        Path path = new Path(dst);  

        boolean isDeleted = hdfs.delete(path);  

        hdfs.close();  

        return isDeleted;  

    }  

      

      

    /* 

     * read the hdfs file content 

     * notice that the dst is the full path name 

     */  

    public static byte[] readHDFSFile(String dst) throws Exception  

    {  

        Configuration conf = getConf();  

        FileSystem fs = FileSystem.get(conf);  

          

        // check if the file exists  

        Path path = new Path(dst);  

        if ( fs.exists(path) )  

        {  

            FSDataInputStream is = fs.open(path);  

            // get the file info to create the buffer  

            FileStatus stat = fs.getFileStatus(path);  

            // create the buffer  

            byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];  

            is.readFully(0, buffer);  

              

            is.close();  

            fs.close();  

              

            return buffer;  

        }  

        else  

        {  

            throw new Exception("the file is not found .");  

        }  

    }  

      

      

    /* 

     * make a new dir in the hdfs 

     * the dir may like '/tmp/testdir' 

     */  

    public static void mkdir(String dir) throws IOException  

    {  

        Configuration conf = getConf();  

        FileSystem fs = FileSystem.get(conf);  

        fs.mkdirs(new Path(dir));  

          

        fs.close();  

    }  

      

    /* 

     * delete a dir in the hdfs 

     * dir may like '/tmp/testdir' 

     */  

    public static void deleteDir(String dir) throws IOException  

    {  

        Configuration conf = getConf();  

        FileSystem fs = FileSystem.get(conf);  

        fs.delete(new Path(dir));  

        fs.close();  

    }  

    //文件系统连接到 hdfs的配置信息   

    private static Configuration getConf(){  

        Configuration conf = new Configuration();  

        // 这句话很关键,这些信息就是hadoop配置文件中的信息  

        conf.set("mapred.job.tracker", "192.168.102.136:9001");  

        conf.set("fs.default.name", "hdfs://192.168.102.136:9000");  

        return conf;  

    }  

      

    /** 

    * @Title: listAll  

    * @Description: 列出目录下所有文件  

    * @return void    返回类型  

    * @throws 

     */  

    public static void listAll(String dir) throws IOException  

    {  

        Configuration conf = getConf();  

        FileSystem fs = FileSystem.get(conf);  

        FileStatus[] stats = fs.listStatus(new Path(dir));  

        for(int i = 0; i < stats.length; ++i)  

        {  

            if (!stats[i].isDir())  

            {  

                // regular file  

                System.out.println(stats[i].getPath().toString());  

            }  

            else   

            {  

                // dir  

                System.out.println(stats[i].getPath().toString());  

            }  

//          else if(stats[i].())  

//          {  

//              // is s symlink in linux  

//              System.out.println(stats[i].getPath().toString());  

//          }  

                  

        }  

        fs.close();  

    }  

      

}  

 

 注意:因为hadoop读写文件时候需要权限,如果你window用户名为admin而hadoop集群中目录权限是hadoop用户,读写会失败

  1 修改window系统用户为hadoop

  2 elipse运行的jvm参数中设置 -DHADOOP_USER_NAME=hadoop ,运行的用户为hadoop 

  3 部署一个linux环境,在linux环境中开发是比较容易的(用户问题容易设置)。

  4 运行的时候指定用户 ,如下:

   Configuration conf = new Configuration();

FileSystem hdfs = FileSystem.get(new URI("hdfs://192.168.142.133:9000"),conf,"hadoop");

原文http://username2.iteye.com/blog/2158925
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: