您的位置:首页 > 编程语言 > Java开发

hadoop的java操作方式--RPC示例

2015-01-15 10:57 399 查看
1、如果eclipse连接不上hdfs的原因如下:

01、用ip配置:core-site.xml中的fs.default.name,比如hdfs://192.168.208.15:9000

mapred-site.xml中的mapred.job.tracker,比如:192.168.208.15:9001

注意这里要配置主机名对应的ip,这里我的主机名是hadooptest,所以要在/etc/hosts下配置

127.0.0.1 hadooptest,不然datanode和JobTracker起不来

02、用主机名配置:core-site.xml中的fs.default.name,比如hdfs://hadooptest:9000

mapred-site.xml中的mapred.job.tracker,比如:hadooptest:9001

要在etc/hosts下配置192.168.208.15 hadooptest

不然eclipse和下linux终端上执行hadoop fs -lsr hdfs://192.168.208.15:9000/,都是连接不上的

2、java代码:

import java.io.File;

import java.io.FileInputStream;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.URI;

import java.net.URISyntaxException;

import javax.swing.filechooser.FileSystemView;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.tools.rumen.ConcatenatedInputFilesDemuxer;

public class OperateHdfs {

private final static String PATH = "hdfs://192.168.208.15:9000";

//得到文件操作类

public static FileSystem getFileSystem() throws IOException, URISyntaxException{

return FileSystem.get(new URI(PATH), new Configuration());

}

//创建文件夹

public static void mkdir(String dir,FileSystem fileSystem) throws IOException{

fileSystem.mkdirs(new Path(dir));

}

//删除文件夹和文件

public static void remove(String dir,FileSystem fileSystem) throws IOException{

fileSystem.delete(new Path(dir), true);

}

//上传文件

public static void putData(FileSystem fileSystem,String LocalSrc,String hafsDes) throws IOException{

FSDataOutputStream out = fileSystem.create(new Path(hafsDes));

//OutputStream out = fileSystem.create(new Path(hafsDes));//用OutputStream接也行

FileInputStream in = new FileInputStream(new File(LocalSrc));

IOUtils.copyBytes(in, out, new Configuration(), true);

}

//下载文件

public static void downLoad(FileSystem fileSystem,String hdfsSrc,String LocalTarget) throws IOException{

FSDataInputStream in = fileSystem.open(new Path(hdfsSrc));

//InputStream in = fileSystem.open(new Path(hdfsSrc));//用InputStream接也行

//OutputStream out = System.out;

FileOutputStream out = new FileOutputStream(new File(LocalTarget));

IOUtils.copyBytes(in, out, new Configuration(),true);

}

//浏览文件和文件夹

public static void list(FileSystem filesystem,String dir) throws Exception{

FileStatus[] fileStatus = filesystem.listStatus(new Path(dir));

for(int i=0;i<fileStatus.length;i++){

FileStatus fileS = fileStatus[i];

String path = fileS.getPath().toString();

String isFile = fileS.isDir()?"文件夹":"文件";

String owner = fileS.getOwner();

String group = fileS.getGroup();

String permission = fileS.getPermission().toString();

String modification_time = fileS.getModificationTime()+"";

String access_time = fileS.getAccessTime()+"";

String blocksize = fileS.getBlockSize()+"";

System.out.println("path:"+path+" isFile:"+isFile+" owner:"+owner+" group:"+group+" permission"+permission+" FsPermission:"+

" modification_time:"+modification_time+" access_time:"+access_time+" blocksize:"+

blocksize);

}

}

//测试

public static void main(String[] args) throws Exception {

FileSystem filesystem = getFileSystem();

mkdir("/test/test2",filesystem);

//remove("/test",filesystem);

//putData(filesystem,"C:\\Users\\Administrator\\Desktop\\hello.txt","\\test\\test2\\hello.txt");

//downLoad(filesystem, "/test/test2/hello.txt", "C:\\Users\\Administrator\\Desktop\\download.txt");

list(filesystem, "/test");

}

}

(注意 这里的putData方法往hdfs上传数据的时候,查看文件属性是,副本数是3,但是在linux终端操作上传的时候,副本数是1,最主要的问题是:在eclipse底下只写了java代码,没有配置文件,默认读取源码中的配置,源码中的配置dfs.replication值是3,所以我们要把linux上的配置拷贝到eclipse的src下,起到覆盖配置的作用,和我们启动的hdfs配置参数一样,总共copy3个参数文件:core-site.xml,hdfs-site.xml,mapred-site.xml:




3、可以用URL来连接HDFS:

import java.io.InputStream;

import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

import org.apache.hadoop.io.IOUtils;

public class App1 {

static final String PATH = "hdfs://chaoren:9000/hello";

public static void main(String[] args) throws Exception {

URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

final URL url = new URL(PATH);

final InputStream in = url.openStream();

/**

* @param in
表示输入流

* @param out
表示输出流

* @param buffSize表示缓冲大小

* @param close 表示在传输结束后是否关闭流

*/

IOUtils.copyBytes(in, System.out, 1024, true);

}

}

4、RPC远程调用示例:

接口:

import org.apache.hadoop.ipc.VersionedProtocol;

public interface MyBizable extends VersionedProtocol{

public final static long VERSION = 234567L;

abstract String getName(String name);

}

实现类:

import java.io.IOException;

public class MyBiz implements MyBizable {

@Override

public long getProtocolVersion(String protocol, long clientVersion)

throws IOException {

// TODO Auto-generated method stub

return MyBizable.VERSION;

}

@Override

public String getName(String name) {

// TODO Auto-generated method stub

return "hello: "+name;

}

}

服务端:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

import org.apache.hadoop.ipc.Server;

public class MyServer {

public final static String ADDRESS = "127.0.0.1";

public final static int port = 23455;

public static void main(String[] args) throws Exception {

/**

* 构造一个RPC的服务端.

* @param instance 这个实例中的方法会被调用

* @param bindAddress 绑定的地址是用于监听连接的

* @param port 绑定的端口是用于监听连接的

* @param conf the configuration to use

*/

Server server = RPC.getServer(new MyBiz(), ADDRESS, port, new Configuration());

server.start();

}

}

客户端:

import java.io.IOException;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

public class MyClient {

public static void main(String[] args) throws IOException {

/**

* 构造一个客户端代理对象,该代理对象实现了命名的协议。代理对象会与指定地址的服务端通话

*/

MyBizable proxy = (MyBizable) RPC.getProxy(MyBizable.class, MyBizable.VERSION, new InetSocketAddress(MyServer.ADDRESS, MyServer.port),new Configuration());

String result = proxy.getName("xiaomei");

System.out.println("result: "+result);

//关闭网络连接

RPC.stopProxy(proxy);//这步一定要有,不然会报错

}

}

/**

1.RPC

1.1 RPC (remote procedure call)远程过程调用.

远程过程指的是不是同一个进程。

1.2 RPC至少有两个过程。调用方(client),被调用方(server)。

1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。

1.4 RPC是hadoop构建的基础。

2. 通过例子获得的认识?

2.1 RPC是一个远程过程调用。

2.2 客户端调用服务端的方法,意味着调用服务端的对象中的方法。

2.3 如果服务端的对象允许客户端调用,那么这个对象必须实现接口。

2.4 如果客户端能够调用到服务端对象的方法,那么这些方法一定位于对象的接口中。

**/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: