您的位置:首页 > 其它

[置顶] 基于netty实现的远程服务框架

2016-09-30 14:53 555 查看

HSF服务管理平台

基于netty实现远程服务框架,为终端提供REST形式的HTTP服务。

目前只实现部分功能,可以提供REST形式和传统形式的HTTP服务,其特点主要包括:

基于netty实现http协议开发,作为服务端和客户端的通信桥梁

利用zk管理服务提供者,实现分布式部署

通过路由平台,随机分发请求,保证负载均衡

动态监控服务提供者的存活状态

服务提供者开发简单,易于接入

一、架构设计

 

 

 




 

二、流程




 三、服务端介绍

服务提供者引入一个核心jar,通过xml配置,即可发布服务。

核心jar的部分代码介绍

ZookeeperFactory类:

 

package com.ab.hsf.zk;

import com.ab.hsf.constants.Constants;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

import java.util.List;

/**
* zookeeper工厂类
*/
public class ZookeeperFactory {

public static Logger logger = Logger.getLogger(ZookeeperFactory.class);

/**
* zookeeper服务地址
*/
private String hosts;
/**
* 回话的超时时间(毫秒)
*/
private Integer sessionTimeOut;
/**
* 连接的超时时间(毫秒)
*/
private Integer connectionTimeOut;
/**
* 命名空间
*/
private String nameSpace;
/**
* zookeeper管理对象
*/
private CuratorFramework zkTools;
/**
* 应用ip:port
*/
private String appAddress;

/**
* 连接状态
*/
private String connectionState;

/**
* 连接
*/
public void connection() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, Integer.MAX_VALUE);
zkTools = CuratorFrameworkFactory
.builder()
.connectString(hosts)
.namespace(nameSpace)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeOut == null ? 30000 : connectionTimeOut)
.sessionTimeoutMs(sessionTimeOut == null ? 300000 : sessionTimeOut)
.build();
zkTools.start();
connectionState = "CONNECTED";
addListener();
}

/**
* 注册
*
* @param interFaceIds 接口服务列表
*/
public void register(List<String> interFaceIds) {
if (interFaceIds == null) {
logger.error("interface list is null");
return;
}
try {
for (String interFaceId : interFaceIds) {
String interFaceIdNode = Constants.SEPARATOR + interFaceId;    //节点路径
if (connectionState != null && (connectionState.equals("CONNECTED") || connectionState.equals("RECONNECTED"))) {
if (zkTools.checkExists().forPath(interFaceIdNode) == null) {       //无当前节点
zkTools.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(interFaceIdNode);//创建的路径和值
}

String ipNode = interFaceIdNode + Constants.SEPARATOR + this.getAppAddress();    //节点路径
if (zkTools.checkExists().forPath(ipNode) != null) {     //有当前IP的接点,则删除后,重新建立
zkTools.delete().forPath(ipNode);
}
zkTools.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(ipNode);//创建的路径和值
}
}
} catch (Exception e) {
logger.error("create zookeeper node failure", e);
}
}

/**
* 连接状态监听
*/
public void addListener() {
zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState.equals(ConnectionState.CONNECTED)) {
logger.info("连接");
connectionState = "CONNECTED";
}
if (newState.equals(ConnectionState.RECONNECTED)) {
logger.info("重新连接");
connectionState = "RECONNECTED";
connection();
}
if (newState.equals(ConnectionState.LOST)) {
logger.info("丢失");
connectionState = "LOST";
}
if (newState.equals(ConnectionState.SUSPENDED)) {
logger.info("暂停");
connectionState = "SUSPENDED";
}
if (newState.equals(ConnectionState.READ_ONLY)) {
logger.info("只读");
connectionState = "READ_ONLY";
}
}
});
}

/**
* 关闭连接
*/
public void close() {
if (zkTools != null) {
zkTools.close();
zkTools = null;
}
}

public String getHosts() {
return hosts;
}

public void setHosts(String hosts) {
this.hosts = hosts;
}

public Integer getSessionTimeOut() {
return sessionTimeOut;
}

public void setSessionTimeOut(Integer sessionTimeOut) {
this.sessionTimeOut = sessionTimeOut;
}

public Integer getConnectionTimeOut() {
return connectionTimeOut;
}

public void setConnectionTimeOut(Integer connectionTimeOut) {
this.connectionTimeOut = connectionTimeOut;
}

public String getNameSpace() {
return nameSpace;
}

public void setNameSpace(String nameSpace) {
this.nameSpace = nameSpace;
}

public String getAppAddress() {
return appAddress;
}

public void setAppAddress(String appAddress) {
this.appAddress = appAddress;
}
}

 

 

netty实现部分代码:

 

 

package com.ab.hsf.server.http;

import com.ab.hsf.server.HsfServer;
import com.ab.hsf.server.http.handler.HsfHttpServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

/**
 * Netty-based HTTP server that hands every request to {@link HsfHttpServerHandler}.
 * User: alex
 * DateTime: 15-7-23 14:03
 */
public class HsfHttpServer implements HsfServer {

    protected ServerBootstrap bootstrap = new ServerBootstrap();
    protected EventLoopGroup bossGroup = new NioEventLoopGroup();
    protected EventLoopGroup workerGroup = new NioEventLoopGroup();
    /** Port to listen on. */
    protected int port = 8080;
    /** Value passed as SO_BACKLOG for the listening socket. */
    private int backlog = 128;
    /** Maximum frame length handed to the length-field decoder (10 MiB). */
    private int maxRequestSize = 1024 * 1024 * 10;
    /** Value passed as SO_KEEPALIVE for accepted connections. */
    protected boolean keepalive = false;

    /**
     * Binds the server to {@link #getPort()} and blocks the calling thread until the
     * listening channel is closed.
     *
     * <p>The event loop groups created with this object are used as-is; they are no longer
     * replaced here, which previously left the first pair allocated and never shut down.
     * The groups are shut down when this method exits, including when binding fails.
     */
    public void start() {
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // Inbound bytes are HTTP requests, so decode them with HttpRequestDecoder.
                        ch.pipeline().addLast(new HttpRequestDecoder());
                        // Outbound messages are HTTP responses, so encode them with HttpResponseEncoder.
                        ch.pipeline().addLast(new HttpResponseEncoder());
                        // Disabled: HttpObjectAggregator would merge the message parts into one
                        // FullHttpRequest / FullHttpResponse.
                        //                        ch.pipeline().addLast(new HttpObjectAggregator(maxRequestSize));
                        // Length-field framing, added by the original author against
                        // sticky/partial packets.
                        // NOTE(review): these two handlers sit behind the HTTP codec, so they
                        // may never see raw bytes - confirm whether they have any effect.
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 2, 0, 2));
                        ch.pipeline().addLast(new LengthFieldPrepender(2));
                        // Disabled: response compression.
                        //                        ch.pipeline().addLast(new HttpContentCompressor());
                        // Business handler.
                        ch.pipeline().addLast(new HsfHttpServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, backlog)
                .childOption(ChannelOption.SO_KEEPALIVE, keepalive);

        try {
            ChannelFuture f = bootstrap.bind(port).syncUninterruptibly();
            f.channel().closeFuture().syncUninterruptibly();
        } finally {
            // Without this a failed bind would leave both groups alive.
            stop();
        }
    }

    /**
     * Shuts down both event loop groups. Safe to call more than once.
     */
    public void stop() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getBacklog() {
        return backlog;
    }

    public void setBacklog(int backlog) {
        this.backlog = backlog;
    }

    public boolean isKeepalive() {
        return keepalive;
    }

    public void setKeepalive(boolean keepalive) {
        this.keepalive = keepalive;
    }
}

 

package com.ab.hsf.server.http.handler;

