您的位置:首页 > 运维架构

Hadoop文件的基本操作

2015-06-16 16:00 381 查看
Hadoop提供了大量的API对文件系统中的文件进行操作,主要包括:

(1)读取文件

(2)写文件

(3)读取文件属性

(4)列出文件

(5)删除文件

(6)移动文件

(7)取出最旧的文件

1、读取文件

以下示例中,将hdfs中的一个文件读取出来,并输出到标准输出流中。

package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemDoubleCat {

public static void main(String[] args) throws IOException {

String fileName = args[0];
Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(fileName), conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(fileName));
IOUtils.copyBytes(in, System.out, 4096, false);
in.seek(0);
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
in.close();
}

}

}
(1)其中FSDataInputStream实现了Seekable接口,可以对文件进行随机定位,但注意,seek()的代价较高,如无必要,尽量少使用。

2、文件复制

package org.jediael.hadoopdemo.fsdemo;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileCopy {

public static void main(String[] args) throws IOException {
String sourceFile = args[0];
String destFile = args[1];

InputStream in = null;
OutputStream out = null;
try {
//1、准备输入流
in = new BufferedInputStream(new FileInputStream(sourceFile));
//2、准备输出流
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(destFile), conf);
out = fs.create(new Path(destFile));
//3、复制
IOUtils.copyBytes(in, out, 4096, false);
} finally {
in.close();
out.close();
}

}

}


3、获取文件属性

文件属性以FileStatus对象进行封装,使用FileSystem对象的getFileStatus()方法,可以获取到文件的FileStatus对象。

package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileStatusDemo {

public static void main(String[] args) throws IOException {

String fileName = args[0];

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(fileName), conf);
//获取FileSystem对象。
FileStatus status = fs.getFileStatus(new Path(fileName));
System.out.println(status.getOwner()+" "+status.getModificationTime());

}

}


4、列出某个目录下的文件

使用FileSystem的ListStatus方法,可以获取到某个目录下所有文件的FileStatus对象。

package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class ListStatusDemo {

public static void main(String[] args) throws IOException {

String dir = args[0];

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dir), conf);
FileStatus[] stats =  fs.listStatus(new Path(dir));

Path[] paths = FileUtil.stat2Paths(stats);
for(Path path : paths){
System.out.println(path);
}
}

}


6、移动文件

hadoop没有提供专门的移动文件API,而是使用rename达到相同效果

public class CopyToLocal {

public static void main(String[] args) throws IOException{
String srcFile = "/trident/ljh/trident-4-3-1436154760161.txt";
Path srcPath = new Path(srcFile);
String srcDir = srcPath.getParent().toString();
String srcName = srcPath.getName();

String destFile = srcDir.toString() + "/done/"+srcName;

FileSystem fs = FileSystem.get(new Configuration());
fs.rename(srcPath, new Path(destFile));

}

}


注意如果移动新目录中,必须先创建目录,rename不能自动创建目录。

7、

以下代码返回最旧的文件,一般用于文件处理时先处理最旧的一个。

private static Path getOldestFile() {
Configuration conf = new Configuration();
Path srcPath = null;
FileStatus[] stats = null;

//列出所有文件
try {
//if (hdfsFs == null) {
hdfsFs = FileSystem.get(new URI(dir), conf);
//}
stats = hdfsFs.listStatus(new Path(dir));
} catch (Exception e) {
e.printStackTrace();
}

// 1、当目录内只有"done"目录时,sleep60秒后再检查
while (stats.length == 1) {
try {
Thread.sleep(60000);
stats = hdfsFs.listStatus(new Path(“<span style="font-family: Arial, Helvetica, sans-serif;">hdfs://192.168.169.91:9000/trident/ljh</span><span style="font-family: Arial, Helvetica, sans-serif;">”));</span>
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

//2、取得修改时间最早的那个文件的index。
long minModiticationTime = Long.MAX_VALUE;
int minModificationTimeIndex = 0;
for(int i = 0; i < stats.length; i++){
if(stats[i].getModificationTime() < minModiticationTime && !stats[i].isDirectory()){
minModiticationTime = stats[i].getModificationTime();
minModificationTimeIndex = i;
}
}
//srcStatus = hdfsFs.getFileStatus(srcPath);
srcPath = stats[minModificationTimeIndex].getPath();
System.out.println(srcPath.getParent()+srcPath.getName());

//3、若文件的最新修改时间与当前时间相减少于此文件生成的周期,则说明文件有可能还在写入之中,暂时不作处理。单位为ms。
while(new Date().getTime() - minModiticationTime < FILE_GEN_PERIOD){

try {
Thread.sleep(60000);
minModiticationTime = hdfsFs.getFileStatus(srcPath).getModificationTime();
} catch (Exception e) {
e.printStackTrace();
}
}

System.out.println(srcPath.getName());
return srcPath;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: