rabbitmq学习以及spring项目spring-amqp配置
2017-08-19 13:50
411 查看
rabbitmq学习
rabbimq是一种由erlang语言开发的,程序和程序之间通讯工具,因此,在使用rabbitmq时候需要安装erlang的环境,这里就略了。装好rabbitmq后,我们登录127.0.0.1:15672 ,就可以进入页面视图,进行账户相关的设置,这里也略过.
我们主要学习rabbitmq的消息通讯学习,我们使用pom工程,引入rabbitmq的依赖,坐标是:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency>
rabbitmq消息通讯主要有5种模式:简单模式,work模式,订阅模式,路由模式,通配符模式.
简单模式
/** 发送者 **/ public class Send { private static String QUEUE_NAME= "test_queue"; public static void main(String[] args) throws IOException { /* * 步骤: * 1.建立连接 * 2.创建通道 * 3.声明队列 * 4.设定消息内容,连接通道 * 5.关闭通道和连接 */ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello rabbitmq"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); channel.close(); connection.close(); } } /**连接工具类,通过工具类连接开头设置的host/账号/密码**/ public class ConnectionUtil { public static Connection getConnection() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/huawei"); factory.setUsername("huawei"); factory.setPassword("huawei"); Connection connection = factory.newConnection(); return connection; } } /**接收者**/ public class Recev { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws Exception { /* * 步骤: * 1.建立连接 * 2.创建通道 * 3.声明队列 * 4.设定消息者,监听队列 * 5.获取消息 */ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true,consumer);//监听 ,true表示消息自动确认 while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("receve:"+message); } } }
代码写好后,先运行接收者,让其处理待接收状态,然后运行发送者,这样接收者就能接收到消息,并打印出消息内容,在页面的queue木块也能查看到队列名以及队列消 息运行历史.
work模式
work模式和简单模式的区别就是,简单模式是一个发送者一个接收者,但是work模式可以是一个发送者,多个接收者,缺点就是多个接收者接收到的消息都是消息的一部分,并不是全部 .
/** 消息发送者 **/ public class Send { private static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { String message = "" + i ; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); Thread.sleep(i*10); } channel.close(); connection.close(); } } /** 消息接收者1 **/ public class Recev { private static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws Exception { Connection 4000 connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //开启能者多劳模式,同一时间只会发送过一条消息给消费者 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true,consumer);//true表示消息自动确认 while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("received :" + message); Thread.sleep(10); } } } /** 消息接收者2 **/ public class Recev2 { private static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //开启能者多劳模式,同一时间只会发送过一条消息给消费者 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true,consumer);//true表示消息自动确认 while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("received2 :" + message); Thread.sleep(1000); } } }
订阅模式
订阅模式解决了work模式的缺点,使得多个接收者都能够接收到所有的消息,解决的机制是通过一个交换机,消息经过发送者发送后,到达交换机,交换机再将消息分发到多个队列中,再由接收者接收
/** 发送者 **/ public class Send{ private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange 交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息内容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("send: " + message + "'"); channel.close(); connection.close(); } } /** 接收者1 **/ public class Recv { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } /** 接收者2 **/ public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
路由模式
订阅模式看起来已经比较完善了,但是还是有个缺点,就是不能满足我们的特殊需求,比如我们消息分为很多中,有的是更新消息,有的是插入删除消息,如果需要使不同的消费者处理不同的消息,可以使用路由模式,路由模式的原理是,在交换机设置的地方设置routingkey ,消费者根据发送者发送的routingkey来匹配是否需要处理。
/** 发送者 **/ public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //设定交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String messge = "hello rabbitmq"; //这里第二个参数,设定了routingkey为insert channel.basicPublish(EXCHANGE_NAME, "insert", null, messge.getBytes()); channel.close(); connection.close(); } } /** 消费者1 **/ public class Recev { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定到交换机,第三个参数为routingkey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } /** 消费者2 **/ public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,第三个参数为routingkey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
通配符模式
通配符模式是路由模式的升级版,是对routingkey的匹配上做了规则改进,可以使用#号或星号指定routingkey的匹配规则, #匹配一个或多个单词,而 星号 匹配不多不少一个单词
/** 发送者 **/ public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "{item:123123,type:insert}"; channel.basicPublish(EXCHANGE_NAME, "key.insert", null, message.getBytes()); channel.close(); connection.close(); } } /** 接收者1 **/ public class Recev { private final static String QUEUE_NAME = "test_queue_topic_work"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } /** 接收者2 **/ public class Recev2 { private final static String QUEUE_NAME = "test_queue_topic_work2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
spring-amqp项目
spring-amqp项目是spring整合了rabbitmq,使得我们的项目中可以通过配置文件配置rabbitmq来使用首先导入rabbitmq以及spring整合rabbitmq的依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency>
applicationContext-rabbitmq.xml的配置文件配置:
<!-- amqp 需要配置的信息有 1.连接工厂 2.管理器(管理队列和交换机的) 3.交换机(路由) 4.amqp模板(用于发送消息) 5.队列 6.监听(消费者用于监听队列的消息) --> <!-- 1.定义连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="huawei" password="huawei" virtual-host="/huawei"/> <!-- 2.定义管理器,需要依赖连接工厂 --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 3.定义交换机 --> <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="myQueue"/> </rabbit:bindings> </rabbit:fanout-exchange> <!-- <rabbit:topic-exchange name=""> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*" /> </rabbit:bindings> </rabbit:topic-exchange> --> <!-- 4.定义模板 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange"> </rabbit:template> <!-- 5.定义队列 --> <rabbit:queue name="myQueue" auto-declare="true"/> <!-- 6.定义监听 ,需要依赖连接工厂,定义bean,方法 ,队列名--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="foo" method="listen" queue-names="myQueue"/> </rabbit:listener-container> <bean id="foo" class="com.hw.rabbitmq.spring.Foo"/>
加载配置文件
public class SpringMain { public static void main(String[] args) throws Exception { AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/rabbitmq-context.xml"); RabbitTemplate template = context.getBean(RabbitTemplate.class); try { template.convertAndSend("hello rabbitmq"); } catch (Exception e) { e.printStackTrace(); } Thread.sleep(1000); context.destroy(); } } public class Foo { public void listen(String foo){ System.out.println(foo); } }
当我们运行SpringMain时候,Foo就会打印出收到的消息
相关文章推荐
- Spring 学习之旅(五) 持久层的配置,注册实现以及项目重构
- 框架学习之Spring 第一节 认识Spring以及环境的配置
- Spring学习——(二)IOC、DI以及常用xml配置
- spring-amqp 整合rabbitmq生产者配置与代码
- Atitit.mybatis的测试 以及spring与mybatis在本项目中的集成配置说明
- RabbitMQ学习之spring-amqp的重要类的认识
- Spring学习----------AOP以及Spring配置文件详解
- Spring-Security (学习记录五)--配置登录时,密码采用md5加密,以及获取登录信息属性监听同步自己想要的登录信息
- Spring学习总结(20)——Spring加载多个项目properties配置文件问题解决
- rabbitmq学习10:使用spring-amqp发送消息及异步接收消息
- Spring学习总结(20)——Spring加载多个项目properties配置文件问题解决
- 框架学习之Spring 第一节 认识Spring以及环境的配置
- spring-amqp 配置实现rabbitmq 路由
- Atitit.mybatis的测试 以及spring与mybatis在本项目中的集成配置说明
- rabbitmq学习9:使用spring-amqp发送消息及同步接受消息
- spring-amqp整合rabbitmq消费者配置和代码
- mongoDb学习以及spring管理 (包括百度云配置)
- Proxool在web项目中的配置,以及与spring3、hibernate4的集成
- RabbitMQ学习(八)之spring-amqp的重要类的认识
- spring3.2.5学习(一)——spring环境配置以及IOC简介