import com.ab.hsf.analysis.ParamsAnalysis;
import com.ab.hsf.bean.HsfServiceBean;
import com.ab.hsf.constants.Constants;
import com.ab.hsf.data.Invocation;
import com.ab.hsf.data.RequestMessage;
import com.ab.hsf.data.ResponseMessage;
import com.ab.hsf.init.HsfServiceFactoryBean;
import com.ab.hsf.reflect.Invoker;
import com.ab.hsf.reflect.impl.DefaultInvoker;
import com.ab.hsf.util.StringUtils;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.util.Map;

import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
* http服务处理类
* User: alex
* DateTime: 15-7-23 下午2:09
*/
public class HsfHttpServerHandler extends ChannelInboundHandlerAdapter {

private HttpRequest request;

private ParamsAnalysis paramsAnalysis;

private Invoker defaultInvoker;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RequestMessage requestMessage = null;
if (msg instanceof HttpRequest) {
request = (HttpRequest) msg;
URI uri = new URI(request.getUri());
if (uri.getPath().equals("/favicon.ico")) {
return;
}
paramsAnalysis = new ParamsAnalysis(request, request.getUri());
requestMessage = paramsAnalysis.getMethodHandle(request);
}
if (msg instanceof HttpContent) {
HttpContent httpContent = (HttpContent) msg;
requestMessage = paramsAnalysis.postMethodHandle(httpContent);
}

//判断是否满足条件
if (requestMessage != null && requestMessage.getErrorMessage() != null) {
return;
}

// 解析http头部
Map<String,String> httpHeaderMap = paramsAnalysis.parseHeader(request.headers());
//反射取值
String response = invokerHandle(requestMessage,httpHeaderMap);
//响应
ctx.write(this.httpResponseHandle(response));
ctx.flush();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}

/**
* 反射获取结果
*
* @param requestMessage 请求参数
* @return 结果
*/
private String invokerHandle(RequestMessage requestMessage,Map<String,String> httpHeaderMap ) {
Invocation invocation = requestMessage.getInvocationBody();
HsfServiceBean configBean = HsfServiceFactoryBean.getProvider(invocation.getIfaceId(), invocation.getAlias());
//校验token
if(!checkHeader(configBean,httpHeaderMap)){
return "token is wrong";
}
defaultInvoker = new DefaultInvoker(configBean.getInterfacePath(), configBean.getTargetObject());
String result = null;
try {
ResponseMessage responseMessage = defaultInvoker.invoke(requestMessage);
result = String.valueOf(responseMessage.getResponse());
} catch (Exception e) {
result = e.getLocalizedMessage();
}

return result;
}

/**
* 封装响应数据信息
*
* @param responseMessage 响应数据
* @return 响应对象
*/
private FullHttpResponse httpResponseHandle(String responseMessage) {
FullHttpResponse response = null;
try {
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(responseMessage.getBytes("UTF-8")));
response.headers().set(CONTENT_TYPE, Constants.RESPONSE_JSON);
response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
if (HttpHeaders.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return response;
}

/**
* 校验头信息
* @param configBean  配置bean
* @param httpHeaderMap  头信息
* @return  true 校验通过,false校验失败
*/
private boolean checkHeader(HsfServiceBean configBean,Map<String,String> httpHeaderMap) {
boolean flag = false;
//需要校验
if(StringUtils.isNotBlank(configBean.getToken())){
if(httpHeaderMap != null){
//如果token不为空,需要和前台传入的token比较,不一致,返回错误
String token = httpHeaderMap.get(Constants.TOKEN);
if(StringUtils.isNotBlank(token) && configBean.getToken().equals(token)) {
//验证通过
flag = true;
}
}
} else {
//验证通过
flag = true;
}
return flag;
}
}

 

 

 

package com.ab.hsf.analysis;

import com.ab.hsf.constants.Constants;
import com.ab.hsf.data.Invocation;
import com.ab.hsf.data.RequestMessage;
import com.ab.hsf.util.ParamsUtils;
import com.ab.hsf.util.ReflectUtils;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* 参数解析类
* User: alex
* DateTime: 15-8-7 下午3:10
*/
public class ParamsAnalysis {

private static final Logger logger = LoggerFactory.getLogger(ParamsAnalysis.class);

private HttpRequest httpRequest;

private String uri;

/**
* 构造方法
* @param httpRequest 请求
* @param uri uri
*/
public ParamsAnalysis(HttpRequest httpRequest, String uri) {
this.httpRequest = httpRequest;
this.uri = uri;
}

/**
* 处理get提交
* @param httpRequest 请求
* @return 结果
*/
public RequestMessage getMethodHandle(HttpRequest httpRequest) {
// 构建请求
RequestMessage requestMessage = new RequestMessage();
HttpMethod reqMethod = httpRequest.getMethod();
if (reqMethod != HttpMethod.GET) {
requestMessage.setErrorMessage("Only allow GET");
return requestMessage;
}
String jsonbody = null;
try {
requestMessage = convertRequestMessage(requestMessage);
Invocation invocation = requestMessage.getInvocationBody();
Object[] paramList = null;
String params = null;
int length = invocation.getArgsType().length;
paramList = new Object[length];
if (uri.indexOf("?") != -1) {     //问号传参形式
params = uri.substring(uri.indexOf("?") + 1);
paramList = ParamsUtils.parseParamArg(invocation.getArgClasses(), params);
} else {      //rest传参形式
paramList = ParamsUtils.parseParamArgForRest(uri);
}
requestMessage.getInvocationBody().setArgs(paramList);
} catch (Throwable e) {
logger.error("Failed to parse http request for uri " + uri + (jsonbody != null ? ", body is " + jsonbody : "") + ".", e);
requestMessage.setErrorMessage("Failed to parse http request for uri " + uri);
}
return requestMessage;
}

/**
* 处理post方法
* @param httpContent 实体
* @return 结果
*/
public RequestMessage postMethodHandle(HttpContent httpContent) {
// 构建请求
RequestMessage requestMessage = new RequestMessage();
HttpMethod reqMethod = httpRequest.getMethod();
if (reqMethod != HttpMethod.POST) {
requestMessage.setErrorMessage("Only allow POST");
return requestMessage;
}
String jsonbody = null;

try {
requestMessage = convertRequestMessage(requestMessage);
Invocation invocation = requestMessage.getInvocationBody();
// 解析请求body
Object[] paramList = null;
ByteBuf buf1 = httpContent.content();
int size = buf1.readableBytes();
byte[] s1 = new byte[size];
buf1.readBytes(s1);
jsonbody = new String(s1, Constants.DEFAULT_CHARSET);
paramList = ParamsUtils.streamParseJson(invocation.getArgClasses(), jsonbody);
if(paramList != null) {
requestMessage.getInvocationBody().setArgs(paramList);
}
} catch (Throwable e) {
logger.error("Failed to parse http request for uri " + uri + (jsonbody != null ? ", body is " + jsonbody : "") + ".", e);
requestMessage.setErrorMessage("Failed to parse http request for uri " + uri);
}
return requestMessage;
}

/**
* 转换请求头信息
* @param requestMessage 请求参数
* @return 结果
*/
private RequestMessage convertRequestMessage(RequestMessage requestMessage) {
// 解析uri
String[] strArr = ParamsUtils.getInterfaceIdAndMethod(uri);
String alias = strArr[0];
String interfaceId = strArr[1];
String methodName = strArr[2];

Invocation invocation = new Invocation();
invocation.setClazzName(interfaceId);
invocation.setIfaceId(interfaceId);
invocation.setMethodName(methodName);
invocation.setAlias(alias);
requestMessage.setInvocationBody(invocation);

Class[] classArray = ReflectUtils.getMethodArgsType(interfaceId, methodName);
if (classArray == null) {
logger.error("params type list can NOT be NULL, please check the interfaceId/methodName. interfaceId:" + interfaceId + " method:" + methodName);
requestMessage.setErrorMessage("params type list can NOT be NULL, please check the interfaceId/methodName. interfaceId:" + interfaceId + " method:" + methodName);
}
requestMessage.getInvocationBody().setArgsType(classArray);
return requestMessage;
}

/**
* 处理头信息
*/
public static Map<String,String> parseHeader(HttpHeaders httpHeaders) {
Map<String,String> httpHeaderMap = null;
for (Map.Entry header : httpHeaders) {
if(Constants.ACCEPT.equalsIgnoreCase(header.getKey().toString())) {
String value = String.valueOf(header.getValue());
try {
httpHeaderMap = JSON.parseObject(value, Map.class);
} catch (Exception e) {
logger.error("HttpHeaders Accept is not json data!");
httpHeaderMap = null;
}
}
}
return httpHeaderMap;
}
}

 

