Java程序调用Hadoop接口入门
2017-04-24 20:29
435 查看
一、安装Hadoop集群环境
参考http://blog.itpub.net/29485627/viewspace-2137702/
二、程序编写
1 整个程序的目录为
2 HdfsUtil.java中的代码为
package hadoop.hdfs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author 作者 : yangyang
* @version 创建时间:2016年4月21日 类说明 :hdfs文件系统操作类
*/
public class HdfsUtil {
private static final Logger log = LoggerFactory.getLogger(HdfsUtil.class);
// 初始化
static Configuration conf = new Configuration();
static FileSystem hdfs;
static {
try {
hdfs = FileSystem.get(conf);
} catch (IOException e) {
if (log.isDebugEnabled())
log.debug("初始化hadoop配置失败", e);
}
}
/**
* 创建文件夹
*
* @param dir
* @throws IOException
*/
public static boolean mkDirs(String dir) {
try {
Path path = new Path(dir);
return hdfs.mkdirs(path);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
/**
* 本地文件上传到hdfs
*
* @param localSrc
* @param hdfsDst
* @throws IOException
*/
public static void uploadFile(String localSrc, String hdfsDst) throws IOException {
Path src = new Path(localSrc);
Path dst = new Path(hdfsDst);
hdfs.copyFromLocalFile(src, dst);
FileStatus files[] = hdfs.listStatus(dst);
System.out.println("Upload to \t" + conf.get("fs.default.name")
+ hdfsDst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
}
/**
* 下载文件到本地
* @param remotePath hdfs文件目录
* @param localPath 本地文件目录
*/
public static void downLoadFile(String remotePath, String localPath) {
Path _remotePath = new Path(remotePath);
Path _localPath = new Path(localPath);
try {
hdfs.copyToLocalFile(false,_remotePath, _localPath,true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
hdfs.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* 创建文件
*
* @param fileName
* @param fileContent
* @throws IOException
*/
public static void createFile(String fileName, String fileContent) {
Path dst = new Path(fileName);
byte[] bytes = fileContent.getBytes();
FSDataOutputStream output;
try {
output = hdfs.create(dst);
output.write(bytes);
} catch (IOException e) {
if (log.isDebugEnabled())
log.debug("创建文件异常:" + fileName, e);
}
System.out.println("new file \t" + conf.get("fs.default.name")
+ fileName);
}
/**
* 获取文件内容
* @param fileName 文件名
* @return
*/
public static String readFileContent(String fileName){
Path p = new Path(conf.get("fs.default.name")+fileName);
FSDataInputStream in = null;
String content = "";
try {
in = hdfs.open(p);
BufferedReader buff = new BufferedReader(new InputStreamReader(in));
content = buff.readLine();
buff.close();
in.close();
hdfs.close();
} catch (IOException e) {
// TODO Auto-generated catch block
if(log.isDebugEnabled())
log.debug("读取文件:"+fileName+"失败", e);
}
return content;
}
public void listFiles(String dirName) throws IOException {
Path f = new Path(dirName);
FileStatus[] status = hdfs.listStatus(f);
System.out.println(dirName + " has all files:");
for (int i = 0; i < status.length; i++) {
System.out.println(status[i].getPath().toString());
}
}
/**
* 删除文件
*
* @param fileName
* 文件路径
* @throws IOException
*/
public static boolean deleteFile(String fileName) throws IOException {
Path f = new Path(fileName);
boolean isExists = hdfs.exists(f);
if (isExists) {
boolean isDel = hdfs.delete(f, true);
return isDel;
} else {
return false;
}
}
/**
* 获取集群上的所有节点名称
*
* @throws IOException
*/
public static List<DatanodeInfo> getDateNodeHost() throws IOException {
DistributedFileSystem _hdfs = (DistributedFileSystem) hdfs;
DatanodeInfo[] dataNodeStats = _hdfs.getDataNodeStats();
List<DatanodeInfo> dataNodeLst = Arrays.asList(dataNodeStats);
return dataNodeLst;
}
/**
* 文件重命名
*
* @param fileName
* 文件名
* @param newFileName
* 新文件名
* @throws IOException
*/
public static boolean renameFile(String fileName, String newFileName) {
Path path = new Path(fileName);
Path newPath = new Path(newFileName);
boolean b = false;
try {
b = hdfs.rename(path, newPath);
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("文件:[" + fileName + "]修改为:[" + newFileName + "]失败", e);
}
return b;
}
public static void main(String[] args) throws IOException {
/* System.err.println(mkDirs("/test"));
createFile("/test/my.txt", "I Love Beijing!");
System.out.println(readFileContent("/test/my.txt"));*/
// deleteFile("/test/my.txt");
// uploadFile("F:\\a1.txt", "/test");
downLoadFile("/test/a1.txt", "F:\\a2.txt");
}
}
3 src/main/resouces中的四个配置文件从hadoop环境中拷贝,具体位于/usr/hadoop/etc/hadoop中
三、验证
1 运行HdfsUtil中的main方法的上三行(下三行先注释起来),结果为
true
new file hdfs://192.168.121.201:9000/test/my.txt
在hadoop环境中验证
# hadoop fs -ls /
# hadoop fs -ls /test
2 将main方法中的第四行代码打开,其他行代码都注释,重新跑一次。
进hadoop环境中验证,发现my.txt文件已成功删除
3 建立F:\a1.txt文件,其中的内容随意输入,比如“abcdefg”。
将main方法中的第五行代码打开,其他行代码都注释,重新跑一次。运行结果为:
Uploadto hdfs://192.168.121.201:9000/test
hdfs://192.168.121.201:9000/test/a1.txt
进hadoop环境中验证,发现a1.txt已经被传到hadoop中
4 将main方法中的第六行代码打开,其他行代码都注释,重新跑一次。发现hadoop中的/test/a1.txt已被下载到F:\a2.txt,其中的内容为“abcdefg”
参考http://blog.itpub.net/29485627/viewspace-2137702/
二、程序编写
1 整个程序的目录为
2 HdfsUtil.java中的代码为
package hadoop.hdfs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author 作者 : yangyang
* @version 创建时间:2016年4月21日 类说明 :hdfs文件系统操作类
*/
public class HdfsUtil {
private static final Logger log = LoggerFactory.getLogger(HdfsUtil.class);
// 初始化
static Configuration conf = new Configuration();
static FileSystem hdfs;
static {
try {
hdfs = FileSystem.get(conf);
} catch (IOException e) {
if (log.isDebugEnabled())
log.debug("初始化hadoop配置失败", e);
}
}
/**
* 创建文件夹
*
* @param dir
* @throws IOException
*/
public static boolean mkDirs(String dir) {
try {
Path path = new Path(dir);
return hdfs.mkdirs(path);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
/**
* 本地文件上传到hdfs
*
* @param localSrc
* @param hdfsDst
* @throws IOException
*/
public static void uploadFile(String localSrc, String hdfsDst) throws IOException {
Path src = new Path(localSrc);
Path dst = new Path(hdfsDst);
hdfs.copyFromLocalFile(src, dst);
FileStatus files[] = hdfs.listStatus(dst);
System.out.println("Upload to \t" + conf.get("fs.default.name")
+ hdfsDst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
}
/**
* 下载文件到本地
* @param remotePath hdfs文件目录
* @param localPath 本地文件目录
*/
public static void downLoadFile(String remotePath, String localPath) {
Path _remotePath = new Path(remotePath);
Path _localPath = new Path(localPath);
try {
hdfs.copyToLocalFile(false,_remotePath, _localPath,true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
hdfs.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* 创建文件
*
* @param fileName
* @param fileContent
* @throws IOException
*/
public static void createFile(String fileName, String fileContent) {
Path dst = new Path(fileName);
byte[] bytes = fileContent.getBytes();
FSDataOutputStream output;
try {
output = hdfs.create(dst);
output.write(bytes);
} catch (IOException e) {
if (log.isDebugEnabled())
log.debug("创建文件异常:" + fileName, e);
}
System.out.println("new file \t" + conf.get("fs.default.name")
+ fileName);
}
/**
* 获取文件内容
* @param fileName 文件名
* @return
*/
public static String readFileContent(String fileName){
Path p = new Path(conf.get("fs.default.name")+fileName);
FSDataInputStream in = null;
String content = "";
try {
in = hdfs.open(p);
BufferedReader buff = new BufferedReader(new InputStreamReader(in));
content = buff.readLine();
buff.close();
in.close();
hdfs.close();
} catch (IOException e) {
// TODO Auto-generated catch block
if(log.isDebugEnabled())
log.debug("读取文件:"+fileName+"失败", e);
}
return content;
}
public void listFiles(String dirName) throws IOException {
Path f = new Path(dirName);
FileStatus[] status = hdfs.listStatus(f);
System.out.println(dirName + " has all files:");
for (int i = 0; i < status.length; i++) {
System.out.println(status[i].getPath().toString());
}
}
/**
* 删除文件
*
* @param fileName
* 文件路径
* @throws IOException
*/
public static boolean deleteFile(String fileName) throws IOException {
Path f = new Path(fileName);
boolean isExists = hdfs.exists(f);
if (isExists) {
boolean isDel = hdfs.delete(f, true);
return isDel;
} else {
return false;
}
}
/**
* 获取集群上的所有节点名称
*
* @throws IOException
*/
public static List<DatanodeInfo> getDateNodeHost() throws IOException {
DistributedFileSystem _hdfs = (DistributedFileSystem) hdfs;
DatanodeInfo[] dataNodeStats = _hdfs.getDataNodeStats();
List<DatanodeInfo> dataNodeLst = Arrays.asList(dataNodeStats);
return dataNodeLst;
}
/**
* 文件重命名
*
* @param fileName
* 文件名
* @param newFileName
* 新文件名
* @throws IOException
*/
public static boolean renameFile(String fileName, String newFileName) {
Path path = new Path(fileName);
Path newPath = new Path(newFileName);
boolean b = false;
try {
b = hdfs.rename(path, newPath);
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("文件:[" + fileName + "]修改为:[" + newFileName + "]失败", e);
}
return b;
}
public static void main(String[] args) throws IOException {
/* System.err.println(mkDirs("/test"));
createFile("/test/my.txt", "I Love Beijing!");
System.out.println(readFileContent("/test/my.txt"));*/
// deleteFile("/test/my.txt");
// uploadFile("F:\\a1.txt", "/test");
downLoadFile("/test/a1.txt", "F:\\a2.txt");
}
}
3 src/main/resouces中的四个配置文件从hadoop环境中拷贝,具体位于/usr/hadoop/etc/hadoop中
三、验证
1 运行HdfsUtil中的main方法的上三行(下三行先注释起来),结果为
true
new file hdfs://192.168.121.201:9000/test/my.txt
在hadoop环境中验证
# hadoop fs -ls /
# hadoop fs -ls /test
2 将main方法中的第四行代码打开,其他行代码都注释,重新跑一次。
进hadoop环境中验证,发现my.txt文件已成功删除
3 建立F:\a1.txt文件,其中的内容随意输入,比如“abcdefg”。
将main方法中的第五行代码打开,其他行代码都注释,重新跑一次。运行结果为:
Uploadto hdfs://192.168.121.201:9000/test
hdfs://192.168.121.201:9000/test/a1.txt
进hadoop环境中验证,发现a1.txt已经被传到hadoop中
4 将main方法中的第六行代码打开,其他行代码都注释,重新跑一次。发现hadoop中的/test/a1.txt已被下载到F:\a2.txt,其中的内容为“abcdefg”
相关文章推荐
- Java及Web程序调用hadoop2.6
- Java的JNI:使用Java调用C程序(入门)
- 使用java调用hadoop接口学习笔记
- hadoop;RPC;调用接口;cmd的jps查看java进程;有main方法的类才能产生进程
- java微信小程序调用支付接口
- java程序调用.net接口服务地址的写法
- webservice实际应用之调用外部系统提供的服务端地址,写java的webservice客户端程序并通过接口给外部系统传数据
- java jni 入门1 - 一个简单的从Java程序中调用C函数
- 用Dubbo+zookeeper 调用java接口程序
- java程序实现短信发送(可调用免费短信接口)
- java入门程序100例学习笔记(005递归调用)
- 通用权限管理系统多语言开发接口 - java,php 调用接口程序,多业务子系统集成
- hadoop之hdfs的java接口调用和hadoop的rpc调用
- spark java程序入门(三)外部程序调用
- java通过jni调用C程序接口
- WebService入门java实现服务端接口调用
- ICTCLAS分词系统Java调用接口在Eclipse中的安装
- java调用外部程序执行
- 在JAVA中调用外部可执行程序
- java实现外部程序调用