您的位置:首页 > 编程语言 > Java开发

客户端用java api 远程操作HDFS以及远程提交MR任务(源码和异常处理)

2013-12-05 15:16 411 查看
两个类,一个HDFS文件操作类,一个是wordcount 词数统计类,都是从网上看来的。上代码:

[java] view
plaincopy

package mapreduce;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.BlockLocation;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hdfs.DistributedFileSystem;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

import org.apache.hadoop.io.IOUtils;

/**

* file operation on HDFS

* @author liuxingjiaofu

*

*/

public class HDFS_File {

//read the file from HDFS

public void ReadFile(Configuration conf, String FileName){

try{

FileSystem hdfs = FileSystem.get(conf);

FSDataInputStream dis = hdfs.open(new Path(FileName));

IOUtils.copyBytes(dis, System.out, 4096, false);

dis.close();

}catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

//copy the file from HDFS to local

public void GetFile(Configuration conf, String srcFile, String dstFile){

try {

FileSystem hdfs = FileSystem.get(conf);

Path srcPath = new Path(srcFile);

Path dstPath = new Path(dstFile);

hdfs.copyToLocalFile(true,srcPath, dstPath);

}catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

//copy the local file to HDFS

public void PutFile(Configuration conf, String srcFile, String dstFile){

try {

FileSystem hdfs = FileSystem.get(conf);

Path srcPath = new Path(srcFile);

Path dstPath = new Path(dstFile);

hdfs.copyFromLocalFile(srcPath, dstPath);

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

//create the new file

public FSDataOutputStream CreateFile(Configuration conf, String FileName){

try {

FileSystem hdfs = FileSystem.get(conf);

Path path = new Path(FileName);

FSDataOutputStream outputStream = hdfs.create(path);

return outputStream;

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

//rename the file name

public boolean ReNameFile(Configuration conf, String srcName, String dstName){

try {

Configuration config = new Configuration();

FileSystem hdfs = FileSystem.get(config);

Path fromPath = new Path(srcName);

Path toPath = new Path(dstName);

boolean isRenamed = hdfs.rename(fromPath, toPath);

return isRenamed;

}catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return false;

}

//delete the file

// tyep = true, delete the directory

// type = false, delete the file

public boolean DelFile(Configuration conf, String FileName, boolean type){

try {

FileSystem hdfs = FileSystem.get(conf);

Path path = new Path(FileName);

boolean isDeleted = hdfs.delete(path, type);

return isDeleted;

}catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return false;

}

//Get HDFS file last modification time

public long GetFileModTime(Configuration conf, String FileName){

try{

FileSystem hdfs = FileSystem.get(conf);

Path path = new Path(FileName);

FileStatus fileStatus = hdfs.getFileStatus(path);

long modificationTime = fileStatus.getModificationTime();

return modificationTime;

}catch(IOException e){

e.printStackTrace();

}

return 0;

}

//check if a file exists in HDFS

public boolean CheckFileExist(Configuration conf, String FileName){

try{

FileSystem hdfs = FileSystem.get(conf);

Path path = new Path(FileName);

boolean isExists = hdfs.exists(path);

return isExists;

}catch(IOException e){

e.printStackTrace();

}

return false;

}

//Get the locations of a file in the HDFS cluster

public List<String []> GetFileBolckHost(Configuration conf, String FileName){

try{

List<String []> list = new ArrayList<String []>();

FileSystem hdfs = FileSystem.get(conf);

Path path = new Path(FileName);

FileStatus fileStatus = hdfs.getFileStatus(path);

BlockLocation[] blkLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());

int blkCount = blkLocations.length;

for (int i=0; i < blkCount; i++) {

String[] hosts = blkLocations[i].getHosts();

list.add(hosts);

}

return list;

}catch(IOException e){

e.printStackTrace();

}

return null;

}

//Get a list of all the nodes host names in the HDFS cluster

// have no authorization to do this operation

public String[] GetAllNodeName(Configuration conf){

try{

FileSystem fs = FileSystem.get(conf);

DistributedFileSystem hdfs = (DistributedFileSystem) fs;

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

String[] names = new String[dataNodeStats.length];

for (int i = 0; i < dataNodeStats.length; i++) {

names[i] = dataNodeStats[i].getHostName();

}

return names;

}catch(IOException e){

System.out.println("error!!!!");

e.printStackTrace();

}

return null;

}

}

wordcount

[java] view
plaincopy

package mapreduce;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class mywordcount {

public static class wordcountMapper extends

Mapper<LongWritable, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{

String line = value.toString();

StringTokenizer itr = new StringTokenizer(line);

while(itr.hasMoreElements()){

word.set(itr.nextToken());

context.write(word, one);

}

}

}

public static class wordcountReducer extends

Reducer<Text, IntWritable, Text, IntWritable>{

public void reduce(Text key, Iterable<IntWritable>values, Context context)throws IOException, InterruptedException{

int sum = 0;

for (IntWritable str : values){

sum += str.get();

}

context.write(key, new IntWritable(sum));

}

}

/**

* 2 args, the file you want to count words from and the directory you want to save the result

* @param args /home/hadooper/testmp/testtext /home/hadooper/testmp/testresult

* @throws Exception

*/

public static void main(String args[])throws Exception{

//首先定义两个临时文件夹,这里可以使用随机函数+文件名,这样重名的几率就很小。

String dstFile = "temp_src";

String srcFile = "temp_dst";

//这里生成文件操作对象。

HDFS_File file = new HDFS_File();

Configuration conf = new Configuration();

// must!!! config the fs.default.name be the same to the value in core-site.xml

conf.set("fs.default.name","hdfs://node1");

[java] view
plaincopy

conf.set("mapred.job.tracker","node1:54311");

//从本地上传文件到HDFS,可以是文件也可以是目录

file.PutFile(conf, args[0], dstFile);

System.out.println("up ok");

Job job = new Job(conf, "mywordcount");

job.setJarByClass(mywordcount.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setMapperClass(wordcountMapper.class);

job.setReducerClass(wordcountReducer.class);

job.setCombinerClass(wordcountReducer.class);

//注意这里的输入输出都应该是在HDFS下的文件或目录

FileInputFormat.setInputPaths(job, new Path(dstFile));

FileOutputFormat.setOutputPath(job, new Path(srcFile));

//开始运行

job.waitForCompletion(true);

//从HDFS取回文件保存至本地

file.GetFile(conf, srcFile, args[1]);

System.out.println("down the result ok!");

//删除临时文件或目录

file.DelFile(conf, dstFile, true);

file.DelFile(conf, srcFile, true);

System.out.println("delete file on hdfs ok!");

}

}

期间,遇到几个错误:

1.HDFS版本问题--Call to node1/172.*.*.*:8020 failed on local exception: java.io.EOFException

main() {……

Configuration conf = new Configuration();

conf.set("fs.default.name","hdfs://node1");//与conf/core-site里的值对应,必须

HDFS_File file = new HDFS_File();

//print all the node name

String[] host_name = file.GetAllNodeName(conf);

……}

public String[] GetAllNodeName(Configuration conf){

try{

// Configuration config = new Configuration();

FileSystem fs = FileSystem.get(conf);

DistributedFileSystem hdfs = (DistributedFileSystem) fs;

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

String[] names = new String[dataNodeStats.length];

for (int i = 0; i < dataNodeStats.length; i++) {

names[i] = dataNodeStats[i].getHostName();

}

return names;

}catch(IOException e){

System.out.println("eeeeeeeeeeeeeeeeeeeerror!!!!");

e.printStackTrace();

}

return null;

}

异常:

eeeeeeeeeeeeeeeeeeeerror!!!!

java.io.IOException: Call to node1/172.10.39.250:8020 failed on local exception: java.io.EOFException

at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)

at org.apache.hadoop.ipc.Client.call(Client.java:743)

at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)

at $Proxy0.getProtocolVersion(Unknown Source)

at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)

at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:112)

at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:213)

at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:176)

at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1378)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1390)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:196)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:95)

at mapreduce.HDFS_File.GetAllNodeName(HDFS_File.java:151)

at mapreduce.File_Operation.main(File_Operation.java:15)

Caused by: java.io.EOFException

at java.io.DataInputStream.readInt(DataInputStream.java:392)

at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:501)

at org.apache.hadoop.ipc.Client$Connection.run(Client.java:446)

Exception in thread "main" java.lang.NullPointerException

at mapreduce.File_Operation.main(File_Operation.java:16)

原因:版本问题,确保java中的jar包跟hadoop集群的jar包是相同版本的

2.HDFS
权限问题



org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: Permission denied: user=hadooper, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x

解决方案之

(1 added this entry to conf/hdfs-site.xml

<property>

<name>dfs.permissions</name>

<value>false</value>

</property>

(2.放开 要写入目录 hadoop 目录的权限 , 命令如下 :$ hadoop fs -chmod 777 /user/

我用的是第2种方案

3.HDFS 2011-12-20 17:00:32 org.apache.hadoop.util.NativeCodeLoader <clinit>

警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

在Hadoop的配置文件core-site.xml中可以设置是否使用本地库:

<property>

<name>hadoop.native.lib</name>

<value>true</value>

<description>Should native hadoop libraries, if present, be used.</description>

</property>

Hadoop默认的配置为启用本地库。

另外,可以在环境变量中设置使用本地库的位置:

export JAVA_LIBRARY_PATH=/path/to/hadoop-native-libs

有的时候也会发现Hadoop自带的本地库无法使用,这种情况下就需要自己去编译本地库了。在$HADOOP_HOME目录下,使用如下命令即可:

ant compile-native

编译完成后,可以在$HADOOP_HOME/build/native目录下找到相应的文件,然后指定文件的路径或者移动编译好的文件到默认目录下即可

我试了下,那个是64位的,我电脑是32位的,没有源代码,编译不了,那只好一段段程序的试,找出哪段代码出了这个警告,我的是

try {

FileSystem hdfs = FileSystem.get(conf);

Path srcPath = new Path(srcFile);

Path dstPath = new Path(dstFile);

hdfs.copyToLocalFile(true,srcPath, dstPath);//定位到此句

}catch (IOException e) {

到了此步,便只能如此了,为什么呢,java不是跨平台的吗

4.MR-jar包缺失

ClassNotFoundException: org.codehaus.jackson.map.JsonMappingException

NoClassDefFoundError: org/apache/commons/httpclient/HttpMethod

添加jar包到java工程中

jackson-core-asl-1.5.2.jar

jackson-mapper-asl-1.5.2.jar

commons-httpclient-3.0.1.jar

我是不习惯将所有Jar包都加到工程里,觉得这样很容易便加多了,浪费时空。

完成第一次mapreduce,不错!

5.远程的JOB挂掉了,居然还能运行成功,发现是mapred.job.tracker属性没设,默认在local下运行,其值在namenode的mapred-site.xml中看

conf.set("mapred.job.tracker","node1:54311");

配置完了,运行可以初始化,但是找不到mapper类:

信息: Task Id : attempt_201112221123_0010_m_000000_0, Status : FAILED

java.lang.RuntimeException: java.lang.ClassNotFoundException: mapreduce.mywordcount$wordcountMapper

at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:996)

at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:212)

at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:611)

at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)

at org.apache.hadoop.mapred.Child$4.run(Child.java:270)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:396)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)

at org.apache.hadoop.mapred.Child.main(Child.java:264)

将程序打成jar包放到hadoop集群的jobtracker上可用,正常,结果也正确,但是在客户端运行却报上述错误,暂时还没解决。

总结

1.远程操作HDFS文件以及远程提交MR任务,必须配置的两项(其他暂时还没发现):

conf.set("fs.default.name","hdfs://node1");//与conf/core-site.xml里的值对应,必须

conf.set("mapred.job.tracker","node1:54311");//mapred-site.xml
2.耐心分析问题,解决问题
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