您的位置:首页 > 数据库 > Redis

redis消息模式

2017-01-16 11:45 309 查看


消息通知

  一般来说,消息队列有两种场景,一种是生产者消费者模式,一种是发布者订阅者模式。利用redis这两种场景的消息队列都能实现。


  1、生产者消费者模式

  生产者生产消息放到队列中,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息,即对于每个消息最多只能被一个消费者拥有。

  具体的方法就是创建一个任务队列,生产者主动lpush消息,而消费者去rpop数据。但是这样存在一个问题,就是消费者需要主动去请求数据,周期性的请求会造成资源的浪费。如果可以实现一旦有新消息加入队列就通知消费者就好了,这时借助brpop命令就可以实现这样的需求。brpop和rpop命令相似,唯一区别就是当列表中没有元素时,brpop命令会一直阻塞住连接,直到有新元素加入。

BRPOP key timeout


  brpop命令接收两个参数,第一个参数key为键值,第二个参数timeout为超时时间。BRPOP命令取数据时候,如果暂时不存在数据,该命令会一直阻塞直到达到超时时间。如果timeout设置为0,那么就会无限等待下去。

  



  2、发布者订阅者模式

  发布者生产消息放到队列里,多个监听队列的订阅者都会受到同一份消息。

  生产者使用下面命令来发布消息:

PUBLISH CHANNEL MESSAGE


  订阅者通过下面的命令来订阅消息,执行subscribe命令后,客户端进入订阅状态,处于此状态的客户端不能使用4个属于“发布/订阅”模型的命令之外的命令。另外,可以使用subscribe channel1.1 channel1.2 ... 同时订阅多个频道。

SUBSCRIBE CHANNEL


  


  3、Java实现的redis的消息队列

  在jedis中,有对应的方法进行订阅和发布,为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。

  下面我们要实现三个类,一个对应publish,一个对应subscribe,一个对应要传递的对象实体类:

  实体类:

import java.io.Serializable;
/**
* 实体类
* 封装消息
* @author Administrator
*
*/
public class Message implements Serializable
{
private static final long serialVersionUID = 1L;
private String title;
private String content;
public String getTitle()
{
return title;
}
public void setTitle(String title)
{
this.title = title;
}
public String getContent()
{
return content;
}
public void setContent(String content)
{
this.content = content;
}
}


  Publish类:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import redis.clients.jedis.Jedis;
/**
* 发布者,用于发布消息
* @author Administrator
*
*/
public class TestPub
{
public static void main(String[] args)
{
Jedis jedis = new Jedis("127.0.0.1");
try
{
Message message = new Message();
message.setTitle("体育新闻");
message.setContent("著名NBA球星科比退役了!");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(message);
String msg1 = baos.toString("ISO-8859-1");

jedis.publish("foo", msg1);
} catch (Exception e)
{
e.printStackTrace();
}
jedis.close();
}
}


  Subscribe类:

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
* 订阅者,用于接收消息
* @author Administrator
*
*/
public class TestSub
{
public static void main(String[] args)
{
Jedis jedis = new Jedis("127.0.0.1");
JedisPubSub jedisPubSub = new JedisPubSub()
{
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// 取消订阅时候的处理  
System.out.println("1");
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
// 初始化订阅时候的处理  
System.out.println("2");
}

@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// // 取消按表达式的方式订阅时候的处理  
System.out.println("3");
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
//   // 初始化按表达式的方式订阅时候的处理  
System.out.println("4");
}

@Override
public void onPMessage(String pattern, String channel,
String message) {
//// 取得按表达式的方式订阅的消息后的处理  
System.out.println("5");
}

@Override
public void onMessage(String channel, String message)
{// 取得订阅的消息后的处理  
try
{
ByteArrayInputStream bis = new ByteArrayInputStream(message.getBytes("ISO-8859-1"));
            // 此处指定字符集将字符串编码成字节数组,此处的字符集需要与发布时的字符集保持一致
ObjectInputStream ois = new ObjectInputStream(bis);
Message  message2= (Message) ois.readObject();
System.out.println(message2.getTitle()+"\n"+message2.getContent());
} catch (Exception e)
{
e.printStackTrace();
} finally
{

}
}
};
jedis.subscribe(jedisPubSub, "foo");
jedis.close();
}
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: