Java分布式组件 - - RPC(手写一个RPC)
2018-03-06 17:44
405 查看
RPC
Remote Procedure Call – 远程调用过程。 它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议构成
4000网络协议
TCP或UDP Socket编程
输入输出流(对象流)
ObjectOutputStream : 将 Java 对象的基本数据类型和图形写入 OutputStream。可以使用 ObjectInputStream 读取(重构)对象。通过在流中使用文件可以实现对象的持久存储。如果流是网络套接字流,则可以在另一台主机上或另一个进程中重构对象。
ObjectInputStream : 对 ObjectOutputStream 写入的基本数据和对象进行反序列化。
动态代理
动态代理技术就是用来产生一个对象的代理对象的
java手写RPC
Server 提供服务调用和服务实现 对外暴露服务接口
/** * RPC--服务端基本方法 */ public interface Server { /** * Socket 端口 */ int PORT = 8080; /** * 启动服务端 */ void start() throws IOException; /** * 停止服务端 */ void stop(); /** * 服务注册 * @param serviceInterface -- 对外暴露服务接口 * @param impl -- 内部实现类 */ void regist(Class<? extends IRpcService> serviceInterface, Class<? extends IRpcService> impl); }
public class ServerCenter implements Server { /** * 线程池 接收客户端调用 */ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10)); /** * 服务注册缓存 */ private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>(); /** * 启动服务 */ @Override public void start() throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(PORT)); try { for (;;) { executor.execute(new ServiceTask(server.accept())); } } finally { server.close(); } } /** * 停止服务 */ @Override public void stop() { executor.shutdown(); } /** * 注册服务 */ @Override public void regist(Class<? extends IRpcService> serviceInterface, Class<? extends IRpcService> impl) { serviceRegistry.put(serviceInterface.getName(), impl); } /** *服务具体调度--对象流反序列化,反射调用本地服务,并输出结果到客户端 */ private static class ServiceTask implements Runnable { Socket clent = null; public ServiceTask(Socket client) { this.clent = client; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { input = new ObjectInputStream(clent.getInputStream()); String serviceName = input.readUTF(); String methodName = input.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); Class<?> serviceClass = serviceRegistry.get(serviceName); if (serviceClass == null) { throw new ClassNotFoundException(serviceName + " not found"); } Method method = serviceClass.getMethod(methodName, parameterTypes); Object result = method.invoke(serviceClass.newInstance(), arguments); // 3.将执行结果反序列化,通过socket发送给客户端 output = new ObjectOutputStream(clent.getOutputStream()); output.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { if (output != null) { try { output.close(); } catch (IOException e) { e.printStackTrace(); } } if (input != null) { try { input.close(); } catch (IOException e) { e.printStackTrace(); } } if (clent != null) { try { clent.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
服务端暴露服务接口
/** * 加入泛型,规范调用 */ public interface IRpcService extends Serializable{ } /** * 服务端暴露服务接口,客户端可以直接调用 */ public interface IHelloService extends IRpcService{ String sayHi(String name, String msg); }
服务端接口实现
/** * 服务端接口实现 */ public class HelloServiceImpl implements IHelloService { private static final long serialVersionUID = 1L; @Override public String sayHi(String name, String msg) { return new StringBuffer().append("hi~! ").append(name).append(",").append(msg).toString(); } }
客户端调用RPC服务
public class Client { /** * 客户端代理获取远程服务对象 * @param serviceInterface * @param addr * @return */ @SuppressWarnings("unchecked") public static <T extends IRpcService> T getRemoteProxyObj(Class<? extends IRpcService> serviceInterface,InetSocketAddress addr){ return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try { // 1.创建Socket客户端,根据指定地址连接远程服务提供者 socket = new Socket(); socket.connect(addr); // 2.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者 output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(serviceInterface.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); // 3.同步阻塞等待服务器返回应答,获取应答后返回 input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); } finally { if (socket != null) socket.close(); if (output != null) output.close(); if (input != null) input.close(); } } }); } }
测试
public class RpcTest { public static void main(String[] args) throws IOException { IHelloService service = Client.getRemoteProxyObj(IHelloService.class, new InetSocketAddress("localhost", 8080)); System.out.println(service.sayHi("张三", "新年快乐,大吉大利!")); } }
相关文章推荐
- 从零手写一个JAVA连接池组件,窥视架构团队的开发日常
- java组件写一个GUI 计算器
- DUBBO 一个高性能,基于Java的开源RPC框架
- 一个简单RPC,java实现
- 由浅入深写java分布式(4)基于注解 dubbo 一个app同时存在consumer和provider自启动失败的问题,以dubbo和spring注解加载顺序的问题
- 一个关于Java JFrame 无法初始化组件问题
- JAVA--第十周实验--编写一个JFrame,标题为“计算的窗口”,在该窗口中组件的布局是FlowLayout
- Java 注册监听器的方法总结(自身类this+外部类+内部类+匿名内部类+适配器Adapter+一个组件注册多个监听器)
- 采用Best effort 1pc + 回滚补偿机制实现的一个distributed transaction (分布式事务框架).基于dubbo rpc服务上实现。
- java组件写一个GUI 计算器
- java--第十周--任务二 编写一个JFrame,在该窗口中组件的布局是FlowLayout。窗口中添加两个文本区,当我们在一个文本区中输入若干个数时,另一个文本区同时对输入的数进行
- 一个轻量级分布式RPC框架--NettyRpc
- 自己动手写工具(一)一个用Java8实现的内存级别的缓存
- EhCache 是一个纯Java的进程内缓存框架,具有快速、精干等特点,是hibernate中默认的CacheProvider Ehcache是一种广泛使用的开源Java分布式缓存。主要面向通
- Java分布式开发中的RPC
- 阿里巴巴 2015 实习笔试题 分布式系统中的RPC请求经常出现乱序的情况 写一个算法来将一个乱序的序列保序输出
- java组件写一个GUI 计算器
- 一个轻量级分布式RPC框架--NettyRpc(六)
- JAVA--编写一个JFrame,标题为“计算的窗口”,在该窗口中组件的布局是FlowLayout。窗口中添加两个文本区,当我们在一个文本区中输入若干个数时,另一个文本区同时对输入的数进行求和运算并求
- java组件写一个GUI 计算器