rocketmq源码简析之namesrc模块
2017-04-25 17:21
169 查看
先看一下官网一张清晰的架构图:
nameserver作为替换早期版本zookeeper的轻量级实现,它只实现了zk的一致性+发布订阅,当然实现方式是不一样的。它的一致性是通过每个节点的Broker、Consumer、Producer定时心跳同步的,存在短暂的不一致性,可以说是弱一致性,不会有影响,因为有柔性事务保证重试机制。发布订阅跟zk就差不多了,Broker会与每一台nameserver保持tcp连接,上传topic信息,自身健康状态,filter信息等;Consumer和Producer会与一台nameserver保持tcp连接,获取路由信息,保证负载均衡。
接下来重点看一下nameserver相关代码:
启动类NamesrvStartup main0方法,通过解析命令行参数,比如c后面配置配置文件,封装到namesrvConfig和nettyServerConfig,来构建NamesrvController这个真正的启动控制器。
NamesrvController有几个重要属性
重点聊一下这个方法:
NamesrvController这个类,几乎把namesrv模块的所有类都关联进去了,本身这个模块类也就8个,所以说非常轻量级。再重点聊一下DefaultRequestProcessor这个类就完了。
DefaultRequestProcessor实现NettyRequestProcessor接口,实现processRequest(处理请求)和rejectRequest(拒绝请求)方法;rejectRequest直接返回false,没有理由拒绝。
上面设计思想是把request和response封装在统一对象RemotingCommand里,通过REQUEST_COMMAND和RESPONSE_COMMAND来区分,RemotingCommand还有很多解析处理内容,包括反射解析CommandCustomHeader等。这里只是通过RequestCode解析出对应RemotingCommand,真正执行,在rocketmq-remoting模块,这个是典型的netty程序,可以参考
https://github.com/a2888409/RocketMQ-Learning/blob/master/book/ch2/2-remoting.md
nameserver作为替换早期版本zookeeper的轻量级实现,它只实现了zk的一致性+发布订阅,当然实现方式是不一样的。它的一致性是通过每个节点的Broker、Consumer、Producer定时心跳同步的,存在短暂的不一致性,可以说是弱一致性,不会有影响,因为有柔性事务保证重试机制。发布订阅跟zk就差不多了,Broker会与每一台nameserver保持tcp连接,上传topic信息,自身健康状态,filter信息等;Consumer和Producer会与一台nameserver保持tcp连接,获取路由信息,保证负载均衡。
接下来重点看一下nameserver相关代码:
启动类NamesrvStartup main0方法,通过解析命令行参数,比如c后面配置配置文件,封装到namesrvConfig和nettyServerConfig,来构建NamesrvController这个真正的启动控制器。
NamesrvController有几个重要属性
//namesrv相关参数 private final NamesrvConfig namesrvConfig; //netty相关参数 private final NettyServerConfig nettyServerConfig; //单线程池,此线程用来启动namesrc,启动之后还有2个定时线程来scanNotActiveBroker(清理不生效broker)和printAllPeriodically(打印每个namesrv的配置表) private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread")); //namesrv配置管理器,容器为HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable,ReadWriteLock保证读写安全 private final KVConfigManager kvConfigManager; //所有运行数据管理器,topicqueuetable、brokeraddrtable等,信息量很多,ReadWriteLock保证读写安全 private final RouteInfoManager routeInfoManager; //服务启动接口,这里传入的是NettyRemotingServer,用netty启动,是rocketmq remoting模块,包括注册请求处理器DefaultRequestProcessor,以及几种数据传输方式invokeSync、invokeAsync、invokeOneway等 private RemotingServer remotingServer; //Broker事件监听器,属于netty概念,监听chanel 4个动作事件,提供处理方法 private BrokerHousekeepingService brokerHousekeepingService; //remotingServer的并发处理器,处理各种类型请求 private ExecutorService remotingExecutor; //公告类,这里是namesrv和nettyserver2个配置文件,ReadWriteLock保证读写,对外提供获取及持久化,DataVersion(时间戳+AtomicLong counter自增)做版本控制,Properties对象做update、string-file中间对象。 private Configuration configuration;
重点聊一下这个方法:
public boolean initialize() { //加载kvConfig.json至KVConfigManager的configTable,即持久化转移到内存 this.kvConfigManager.load(); //将namesrv作为一个netty server启动 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //启动请求处理线程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注册默认DefaultRequestProcessor和remotingExecutor,等start启动即开始处理netty请求 this.registerProcessor(); //每10s检查2分钟接受不到心跳的broker清除掉 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //每10分钟,打印namesrv全局配置信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); return true; }
NamesrvController这个类,几乎把namesrv模块的所有类都关联进去了,本身这个模块类也就8个,所以说非常轻量级。再重点聊一下DefaultRequestProcessor这个类就完了。
DefaultRequestProcessor实现NettyRequestProcessor接口,实现processRequest(处理请求)和rejectRequest(拒绝请求)方法;rejectRequest直接返回false,没有理由拒绝。
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (log.isDebugEnabled()) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null;
上面设计思想是把request和response封装在统一对象RemotingCommand里,通过REQUEST_COMMAND和RESPONSE_COMMAND来区分,RemotingCommand还有很多解析处理内容,包括反射解析CommandCustomHeader等。这里只是通过RequestCode解析出对应RemotingCommand,真正执行,在rocketmq-remoting模块,这个是典型的netty程序,可以参考
https://github.com/a2888409/RocketMQ-Learning/blob/master/book/ch2/2-remoting.md
相关文章推荐
- RocketMQ源码深度解析二之Name Server篇
- RocketMQ client客户端模块源码分析一(生产者)
- UEFI简单的应用程序模块MyHelloWorld:C源文件/INF工程文件源码/简析[6]
- RocketMQ源码解析(1)--代码结构与模块说明
- RocketMQ的name server启动源码总结
- Zepto源码分析-callbacks模块
- agario服务端源码https://github.com/huytd/agar.io-clone/blob/master/src/server/server.js
- RocketMQ源码分析(二)Producer端发送数据
- 【源码解析】Sharding-Jdbc模块分析
- Android4.4设置源码分析(一):设置主界面与各模块之间的联系
- nginx源码分析-过滤模块
- layui源码详细分析系列之文件上传模块
- Centos6.9实现基于源码编译安装LAMP(FPM模块方式)的wordpress应用
- dubbo源码深度解读四之remoting模块
- 读Zepto源码之Event模块
- 读Zepto源码之Stack模块
- linux、内核源码、内核编译与配置、内核模块开发、内核启动流程
- flask源码笔记:三,app.py模块(6)——Flask的方法(上)
- springMVC源码分析--RequestToViewNameTranslator请求到视图名称的转换
- Cocos2d-x源码解析(1)——地图模块(2)