spymemcached源码解析-1-基本过程分析
2015-04-02 00:00
1176 查看
摘要: 结合调用过程,对spymemcached源码分析。
1、整体流程
初始化
客户端执行new MemcachedClient(new InetSocketAddress("192.168.56.101", 11211))。初始化 MemcachedClient,内部初始化MemcachedConnection,创建selector,注册channel到selector,启动IO线程。
线程模型
初始化完成后,把监听mc节点事件的线程,也就是调用select的线程,称为IO线程;应用执行 c.get("someKey"),把应用所在的线程称为工作线程。工作线程通常由tomcat启动,负责创建操作,加入节点的操作队列,工作线程通常有多个;IO线程负责从队列中拿到操作,执行操作。
工作线程
工作线程最终会调用asyncGet,方法内部会创建CountDownLatch(1), GetFuture,GetOperationImpl(持有一个内部类,工作线程执行完成后,最终会调用 latch.countDown()),选择mc节点,操作op初始化(生成写缓冲区),把op放入节点等待队列inputQueue中,同时会把当前节点放入mc连接(mconn)的addedQueue属性中,最后唤醒selector。最终工作线程在latch上等待(默认超时2.5秒)IO线程的执行结果。
IO线程
IO线程被唤醒后
1、handleInputQueue()。移动Operation从inputQueue到writeQ中。对添加到addedQueue中的每一个MemcachedNode分别进行处理。这个函数会处理所有节点上的所有操作,全部发送到mc服务器(之前节点上就有写操作的才这么处理,否则只是注册写事件)。
2、循环过程中,如果当前node中没有写操作,则判断writeQ,readQ中有操作,在SK上注册读/写事件;如果有写操作,需要执行handleWrites函数。这个函数内部首先做的是、填充缓冲区fillWriteBuffer():从writeQ中取出一个可写的操作(remove掉取消的和超时的),改变操作的状态为WRITING,把操作的数据复制到写缓冲区(写缓冲区默认16K,操作的字节数从十几字节到1M,这个地方有复杂的处理,后面会详细分析,现在只考虑简单情况),复制完成后把操作状态变为READING,从writeQ中remove当前操作,把操作add到readQ当中,这个地方会再去复制pending的操作;‚、发送写缓冲区的内容,全部发送完成后,会再次去填充缓冲区fillWriteBuffer()(比如说一个大的命令,一个缓冲区不够)。循环,直到所有的写操作都处理完。ƒ、判断writeQ,readQ是否有操作,更新sk注册的读写事件。get操作的话,现在已经注册了读事件。
3、selector.select()
4、数据到达时,执行handleIO(sk),处理读事件;执行channel.read(rbuf);执行readFromBuffer(),解析数据,读取到END\r\n将操作状态置为COMPLETE。
2、初始化详细流程
1、默认连接工厂为 DefaultConnectionFactory。接着创建TranscodeService(解码的线程池,默认线程最多为10),创建AsciiOperationFactory(支持ascii协议的操作工厂,负责生成各种操作,比如 GetOperationImpl),创建MemcachedConnection,设置操作超时时间(默认2.5秒)。
2、DefaultConnectionFactory创建MemcachedConnection详细过程:创建reconnectQueue,addedQueue,设置shouldOptimize为true,设置maxDelay为30秒,设置opFact,设置timeoutExceptionThreshold为1000(超过这个值,关闭到 mc node 的连接),打开 Selector,创建nodesToShutdown,设置bufSize为16384字节,创建到每个node的 MemcachedNode(默认是AsciiMemcachedNodeImpl,这一步创建SocketChannel,连接到mc节点,注册到selector,设置sk为刚注册得到的SelectionKey),最后启动 MemcachedConnection 线程,进入事件处理的循环代码
while(running) handleIO()。
3、核心流程代码
具体分析参见:http://my.oschina.net/astute/blog/93492?from=20121209
1、整体流程
初始化
客户端执行new MemcachedClient(new InetSocketAddress("192.168.56.101", 11211))。初始化 MemcachedClient,内部初始化MemcachedConnection,创建selector,注册channel到selector,启动IO线程。
线程模型
初始化完成后,把监听mc节点事件的线程,也就是调用select的线程,称为IO线程;应用执行 c.get("someKey"),把应用所在的线程称为工作线程。工作线程通常由tomcat启动,负责创建操作,加入节点的操作队列,工作线程通常有多个;IO线程负责从队列中拿到操作,执行操作。
工作线程
工作线程最终会调用asyncGet,方法内部会创建CountDownLatch(1), GetFuture,GetOperationImpl(持有一个内部类,工作线程执行完成后,最终会调用 latch.countDown()),选择mc节点,操作op初始化(生成写缓冲区),把op放入节点等待队列inputQueue中,同时会把当前节点放入mc连接(mconn)的addedQueue属性中,最后唤醒selector。最终工作线程在latch上等待(默认超时2.5秒)IO线程的执行结果。
IO线程
IO线程被唤醒后
1、handleInputQueue()。移动Operation从inputQueue到writeQ中。对添加到addedQueue中的每一个MemcachedNode分别进行处理。这个函数会处理所有节点上的所有操作,全部发送到mc服务器(之前节点上就有写操作的才这么处理,否则只是注册写事件)。
2、循环过程中,如果当前node中没有写操作,则判断writeQ,readQ中有操作,在SK上注册读/写事件;如果有写操作,需要执行handleWrites函数。这个函数内部首先做的是、填充缓冲区fillWriteBuffer():从writeQ中取出一个可写的操作(remove掉取消的和超时的),改变操作的状态为WRITING,把操作的数据复制到写缓冲区(写缓冲区默认16K,操作的字节数从十几字节到1M,这个地方有复杂的处理,后面会详细分析,现在只考虑简单情况),复制完成后把操作状态变为READING,从writeQ中remove当前操作,把操作add到readQ当中,这个地方会再去复制pending的操作;‚、发送写缓冲区的内容,全部发送完成后,会再次去填充缓冲区fillWriteBuffer()(比如说一个大的命令,一个缓冲区不够)。循环,直到所有的写操作都处理完。ƒ、判断writeQ,readQ是否有操作,更新sk注册的读写事件。get操作的话,现在已经注册了读事件。
3、selector.select()
4、数据到达时,执行handleIO(sk),处理读事件;执行channel.read(rbuf);执行readFromBuffer(),解析数据,读取到END\r\n将操作状态置为COMPLETE。
2、初始化详细流程
1、默认连接工厂为 DefaultConnectionFactory。接着创建TranscodeService(解码的线程池,默认线程最多为10),创建AsciiOperationFactory(支持ascii协议的操作工厂,负责生成各种操作,比如 GetOperationImpl),创建MemcachedConnection,设置操作超时时间(默认2.5秒)。
2、DefaultConnectionFactory创建MemcachedConnection详细过程:创建reconnectQueue,addedQueue,设置shouldOptimize为true,设置maxDelay为30秒,设置opFact,设置timeoutExceptionThreshold为1000(超过这个值,关闭到 mc node 的连接),打开 Selector,创建nodesToShutdown,设置bufSize为16384字节,创建到每个node的 MemcachedNode(默认是AsciiMemcachedNodeImpl,这一步创建SocketChannel,连接到mc节点,注册到selector,设置sk为刚注册得到的SelectionKey),最后启动 MemcachedConnection 线程,进入事件处理的循环代码
while(running) handleIO()。
3、核心流程代码
具体分析参见:http://my.oschina.net/astute/blog/93492?from=20121209
相关文章推荐
- OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一
- spring mvc源码分析(续)——视图解析过程
- mybatis源码学习之执行过程分析(2)——config.xml配置文件和mapper.xml映射文件解析过程
- Rxjava2源码分析(一):Flowable的创建和基本使用过程分析
- memcached源码分析-----哈希表基本操作以及扩容过程
- OpenStack建立实例完整过程源码详细分析(13)----依据AMQP通信架构实现消息发送机制解析之二
- hadoop源码分析(2):Map-Reduce的过程解析
- Spring源码分析----IOC容器的实现(IoC容器的初始化过程(定位、载入解析、注册))
- Android源码解析四大组件系列(一)---Service的启动过程分析
- hadoop源码分析(2):Map-Reduce的过程解析
- OpenStack建立实例完整过程源码详细分析(15)----依据AMQP通信架构实现消息接收机制解析之二
- 对Xabber源码解析的过程(1)工程目录分析
- Android短信源码分析 --PDU解析过程
- Android源码解析Window系列第(一)篇---Window的基本认识和Activity的Window创建过程
- Spring MVC源码分析(续)——视图解析过程
- [Android源码分析]蓝牙文件传输过程解析之UI实现
- Fresco源码解析 - 初始化过程分析
- Tinyxml解析过程源码分析
- SpringMVC解析请求响应请求过程-源码分析
- Nginx源码分析 - HTTP模块篇 - ngx_http_wait_request_handler函数和HTTP Request解析过程