您的位置:首页 > 编程语言 > Java开发

使用spring容器管理和配置netty

2016-08-07 15:20 351 查看

使用spring容器管理和配置netty

为了使程序达到方便配置和管理,spring的ioc容器是特效药之一。本文将使用ioc来管理和配置netty服务器

服务端使用Reactor多线程模型,详见 李林锋老师的博文 Netty系列之Netty高性能之道



在本程序中Reactor Thread Acceptor对应BossGroup,Reactor Thread

Pool对应WorkerGroup

1. netty和spring的maven依赖

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>3.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.10.Final</version>
</dependency>


java代码结构解析

1. 服务器管理接口

public interface Server {

public interface TransmissionProtocol{

}

// 服务器使用的协议
public enum TRANSMISSION_PROTOCOL implements TransmissionProtocol {
TCP,UDP
}

TransmissionProtocol getTransmissionProtocol();
// 启动服务器
void startServer() throws Exception;

void startServer(int port) throws Exception;;

void startServer(InetSocketAddress socketAddress) throws Exception;

// 关闭服务器
void stopServer() throws Exception;

InetSocketAddress getSocketAddress();

}


2. 服务器初始化配置接口

public interface NettyServer extends Server
{
/**
* ServerBootstrap创建成功后会有一个ChannelInitializer(即pipeline factory), 本方法主要用于获取这个
* ChannelInitializer
*
* @return
*/
public ChannelInitializer<? extends Channel> getChannelInitializer();

/**
* 设置自己的ChannelInitializer
*
* @param initializer
*            pipeline的工厂类,主要为每个新的链接创建一个pipeline
*/
public void setChannelInitializer(ChannelInitializer<? extends Channel> initializer);

/**
* 获取netty server的configuration
*
* @return .
*/
public NettyConfig getNettyConfig();

}


3. 基本服务器配置类

public abstract class AbstractNettyServer implements NettyServer
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractNettyServer.class);
//用于管理所有的channel
public static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("NADRON-CHANNELS", GlobalEventExecutor.INSTANCE);
protected final NettyConfig nettyConfig;
protected ChannelInitializer<? extends Channel> channelInitializer;

public AbstractNettyServer(NettyConfig nettyConfig,
ChannelInitializer<? extends Channel> channelInitializer)
{
this.nettyConfig = nettyConfig;
this.channelInitializer = channelInitializer;
}

@Override
public void startServer(int port) throws Exception
{
nettyConfig.setPortNumber(port);
nettyConfig.setSocketAddress(new InetSocketAddress(port));
startServer();
}

@Override
public void startServer(InetSocketAddress socketAddress) throws Exception
{
nettyConfig.setSocketAddress(socketAddress);
startServer
4000
();
}

@Override
public void stopServer() throws Exception
{
LOG.debug("In stopServer method of class: {}", this.getClass()
.getName());
ChannelGroupFuture future = ALL_CHANNELS.close();
try
{
future.await();
}
catch (InterruptedException e)
{
LOG.error(
"Execption occurred while waiting for channels to close: {}",
e);
}
finally
{
if (null != nettyConfig.getBossGroup())
{
nettyConfig.getBossGroup().shutdownGracefully();
}
if (null != nettyConfig.getWorkerGroup())
{
nettyConfig.getWorkerGroup().shutdownGracefully();
}
}
}

@Override
public ChannelInitializer<? extends Channel> getChannelInitializer()
{
return channelInitializer;
}

// 获取configuration @link(NettyConfig.class)
@Override
public NettyConfig getNettyConfig() {
return nettyConfig;
}

// 获取bossGroup,在spring中配置
protected EventLoopGroup getBossGroup(){
return nettyConfig.getBossGroup();
}

// 获取workerGroup, 在spring中配置
protected EventLoopGroup getWorkerGroup(){
return nettyConfig.getWorkerGroup();
}

@Override
public InetSocketAddress getSocketAddress()
{
return nettyConfig.getSocketAddress();
}

@Override
public String toString()
{
return "NettyServer [socketAddress=" + nettyConfig.getSocketAddress()
+ ", portNumber=" + nettyConfig.getPortNumber() + "]";
}

}


4. tcpServer实现类

public class NettyTCPServer extends AbstractNettyServer {
private static final Logger LOG = LoggerFactory
.getLogger(NettyTCPServer.class);

private ServerBootstrap serverBootstrap;

public NettyTCPServer(NettyConfig nettyConfig,
ChannelInitializer<? extends Channel> channelInitializer)
{
super(nettyConfig, channelInitializer);
}

@Override
public void startServer() throws Exception {
try {
serverBootstrap = new ServerBootstrap();
Map<ChannelOption<?>, Object> channelOptions = nettyConfig.getChannelOptions();
if(null != channelOptions){
Set<ChannelOption<?>> keySet = channelOptions.keySet();
// 获取configuration配置到channelOption
for(ChannelOption option : keySet)
{
serverBootstrap.option(option, channelOptions.get(option));
}
}
// reactor多线程模型,配置bossGroup和workGroup
// bossGroup和workGroup使用spring容器管理
serverBootstrap.group(getBossGroup(),getWorkerGroup())
.channel(NioServerSocketChannel.class)
.childHandler(getChannelInitializer());
// 绑定端口,启动并监听
Channel serverChannel = serverBootstrap.bind(nettyConfig.getSocketAddress()).sync()
.channel();
ALL_CHANNELS.add(serverChannel);
} catch(Exception e) {
LOG.error("TCP Server start error {}, going to shut down", e);
super.stopServer();
throw e;
}
}

@Override
public TransmissionProtocol getTransmissionProtocol() {
return TRANSMISSION_PROTOCOL.TCP;
}

// 配置自己的initializer
@Override
public void setChannelInitializer(ChannelInitializer<? extends Channel> initializer) {
this.channelInitializer = initializer;
serverBootstrap.childHandler(initializer);
}

@Override
public String toString() {
return "NettyTCPServer [socketAddress=" + nettyConfig.getSocketAddress()
+ ", portNumber=" + nettyConfig.getPortNumber() + "]";
}

}


5. configuration实现类NettyConfig

// 用于配置server
public class NettyConfig {
private Map<ChannelOption<?>, Object> channelOptions;

// reactor多线程模型中的acceptor
private NioEventLoopGroup bossGroup;

// reactor多线程模型中的threadPool
private NioEventLoopGroup workerGroup;

//bossGroup的线程数
private int bossThreadCount;

//workerGroup的线程数
private int workerThreadCount;
private InetSocketAddress socketAddress;
private int portNumber = 18090;
protected ChannelInitializer<? extends Channel> channelInitializer;

public Map<ChannelOption<?>, Object> getChannelOptions() {
return channelOptions;
}

public void setChannelOptions(
Map<ChannelOption<?>, Object> channelOptions) {
this.channelOptions = channelOptions;
}

public synchronized NioEventLoopGroup getBossGroup() {
if (null == bossGroup) {
if (0 >= bossThreadCount) {
bossGroup = new NioEventLoopGroup();
} else {
bossGroup = new NioEventLoopGroup(bossThreadCount);
}
}
return bossGroup;
}

public void setBossGroup(NioEventLoopGroup bossGroup) {
this.bossGroup = bossGroup;
}

public synchronized NioEventLoopGroup getWorkerGroup() {
if (null == workerGroup) {
if (0 >= workerThreadCount) {
workerGroup = new NioEventLoopGroup();
} else {
workerGroup = new NioEventLoopGroup(workerThreadCount);
}
}
return workerGroup;
}

public void setWorkerGroup(NioEventLoopGroup workerGroup) {
this.workerGroup = workerGroup;
}

public int getBossThreadCount() {
return bossThreadCount;
}

public void setBossThreadCount(int bossThreadCount) {
this.bossThreadCount = bossThreadCount;
}

public int getWorkerThreadCount() {
return workerThreadCount;
}

public void setWorkerThreadCount(int workerThreadCount) {
this.workerThreadCount = workerThreadCount;
}

public synchronized InetSocketAddress getSocketAddress() {
if (null == socketAddress) {
socketAddress = new InetSocketAddress(portNumber);
}
return socketAddress;
}

public void setSocketAddress(InetSocketAddress socketAddress) {
this.socketAddress = socketAddress;
}

public int getPortNumber() {
return portNumber;
}

public void setPortNumber(int portNumber) {
this.portNumber = portNumber;
}

}


6. 实现自己的ChannelInitializer

// ChannelInitializer是默认的initializer,因此需要继承ChannelInitializer类来实现自己的initializer
public class MyChannelInitializer extends
ChannelInitializer<SocketChannel> {
private static final int MAX_IDLE_SECONDS = 60;

@Override
protected void initChannel(SocketChannel ch) throws Exception {

// 添加到pipeline中的handler会被串行处理(PS: 类似工业生产中的流水线)
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("idleStateCheck", new IdleStateHandler(
MAX_IDLE_SECONDS, MAX_IDLE_SECONDS, MAX_IDLE_SECONDS));
// 使用addLast来添加自己定义的handler到pipeline中
// pipeline.addLast("multiplexer", createMyProtcolDecoder());
}

}


7. 线程工厂类NamedThreadFactory

在netty4中为bossGroup和workerGroup创建线程时需要使用ThreadFactory来创建


// 自定义线程工厂类
public class NamedThreadFactory implements ThreadFactory {
// a thread counter
private static AtomicInteger counter = new AtomicInteger(1);
private String        name = "Adam";
private boolean       daemon; // 守护线程
private int           priority; // 线程优先级

public NamedThreadFactory(String name) {
this(name, false, -1);
}

public NamedThreadFactory(String name, boolean daemon) {
this(name, daemon, -1);
}

public NamedThreadFactory(String name, boolean daemon, int priority) {
this.name = name;
this.daemon = daemon;
this.priority = priority;
}

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, name + "[" + counter.getAndIncrement() + "]");
thread.setDaemon(daemon);
if (priority != -1) {
thread.setPriority(priority);
}
return thread;
}
}


8. AppContext类获取bean

public class AppContext implements ApplicationContextAware
{

public static final String TCP_SERVER = "tcpServer";

// The spring application context.
private static ApplicationContext applicationContext;

public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException
{
AppContext.applicationContext = applicationContext;
}

// 根据beanName获取bean
public static Object getBean(String beanName)
{
if (null == beanName)
{
return null;
}
return applicationContext.getBean(beanName);
}
}


以上即为所需用到的java类,接下来要实现的是如何使用spring ioc来配置管理netty server

spring容器配置

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd"> 
<bean id="tcpServer" class= "io.adam.server.netty.NettyTCPServer" destroy-method="stopServer">
<constructor-arg ref="tcpConfig" />
<constructor-arg ref="myChannelInitializer" />
</bean>

<bean id="myChannelInitializer" class="io.adam.server.netty.MyChannelInitializer">

<bean id="tcpConfig" class="io.nadron.server.netty.NettyConfig">
<property name="channelOptions" ref="tcpChannelOptions"/>
<property name="bossGroup" ref="bossGroup"/>
<property name="workerGroup" ref="workerGroup"/>
<property name="portNumber" value="10086"/>
</bean>

<util:map id="tcpChannelOptions" map-class="java.util.HashMap">
<entry>
<key><util:constant static-field="io.netty.channel.ChannelOption.SO_KEEPALIVE"/></key>
<value type="java.lang.Boolean">true</value>
</entry>
<entry>
<key><util:constant static-field="io.netty.channel.ChannelOption.SO_BACKLOG"/></key>
<value type="java.lang.Integer">100</value>
</entry>
</util:map>

<bean id="bossGroup" class="io.netty.channel.nio.NioEventLoopGroup" destroy-method="shutdownGracefully">
<constructor-arg type="int" index="0" value="2" />
<constructor-arg index="1" ref="bossThreadFactory" />
</bean>

<bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup" destroy-method="shutdownGracefully">
<constructor-arg type="int" index="0"
value="8" />
<constructor-arg index="1" ref="workerThreadFactory" />
</bean>

<bean id="bossThreadFactory" class="io.adam.concurrent.NamedThreadFactory">
<constructor-arg type="java.lang.String" value="Server-Boss" />
</bean>

<bean id="workerThreadFactory" class="io.adam.concurrent.NamedThreadFactory">
<constructor-arg type="java.lang.String" index="0" value="Server-Worker" />
</bean>
<bean id="appContext" class="io.adam.context.AppContext" />
</beans>


以上即为所需spring 容器bean的配置,bean中的class需根据自己的项目路径进行修改

接下来编写测试代码来启动服务端

ServerManager 类

public class ServerManager
{
private AbstractNettyServer tcpServer;

public ServerManager() {
tcpServer = (AbstractNettyServer)AppContext.getBean(AppContext.TCP_SERVER);
}

public void startServer(int port) throws Exception {
tcpServer.startServer(port);
}

public void startServer() throws Exception {
tcpServer.startServer();
}
public void stopServer() throws Exception {
tcpServer.stopServer();
}
}


测试类

public class TestServer
{
public static void main(String[] args) {
ServerManager manager = new ServerManager();
//manager.startServer(args[0]);
manager.startServer();
}
}


本文基本实现了使用spring配置管理netty server所需的基本骨架,可在此基础上拓展其它功能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息