您的位置:首页 > 运维架构

hadoop 读流程和写流程

2016-03-02 17:28 267 查看
hadoop HDFD读流程
hadoop HDFD写流程

package com.lhj.hadoop;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
* 创建路径并上传文件,hdfs://node1:9000/test/input/aaa.txt
* 导出jar文件到node1
* 执行jar包:hadoop jar HdfsUtils.jar com.lhj.hadoop.HdfsUtils
* @author Administrator
*
* 读的流程:
* 1.客户端通过FileSystem的open方法打开一个输入流。FileSystem具体的实现是DistributedFileSystem这个类。
* 2.从namenode获取block的位置信息
* 3.从datanode读取数据,(通过流的方式FSDataInputStream)
* 4.close关闭流
*
* write的流程:
* 1.客户端通过FileSystem的create方法,创建一个输出流
* 2.往namenode写元数据
* 3.往其中一个datanode写数据,由datanode产生线程,复制到其他datanode上
* 4.close关闭流
*/
public class HdfsUtils {
public static void main(String[] args) {
HdfsUtils hdfsUtils = new HdfsUtils();
hdfsUtils.read();
hdfsUtils.write();
}

private void read() {
Path path = new Path("hdfs://node1:9000/test/input/aaa.txt");
Configuration conf = new Configuration();
BufferedReader reader=null;

try {
FSDataInputStream in = FileSystem.get(conf).open(path);//关键代码,FS打开路径
reader = new BufferedReader(new InputStreamReader(in));

String line=null;
while ((line=reader.readLine()) != null) {
System.out.println(line);
}
} catch (IOException e) {
e.printStackTrace();
} finally{
if (reader!=null){
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

private void write(){
Path path = new Path("hdfs://node1:9000/test/input/bbb.txt");
Configuration conf = new Configuration();
FSDataOutputStream output = null;

String[] contents=new String[]{
"aaaaaaaaaaa\n",
"bbbbbbbbbbb\n",
"ccccccccccc\n",
"\n"
};

try {
FileSystem fs = path.getFileSystem(conf);
output = fs.create(path);

for (String line:contents){
output.write(line.getBytes("UTF-8"));
output.flush();
}

} catch (IOException e) {
e.printStackTrace();
} finally{
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息