您的位置:首页 > 编程语言 > Java开发

Java程序调用Hadoop接口入门

2017-04-24 20:29 435 查看
一、安装Hadoop集群环境

参考http://blog.itpub.net/29485627/viewspace-2137702/

 

二、程序编写

1 整个程序的目录为



 

2 HdfsUtil.java中的代码为

package hadoop.hdfs;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author 作者 : yangyang
* @version 创建时间:2016年4月21日 类说明 :hdfs文件系统操作类
*/
public class HdfsUtil {

private static final Logger log = LoggerFactory.getLogger(HdfsUtil.class);
// 初始化
static Configuration conf = new Configuration();
static FileSystem hdfs;
static {
try {
hdfs = FileSystem.get(conf);
} catch (IOException e) {
if (log.isDebugEnabled())
log.debug("初始化hadoop配置失败", e);
}
}

/**
* 创建文件夹
*
* @param dir
* @throws IOException
*/
public static boolean mkDirs(String dir) {
try {
Path path = new Path(dir);
return hdfs.mkdirs(path);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}

/**
* 本地文件上传到hdfs
*
* @param localSrc
* @param hdfsDst
* @throws IOException
*/
public static void uploadFile(String localSrc, String hdfsDst) throws IOException {
Path src = new Path(localSrc);
Path dst = new Path(hdfsDst);
hdfs.copyFromLocalFile(src, dst);

FileStatus files[] = hdfs.listStatus(dst);
System.out.println("Upload to \t" + conf.get("fs.default.name")
+ hdfsDst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
}

/**
* 下载文件到本地
* @param remotePath hdfs文件目录
* @param localPath 本地文件目录
*/
public static void downLoadFile(String remotePath, String localPath) {

Path _remotePath = new Path(remotePath);
Path _localPath = new Path(localPath);

try {
hdfs.copyToLocalFile(false,_remotePath, _localPath,true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {

try {
hdfs.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

}

/**
* 创建文件
*
* @param fileName
* @param fileContent
* @throws IOException
*/
public static void createFile(String fileName, String fileContent) {
Path dst = new Path(fileName);
byte[] bytes = fileContent.getBytes();
FSDataOutputStream output;

try {
output = hdfs.create(dst);
output.write(bytes);
} catch (IOException e) {
if (log.isDebugEnabled())
log.debug("创建文件异常:" + fileName, e);
}
System.out.println("new file \t" + conf.get("fs.default.name")
+ fileName);
}

/**
* 获取文件内容
* @param fileName 文件名
* @return
*/
public static String readFileContent(String fileName){
Path p = new Path(conf.get("fs.default.name")+fileName);
FSDataInputStream in = null;
String content = "";

try {
in = hdfs.open(p);
BufferedReader buff = new BufferedReader(new InputStreamReader(in));
content = buff.readLine();
buff.close();
in.close();
hdfs.close();
} catch (IOException e) {
// TODO Auto-generated catch block
if(log.isDebugEnabled())
log.debug("读取文件:"+fileName+"失败", e);
}

return content;
}

public void listFiles(String dirName) throws IOException {
Path f = new Path(dirName);
FileStatus[] status = hdfs.listStatus(f);
System.out.println(dirName + " has all files:");
for (int i = 0; i < status.length; i++) {
System.out.println(status[i].getPath().toString());
}
}

/**
* 删除文件
*
* @param fileName
* 文件路径
* @throws IOException
*/
public static boolean deleteFile(String fileName) throws IOException {
Path f = new Path(fileName);
boolean isExists = hdfs.exists(f);
if (isExists) {
boolean isDel = hdfs.delete(f, true);
return isDel;
} else {
return false;
}
}

/**
* 获取集群上的所有节点名称
*
* @throws IOException
*/
public static List<DatanodeInfo> getDateNodeHost() throws IOException {
DistributedFileSystem _hdfs = (DistributedFileSystem) hdfs;
DatanodeInfo[] dataNodeStats = _hdfs.getDataNodeStats();
List<DatanodeInfo> dataNodeLst = Arrays.asList(dataNodeStats);
return dataNodeLst;
}

/**
* 文件重命名
*
* @param fileName
* 文件名
* @param newFileName
* 新文件名
* @throws IOException
*/
public static boolean renameFile(String fileName, String newFileName) {

Path path = new Path(fileName);
Path newPath = new Path(newFileName);
boolean b = false;
try {
b = hdfs.rename(path, newPath);
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("文件:[" + fileName + "]修改为:[" + newFileName + "]失败", e);
}
return b;
}

public static void main(String[] args) throws IOException {
/* System.err.println(mkDirs("/test"));
createFile("/test/my.txt", "I Love Beijing!");
System.out.println(readFileContent("/test/my.txt"));*/
// deleteFile("/test/my.txt");
// uploadFile("F:\\a1.txt", "/test");
downLoadFile("/test/a1.txt", "F:\\a2.txt");
}
}

 
3 src/main/resouces中的四个配置文件从hadoop环境中拷贝,具体位于/usr/hadoop/etc/hadoop中

 

三、验证

1 运行HdfsUtil中的main方法的上三行(下三行先注释起来),结果为

true
new file    hdfs://192.168.121.201:9000/test/my.txt

 

在hadoop环境中验证

# hadoop fs -ls /

# hadoop fs -ls /test



 

2 将main方法中的第四行代码打开,其他行代码都注释,重新跑一次。

进hadoop环境中验证,发现my.txt文件已成功删除



3 建立F:\a1.txt文件,其中的内容随意输入,比如“abcdefg”。

将main方法中的第五行代码打开,其他行代码都注释,重新跑一次。运行结果为:

Uploadto   hdfs://192.168.121.201:9000/test
hdfs://192.168.121.201:9000/test/a1.txt

进hadoop环境中验证,发现a1.txt已经被传到hadoop中



4 将main方法中的第六行代码打开,其他行代码都注释,重新跑一次。发现hadoop中的/test/a1.txt已被下载到F:\a2.txt,其中的内容为“abcdefg”

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: