您的位置:首页 > 编程语言 > Java开发

【HDFS】HDFS中JAVA API的使用

2016-05-31 09:04 453 查看

HDFS中JAVA API的使用

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。下面记录一下使用JAVA API对HDFS中的文件进行操作的过程。

Configuration类:该类的对象封转了客户端或者服务器的配置。

FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作。FileSystem fs = FileSystem.get(conf);通过FileSystem的静态方法get获得该对象。

FSDataInputStream和FSDataOutputStream:这两个类是HDFS中的输入输出流。分别通过FileSystem的open方法和create方法获得。

HDFS API URL 方式操作

import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

/*
* HDFS API URL 方式操作
*/
public class HDFSUrlTest {

//让JAVA 程序识别HDFS的Url
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}

//查看文件内容
@Test
public void testRead(){
InputStream in = null;

//文件路径
String fileUrl = "hdfs://192.168.99.110:9000/user/hadoop/input/test2.txt";

try {
//获取文件输入流
in = new URL(fileUrl).openStream();

//将文件内容打出来输入到控制台
IOUtils.copyBytes(in, System.out, 4096, false);
} catch (Exception e) {
e.printStackTrace();
}finally{
IOUtils.closeStream(in);
}
}
}


HDFS FileSystem API 方式操作

注:要想成功运行需导入以下两个配置文件到自己工程下

core-site.xml


hdfs-site.xml


HDFS工具类,用来获取配置信息及文件系统

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
/*
* HDFS 工具类
*/
public class HDFSUtils {

public static FileSystem getFileSystem(){

FileSystem hdfs = null;

try {
//获取配置文件信息
Configuration conf = new Configuration();
//          conf.addResource(new Path("/conf/core-site.xml"));
//获取文件系统
hdfs = FileSystem.get(conf);
} catch (IOException e) {
e.printStackTrace();
}

return hdfs;
}
}


具体操作示例

import java.io.IOException;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSFsTest {

//读取文件内容
@Test
public void testRead() throws IOException{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//文件名称
Path path = new Path("/user/hadoop/input/test2.txt");

//打开文件输入流-----open
FSDataInputStream inStream = hdfs.open(path);

//读取文件内容到控制台显示-----read
IOUtils.copyBytes(inStream, System.out, 4096, false);

//关闭流------close
IOUtils.closeStream(inStream);
}

//查看目录
@Test
public void testList() throws IOException{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//目录
Path path = new Path("/user/hadoop/");

FileStatus[] fileStatus = hdfs.listStatus(path);

for(FileStatus fs : fileStatus){
Path p = fs.getPath();
String info = fs.isDir() ? "目录" : "文件" ;
System.out.println(info + ":" + p);
}
}

//创建目录
@Test
public void testDirectory() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//目录
Path path = new Path("/usr/hadoop/dir");

//创建目录
boolean bool = hdfs.mkdirs(path);

//返回结果为true创建成功
System.out.println(bool);
}

//上传文件
@Test
public void testPut() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//本地路径 目录 + 文件名称
Path srcPath = new Path("c:/jdk-8u92-linux-x64.gz");

//目标路径
Path dstPath = new Path("/usr/hadoop/test/");

hdfs.copyFromLocalFile(srcPath, dstPath);

}

//创建HDFS文件并写入内容
@Test
public void testCreat() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//HDFS上传文件路径
Path path = new Path("/usr/hadoop/dir/toich.data");

//获取文件并上传输出流
FSDataOutputStream fs = hdfs.create(path);

//通过输出流写内容
fs.writeUTF("Hello Hadoop!!!");

fs.close();
}

//对HDFS上文件进行重命名
@Test
public void testRename() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//源文件名称
Path srcPath = new Path("/usr/hadoop/dir/toich.data");

//目标文件名称
Path dstPath = new Path("/usr/hadoop/dir/touch.data");

//重命名
boolean bool = hdfs.rename(srcPath, dstPath);

System.out.println(bool);
}

//删除文件
@Test
public void testDelete() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//HDFS 文件路径
Path path = new Path("/usr/hadoop/dir/touch.data");

//删除
//      boolean bool = hdfs.delete(path);

boolean bool = hdfs.deleteOnExit(path);

System.out.println(bool);
}

//删除目录
@Test
public void testDeleteDir() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//HDFS 文件路径
Path path = new Path("/usr/hadoop/dir/");

//删除
boolean bool = hdfs.delete(path, true);

System.out.println(bool);
}

//查找某个文件在集群的位置
@Test
public void testLocation() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

//HDFS 文件上传路径
Path path = new Path("/usr/hadoop/test/jdk-8u92-linux-x64.gz");

FileStatus fs = hdfs.getFileStatus(path);

BlockLocation[] blockLocations =  hdfs.getFileBlockLocations(fs, 0, fs.getLen());

for(BlockLocation blockLocation : blockLocations){

String[] hosts = blockLocation.getHosts();

for(String host : hosts){
System.out.print(host + "  ");
}
System.out.println();
}
}

//获取HDFS集群上所有节点名称信息
@Test
public void testCluster() throws Exception{

//获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();

DistributedFileSystem distributeFileSyatem = (DistributedFileSystem) hdfs;

DatanodeInfo[] datanodeInfos =  distributeFileSyatem.getDataNodeStats();

for(DatanodeInfo datanodeInfo : datanodeInfos){

String datanode = datanodeInfo.getHostName();

System.out.println(datanode);
}

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  api java