基于netty4的beanstalkd的java客户端实现
2017-06-25 20:35
316 查看
最近实现了基于netty4的beanstalkd的客户端, 实现此客户端的目的是为了学习netty。
beanstalkd是一个高性能、轻量级的分布式内存队列系统,个人认为,如果需要一个轻量型的 中间件, beanstalkd是很不错的一个选择,协议也很简单。beanstalkd的详细介绍 可见点击打开链接。
针对消息中间件的概念,分为消息的提供者和消息的消费者,结合beanstalkd和netty的概念,beanstalkd中tube与netty中的channel一一对应。为了在同一个项目中消息的提供者与消息的消费者不相互影响,相同名称的tube,提供者和消费者使用不同的channel。
提取公共部分到抽象类中
beanstalkd每一个协议对于一个具体的类,例如:put命令实现类
消费类reserve实现
解码用于把netty的ByteBuf转换为beanstalkd协议的响应。beanstalkd协议的响应结构相同,定义一个类来表示
解码器
此处使用了阻塞队列保存发送命令的响应。beanstalkd保证了按照顺序接收命令,并按照顺序处理命令。不会出现命令的响应与命令不一致的情况。 但是,netty无法保证请求和响应之间一一对应的关系,依赖于服务器的实现。如果服务器的实现是第三方的,例如像beanstalkd,则能保证一一对应;但是如果服务器对连接到自己的客户端channel的请求使用多线程来实现,则无法保证按照顺序实现一一对应,可能需要定义特殊的消息协议来保证。
5. beanstalkd 命令发送
beanstalkd的命令有10多个,以put为例
6. 消息提供者和消费者的初始化和退出
在一个项目中可以单独使用提供者或者消费者,也可以同时使用。此客户端支持简单的beanstalkd集群,同时部署多个beanstalkd服务器,初始化时指定多个服务器的地址,连接是按照tube选择具体的服务器。
初始化后,就可以使用provider或者consumer执行beanstalkd命令。
最后需要调用quit(),退出到服务器的连接
具体实现已经上传到GitHub中,https://github.com/shengjunzhao/beanstalk4j 点击打开链接
文中不足之处, 请各位大侠指正。
beanstalkd是一个高性能、轻量级的分布式内存队列系统,个人认为,如果需要一个轻量型的 中间件, beanstalkd是很不错的一个选择,协议也很简单。beanstalkd的详细介绍 可见点击打开链接。
针对消息中间件的概念,分为消息的提供者和消息的消费者,结合beanstalkd和netty的概念,beanstalkd中tube与netty中的channel一一对应。为了在同一个项目中消息的提供者与消息的消费者不相互影响,相同名称的tube,提供者和消费者使用不同的channel。
1. beanstalk协议的封装
协议的封装参考了dinstone,首先定义接口表示协议的功能package com.haole.mq.beanstalk.command; import io.netty.buffer.ByteBuf; import java.nio.charset.Charset; /** * Created by shengjunzhao on 2017/5/27. */ public interface Command { String getCommandLine(); // 拼接协议的命令 ByteBuf prepareRequest(ByteBuf sendBuf, Charset charset, String delimiter); // 编码命令到ByteBuf中 }
提取公共部分到抽象类中
package com.haole.mq.beanstalk.command; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import java.nio.charset.Charset; /** * Created by shengjunzhao on 2017/5/27. */ public class AbstractCommand implements Command { private String commandLine; @Override public String getCommandLine() { return this.commandLine; } @Override public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) { sendBuf.writeBytes(this.commandLine.getBytes(charset)); sendBuf.writeBytes(delimiter.getBytes(charset)); return sendBuf; } public void setCommandLine(String commandLine) { this.commandLine = commandLine; } }
beanstalkd每一个协议对于一个具体的类,例如:put命令实现类
package com.haole.mq.beanstalk.command; import io.netty.buffer.ByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; /** * put command * Created by shengjunzhao on 2017/5/27. */ public class PutCommand extends AbstractCommand { private final static Logger log = LoggerFactory.getLogger(PutCommand.class); private byte[] data; public PutCommand(int priority, int delay, int ttr, byte[] data) { if (data == null) { throw new IllegalArgumentException("data is null"); } if (data.length > 65536) { throw new IllegalArgumentException("data is too long than 65536"); } setCommandLine("put " + priority + " " + delay + " " + ttr + " " + data.length); this.data = data; } @Override public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) { sendBuf = super.prepareRequest(sendBuf,charset,delimiter); sendBuf.writeBytes(data); sendBuf.writeBytes(delimiter.getBytes(charset)); return sendBuf; } }
消费类reserve实现
package com.haole.mq.beanstalk.command; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Created by shengjunzhao on 2017/5/29. */ public class ReserveCommand extends AbstractCommand { private final static Logger log = LoggerFactory.getLogger(ReserveCommand.class); public ReserveCommand(long timeout) { if (timeout > 0) setCommandLine("reserve-with-timeout " + timeout); else setCommandLine("reserve"); } }
2. 编码解码的实现
编码用于把beanstalkd的每一个协议转换为netty的ByteBufpackage com.haole.mq.beanstalk.codec; import com.haole.mq.beanstalk.command.Command; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; /** * Created by shengjunzhao on 2017/5/28. */ public class CommandEncode extends MessageToByteEncoder<Command> { private static final Logger log = LoggerFactory.getLogger(CommandEncode.class); private Charset charset; private String delimiter; public CommandEncode(Charset charset) { this(charset, "\r\n"); } public CommandEncode(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } @Override protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception { if (null == msg) { throw new Exception("The encode message is null"); } ByteBuf sendBuf = ctx.channel().alloc().buffer(512); log.debug("&&&&& command={}", msg.getCommandLine()); sendBuf = msg.prepareRequest(sendBuf, charset, delimiter); out.writeBytes(sendBuf); } }
解码用于把netty的ByteBuf转换为beanstalkd协议的响应。beanstalkd协议的响应结构相同,定义一个类来表示
package com.haole.mq.beanstalk.command; /** * Created by shengjunzhao on 2017/5/27. */ public class Response { private String statusLine; private byte[] data; public String getStatusLine() { return statusLine; } public void setStatusLine(String statusLine) { this.statusLine = statusLine; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } }
解码器
package com.haole.mq.beanstalk.codec; import com.haole.mq.beanstalk.command.Response; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.util.List; /** * Created by shengjunzhao on 2017/5/28. */ public class CommandDecode extends ByteToMessageDecoder { private final static Logger log = LoggerFactory.getLogger(CommandDecode.class); private Charset charset; private String delimiter; public CommandDecode(Charset charset) { this(charset, "\r\n"); } public CommandDecode(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { in.markReaderIndex(); int readableBytes = in.readableBytes(); Response response = new Response(); byte[] resp = new byte[readableBytes]; in.readBytes(resp); log.debug("bytebuf in {}",resp); byte previous = 0; boolean isReset = true; for (int i = 0; i < readableBytes; i++) { byte current = resp[i]; if (previous == 13 && current == 10) { String commandLine = new String(resp, 0, i - 1, charset); String[] spilts = commandLine.split(" "); String result = spilts[0]; if ("RESERVED".equals(result) || "FOUND".equals(result) || "OK".equals(result)) { String bytesStr = spilts[spilts.length - 1]; if (bytesStr.matches("\\d+")) { int bytes = Integer.valueOf(bytesStr); if (bytes == readableBytes - i - 1 - 2) { byte[] data = new byte[bytes]; System.arraycopy(resp, i + 1, data, 0, bytes); response.setData(data); isReset = false; } } else isReset = false; } else isReset = false; response.setStatusLine(commandLine); break; } previous = current; } if (isReset) in.resetReaderIndex(); else { out.add(response); } } }
3. netty 处理器
处理器接收netty解码后的响应保存到阻塞队列中package com.haole.mq.beanstalk.handler; import com.haole.mq.beanstalk.command.Command; import com.haole.mq.beanstalk.command.Response; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; /** * Created by shengjunzhao on 2017/5/28. */ public class BeanstalkHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(BeanstalkHandler.class); private LinkedBlockingQueue<Response> queue = new LinkedBlockingQueue<>(); private Channel channel; public BeanstalkHandler() {} @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); this.channel=ctx.channel(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Response response = (Response) msg; queue.put(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } public Response sendMessage(Command command) throws InterruptedException { this.channel.writeAndFlush(command); return queue.take(); } }
此处使用了阻塞队列保存发送命令的响应。beanstalkd保证了按照顺序接收命令,并按照顺序处理命令。不会出现命令的响应与命令不一致的情况。 但是,netty无法保证请求和响应之间一一对应的关系,依赖于服务器的实现。如果服务器的实现是第三方的,例如像beanstalkd,则能保证一一对应;但是如果服务器对连接到自己的客户端channel的请求使用多线程来实现,则无法保证按照顺序实现一一对应,可能需要定义特殊的消息协议来保证。
4. netty的初始化
private void init() { b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("beanstalk decode", new CommandDecode(Charset.forName("UTF-8"))); pipeline.addLast("beanstalk encode", new CommandEncode(Charset.forName("UTF-8"))); pipeline.addLast("beanstalk client handler", new BeanstalkHandler()); } }); }
5. beanstalkd 命令发送
beanstalkd的命令有10多个,以put为例
/** * 向队列中插入一个job * * @param channel * @param priority 优先级 0~2**32的整数,最高优先级是0 * @param delay 是一个整形数,表示将job放入ready队列需要等待的秒数 * @param ttr time to run—是一个整形数,表示允许一个worker执行该job的秒数。这个时间将从一个worker 获取一个job开始计算。 * 如果该worker没能在<ttr> 秒内删除、释放或休眠该job,这个job就会超时,服务端会主动释放该job。 * 最小ttr为1。如果客户端设置了0,服务端会默认将其增加到1。 * @param data 及job体,是一个长度为<byetes> 的字符序列 * @return 如果大于0,是新job的数字编号,如果小于0,错误,-1;未知;-2:BURIED;-3:EXPECTED_CRLF;-4:JOB_TOO_BIG;-5:DRAINING * @throws InterruptedException */ public long put(Channel channel, int priority, int delay, int ttr, byte[] data) throws InterruptedException { Command putCommand = new PutCommand(priority, delay, ttr, data); Response response = channel.pipeline().get(BeanstalkHandler.class).sendMessage(putCommand); log.debug("response status {}", response.getStatusLine()); String[] spilts = response.getStatusLine().split(" "); if ("INSERTED".equals(spilts[0])) { return Long.valueOf(spilts[1]).longValue(); } else if ("BURIED".equals(spilts[0])) { return -2; } else if ("EXPECTED_CRLF".equals(spilts[0])) { return -3; } else if ("JOB_TOO_BIG".equals(spilts[0])) { return -4; } else if ("DRAINING".equals(spilts[0])) { return -5; } else return -1; }
6. 消息提供者和消费者的初始化和退出
在一个项目中可以单独使用提供者或者消费者,也可以同时使用。此客户端支持简单的beanstalkd集群,同时部署多个beanstalkd服务器,初始化时指定多个服务器的地址,连接是按照tube选择具体的服务器。
Set<String> servers = new HashSet<>(); servers.add("192.168.209.132:11300"); servers.add("192.168.209.133:11300"); servers.add("192.168.209.134:11300"); BeanstalkProvider provider1 = new DefaultBeanstalkProvider(servers, "beanstalks1");
BeanstalkConsumer consumer1 = new DefaultBeanstalkConsumer(servers, "beanstalks1");
初始化后,就可以使用provider或者consumer执行beanstalkd命令。
最后需要调用quit(),退出到服务器的连接
provider1.quit(); consumer1.quit();
具体实现已经上传到GitHub中,https://github.com/shengjunzhao/beanstalk4j 点击打开链接
文中不足之处, 请各位大侠指正。
相关文章推荐
- ZPush--基于netty4实现的苹果通知推送服务(APNs)Java客户端
- Java 基于 UDP 实现 Socket中的多客户端通信
- 基于Socket的TCP长连接(服务端Java+客户端Android),Service配合AIDL实现
- memcached学习——常用命令+基于java客户端的3种简单实现(二)
- Java基于socket实现的客户端和服务端通信功能完整实例
- Java基于TCP实现服务器和多客户端之间的通信
- Java客户端连接elasticsearch5.5.3实现数据搜索(基于xpack安全管理)
- Java 基于 UDP 实现 Socket中的多客户端通信
- 基于TCP/UDP的NIO服务端/客户端代码实现damo(java)
- Java基于UDP实现服务器和多客户端之间的通信
- ZPush--基于netty4实现的苹果通知推送服务(APNs)Javaclient
- 基于jetty9实现java版的webcoket服务端和客户端
- MQTT客户端--基于paho实现(Java)
- Java 基于 TCP/IP 实现 Socket中的多客户端通信
- Java 基于 TCP/IP 实现 Socket中的多客户端通信
- Java基于Socket实现HTTP下载客户端
- Java客户端连接elasticsearch5.5.3实现数据搜索(基于xpack安全管理)
- 基于JAVA技术的搜索引擎的研究与实现
- 基于JAVA技术的搜索引擎的研究与实现
- 基于JAVA技术的搜索引擎的研究与实现