服务端接入方式:

1、下载jar包,或者引入maven依赖

 

<dependency>
<groupId>hsf</groupId>
<artifactId>hsf</artifactId>
<version>1.0</version>
</dependency>

 2、配置XML文件

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Service implementation bean -->
<bean id="providerServiceImpl" class="com.b.asf.provider.impl.ProviderServiceImpl"/>
<!-- Service provider: interface, target object, alias and access token -->
<bean id="providerService" class="com.ab.hsf.bean.HsfServiceBean">
<property name="interfacePath" value="com.b.asf.provider.ProviderService"/>
<property name="targetObject" ref="providerServiceImpl"/>
<property name="alias" value="demo3"/>
<property name="token" value="12345"/>
</bean>
<!-- HTTP server settings -->
<bean id="hsfHttpServer" class="com.ab.hsf.server.http.HsfHttpServer">
<property name="port" value="8088"/>
<property name="keepalive" value="true"/>
</bean>
<!-- ZooKeeper management: server hosts, this application's address, namespace -->
<bean id="zooKeeperFactory" class="com.ab.hsf.zk.ZookeeperFactory">
<property name="hosts" value="127.0.0.1:2181"/>
<property name="appAddress" value="10.25.3.207:8088"/>
<property name="nameSpace" value="demo3"/>
</bean>

<!-- Service loader: wires the provider list, the ZooKeeper factory and the HTTP server -->
<bean id="hsfServiceFactoryBean" autowire="no" class="com.ab.hsf.init.HsfServiceFactoryBean">
<property name="serviceList">
<list>
<ref bean="providerService"/>
</list>
</property>
<property name="zookeeperFactory" ref="zooKeeperFactory"/>
<property name="hsfHttpServer" ref="hsfHttpServer"/>
</bean>
</beans>

 

3、编写java实现类

声明接口

 

/**
 * Sample service contract published through the framework.
 */
public interface ProviderService {

    /**
     * Produces a text result from the given parameters.
     *
     * @param params request parameters
     * @return result text
     */
    String getResult(Map params);
}

 实现类

 

 

public class ProviderServiceImpl implements ProviderService {

public String getResult(Map params){
String r = null;
for(String t : params.keySet()) {
r = params.get(t).toString();
}
return "我是8088:" + r;
}
}

 

 

四、路由平台展示

首页展示



 

服务管理展示



服务详情展示



五、后续

1、增加接口监控告警功能,当服务提供者发生异常时,则通过邮件、短信等形式进行告警。

2、增加灰度发布功能,根据不同版本或者组别,发布灰度服务

3、增加降级功能,可以根据接口的需求,对接口进行降级操作

4、增加安全拦截,对接入的接口服务做安全校验





大小: 54.5 KB





大小: 16.2 KB





大小: 68 KB





大小: 45.4 KB





大小: 44.7 KB

查看图片附件
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: