您的位置:首页 > 编程语言 > Java开发

rabbitmq学习以及spring项目spring-amqp配置

2017-08-19 13:50 411 查看

rabbitmq学习

rabbimq是一种由erlang语言开发的,程序和程序之间通讯工具,因此,在使用rabbitmq时候需要安装erlang的环境,这里就略了。

装好rabbitmq后,我们登录127.0.0.1:15672 ,就可以进入页面视图,进行账户相关的设置,这里也略过.



我们主要学习rabbitmq的消息通讯学习,我们使用pom工程,引入rabbitmq的依赖,坐标是:

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>


rabbitmq消息通讯主要有5种模式:简单模式,work模式,订阅模式,路由模式,通配符模式.

简单模式

/** 发送者 **/
public class Send {

private static String QUEUE_NAME= "test_queue";
public static void main(String[] args) throws IOException {
/*
* 步骤:
* 1.建立连接
* 2.创建通道
* 3.声明队列
* 4.设定消息内容,连接通道
* 5.关闭通道和连接
*/
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello rabbitmq";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

channel.close();
connection.close();

}

}

/**连接工具类,通过工具类连接开头设置的host/账号/密码**/
public class ConnectionUtil {

public static Connection getConnection() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/huawei");
factory.setUsername("huawei");
factory.setPassword("huawei");

Connection connection = factory.newConnection();
return connection;
}
}
/**接收者**/
public class Recev {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception {
/*
* 步骤:
* 1.建立连接
* 2.创建通道
* 3.声明队列
* 4.设定消息者,监听队列
* 5.获取消息
*/
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true,consumer);//监听 ,true表示消息自动确认

while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("receve:"+message);
}

}
}


代码写好后,先运行接收者,让其处理待接收状态,然后运行发送者,这样接收者就能接收到消息,并打印出消息内容,在页面的queue木块也能查看到队列名以及队列消 息运行历史.

work模式

work模式和简单模式的区别就是,简单模式是一个发送者一个接收者,但是work模式可以是一个发送者,多个接收者,缺点就是多个接收者接收到的消息都是消息的一部分,并不是全部 .

/**  消息发送者 **/
public class Send {

private static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for (int i = 0; i < 100; i++) {
String message = "" + i ;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
Thread.sleep(i*10);
}

channel.close();
connection.close();
}
}
/** 消息接收者1 **/
public class Recev {
private static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection
4000
connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//开启能者多劳模式,同一时间只会发送过一条消息给消费者
channel.basicQos(1);

QueueingConsumer consumer  = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true,consumer);//true表示消息自动确认

while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received :" + message);

Thread.sleep(10);
}

}
}
/**   消息接收者2  **/
public class Recev2 {
private static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//开启能者多劳模式,同一时间只会发送过一条消息给消费者
channel.basicQos(1);

QueueingConsumer consumer  = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true,consumer);//true表示消息自动确认

while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received2 :" + message);

Thread.sleep(1000);
}

}
}


订阅模式

订阅模式解决了work模式的缺点,使得多个接收者都能够接收到所有的消息,解决的机制是通过一个交换机,消息经过发送者发送后,到达交换机,交换机再将消息分发到多个队列中,再由接收者接收

/** 发送者 **/
public class Send{
private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明exchange 交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("send: " + message + "'");

channel.close();
connection.close();
}
}
/**  接收者1 **/
public class Recv {

private final static String QUEUE_NAME = "test_queue_work";

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
/**  接收者2 **/
public class Recv2 {

private final static String QUEUE_NAME = "test_queue_work2";

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}


路由模式

订阅模式看起来已经比较完善了,但是还是有个缺点,就是不能满足我们的特殊需求,比如我们消息分为很多中,有的是更新消息,有的是插入删除消息,如果需要使不同的消费者处理不同的消息,可以使用路由模式,路由模式的原理是,在交换机设置的地方设置routingkey ,消费者根据发送者发送的routingkey来匹配是否需要处理。

/** 发送者 **/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

//设定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String messge = "hello rabbitmq";
//这里第二个参数,设定了routingkey为insert
channel.basicPublish(EXCHANGE_NAME, "insert", null, messge.getBytes());

channel.close();
connection.close();

}
}
/**      消费者1 **/
public class Recev {

private final static String QUEUE_NAME = "test_queue_work";

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定到交换机,第三个参数为routingkey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
/**  消费者2 **/
public class Recv2 {

private final static String QUEUE_NAME = "test_queue_work2";

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机,第三个参数为routingkey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}


通配符模式

通配符模式是路由模式的升级版,是对routingkey的匹配上做了规则改进,可以使用#号或星号指定routingkey的匹配规则, #匹配一个或多个单词,而 星号 匹配不多不少一个单词

/** 发送者     **/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 消息内容
String message = "{item:123123,type:insert}";
channel.basicPublish(EXCHANGE_NAME, "key.insert", null, message.getBytes());

channel.close();
connection.close();
}
}
/** 接收者1 **/
public class Recev {
private final static String QUEUE_NAME = "test_queue_topic_work";

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
/**  接收者2 **/
public class Recev2 {

private final static String QUEUE_NAME = "test_queue_topic_work2";

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}


spring-amqp项目

spring-amqp项目是spring整合了rabbitmq,使得我们的项目中可以通过配置文件配置rabbitmq来使用

首先导入rabbitmq以及spring整合rabbitmq的依赖

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>


applicationContext-rabbitmq.xml的配置文件配置:

<!-- amqp 需要配置的信息有
1.连接工厂
2.管理器(管理队列和交换机的)
3.交换机(路由)
4.amqp模板(用于发送消息)
5.队列
6.监听(消费者用于监听队列的消息)
-->
<!-- 1.定义连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672"
username="huawei" password="huawei" virtual-host="/huawei"/>

<!-- 2.定义管理器,需要依赖连接工厂 -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- 3.定义交换机 -->
<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding  queue="myQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>

<!-- <rabbit:topic-exchange name="">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="foo.*" />
</rabbit:bindings>
</rabbit:topic-exchange> -->

<!-- 4.定义模板 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="fanoutExchange">
</rabbit:template>

<!-- 5.定义队列 -->
<rabbit:queue name="myQueue" auto-declare="true"/>

<!-- 6.定义监听 ,需要依赖连接工厂,定义bean,方法 ,队列名-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="foo" method="listen" queue-names="myQueue"/>
</rabbit:listener-container>
<bean id="foo"  class="com.hw.rabbitmq.spring.Foo"/>


加载配置文件

public class SpringMain {

public static void main(String[] args) throws Exception {
AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:spring/rabbitmq-context.xml");

RabbitTemplate template = context.getBean(RabbitTemplate.class);
try {
template.convertAndSend("hello rabbitmq");
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(1000);
context.destroy();
}
}

public class Foo {

public  void listen(String foo){
System.out.println(foo);
}
}


当我们运行SpringMain时候,Foo就会打印出收到的消息
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq