redis消息模式
2017-01-16 11:45
309 查看
消息通知
一般来说,消息队列有两种场景,一种是生产者消费者模式,一种是发布者订阅者模式。利用redis这两种场景的消息队列都能实现。
1、生产者消费者模式
生产者生产消息放到队列中,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息,即对于每个消息最多只能被一个消费者拥有。具体的方法就是创建一个任务队列,生产者主动lpush消息,而消费者去rpop数据。但是这样存在一个问题,就是消费者需要主动去请求数据,周期性的请求会造成资源的浪费。如果可以实现一旦有新消息加入队列就通知消费者就好了,这时借助brpop命令就可以实现这样的需求。brpop和rpop命令相似,唯一区别就是当列表中没有元素时,brpop命令会一直阻塞住连接,直到有新元素加入。
BRPOP key timeout
brpop命令接收两个参数,第一个参数key为键值,第二个参数timeout为超时时间。BRPOP命令取数据时候,如果暂时不存在数据,该命令会一直阻塞直到达到超时时间。如果timeout设置为0,那么就会无限等待下去。
2、发布者订阅者模式
发布者生产消息放到队列里,多个监听队列的订阅者都会受到同一份消息。生产者使用下面命令来发布消息:
PUBLISH CHANNEL MESSAGE
订阅者通过下面的命令来订阅消息,执行subscribe命令后,客户端进入订阅状态,处于此状态的客户端不能使用4个属于“发布/订阅”模型的命令之外的命令。另外,可以使用subscribe channel1.1 channel1.2 ... 同时订阅多个频道。
SUBSCRIBE CHANNEL
3、Java实现的redis的消息队列
在jedis中,有对应的方法进行订阅和发布,为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。
下面我们要实现三个类,一个对应publish,一个对应subscribe,一个对应要传递的对象实体类:
实体类:
import java.io.Serializable; /** * 实体类 * 封装消息 * @author Administrator * */ public class Message implements Serializable { private static final long serialVersionUID = 1L; private String title; private String content; public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
Publish类:
import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import redis.clients.jedis.Jedis; /** * 发布者,用于发布消息 * @author Administrator * */ public class TestPub { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1"); try { Message message = new Message(); message.setTitle("体育新闻"); message.setContent("著名NBA球星科比退役了!"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(message); String msg1 = baos.toString("ISO-8859-1"); jedis.publish("foo", msg1); } catch (Exception e) { e.printStackTrace(); } jedis.close(); } }
Subscribe类:
import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; /** * 订阅者,用于接收消息 * @author Administrator * */ public class TestSub { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1"); JedisPubSub jedisPubSub = new JedisPubSub() { @Override public void onUnsubscribe(String channel, int subscribedChannels) { // 取消订阅时候的处理 System.out.println("1"); } @Override public void onSubscribe(String channel, int subscribedChannels) { // 初始化订阅时候的处理 System.out.println("2"); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { // // 取消按表达式的方式订阅时候的处理 System.out.println("3"); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { // // 初始化按表达式的方式订阅时候的处理 System.out.println("4"); } @Override public void onPMessage(String pattern, String channel, String message) { //// 取得按表达式的方式订阅的消息后的处理 System.out.println("5"); } @Override public void onMessage(String channel, String message) {// 取得订阅的消息后的处理 try { ByteArrayInputStream bis = new ByteArrayInputStream(message.getBytes("ISO-8859-1")); // 此处指定字符集将字符串编码成字节数组,此处的字符集需要与发布时的字符集保持一致 ObjectInputStream ois = new ObjectInputStream(bis); Message message2= (Message) ois.readObject(); System.out.println(message2.getTitle()+"\n"+message2.getContent()); } catch (Exception e) { e.printStackTrace(); } finally { } } }; jedis.subscribe(jedisPubSub, "foo"); jedis.close(); } }
相关文章推荐
- 使用redis的发布订阅模式实现消息队列
- redis 消息队列 发布、订阅模式
- Redis基础学习--Redis 事务(watch命令)、生存时间、排序、消息通知("发布/订阅"模式)、管道、节省空间
- Spring Boot 与Redis 集成实现消息发布/订阅模式
- 利用redis实现消息队列之queue模式
- 【转】redis 消息队列发布订阅模式spring boot实现
- Redis基础---消息通信模式
- redis 消息队列发布订阅模式spring boot实现
- Redis消息通知(任务队列和发布订阅模式)
- redis实现消息队列&发布/订阅模式使用
- Spring Data Redis实现消息队列——发布/订阅模式
- Java Jedis操作Redis示例(一)——pub/sub模式实现消息队列
- Java Jedis操作Redis示例(二)——list 生产者/消费者模式实现消息队列
- 利用redis实现消息队列之topic模式
- WCF的消息交换模式
- 探索AJAX中的消息传输模式(一)
- 探索AJAX中的消息传输模式(一)
- 结合Msg Ack机制和纯Msg的Web Service调用方法实现消息到达主动通知模式
- UI与逻辑的关系、主线程与工作线程的通讯及对观察者模式、消息总线机制的思考
- 消息交换模式 汇总