ActiveMQ消息的同步和异步
2017-08-07 13:48
260 查看
一、jms规范的异步性
我们先来看一下同步和异步的概念,jms本身是异步的,但是activemq也可以以同步方式轮询访问生产者的数据。
异步:两个通信应用之间可以不用同时在线等待,任何一方只需各自处理自己的业务,比如发送方发送消息以后不用登录接收方的响应,可以接着处理其他的任务。也就是说发送方和接收方都是相互独立存在的,发送方只管方,接收方只能接收,无须去等待对方的响应。
同步:两个通信应用服务之间必须要进行同步,两个服务之间必须都是正常运行的。发送程序和接收程序都必须一直处于运行状态,并且随时做好相互通信的准备。发送程序首先向接收程序发起一个请求,称之为发送消息,发送程序紧接着就会堵塞当前自身的进程,不与其他应用进行任何的通信以及交互,等待接收程序的响应,待发送消息得到接收程序的返回消息之后会继续向下运行,进行下一步的业务处理。
jms是异步通信,发送方发送消息后就可以继续其它业务,而不用阻塞等等接收方响应。但接收方在接收消息上有两种模式:一种是同步接收消息,一种是异步接收消息。下面的示例中也会分别演示
同步接收消息:接收者主动接收消息,若消息队列中没有消息则阻塞等待,当然也有其它方法可以规定等待时间或是不等待。
异步接收消息:当消息队列有消息时会调用接收者的onMessage方法,接收者不用阻塞等待,可执行其它业务。
二、消息模型
jms支持两类消息传输模型:点对点模型(p2p)和发布/订阅模型(pub/sub)
1、p2p模型
点对点消息传送模型允许JMS客户端通过队列(queue)和这个虚拟通道来同步和异步发送、接收消息。消息生产者称为发送者(sender),消息消费者称为接收者(receiver)。传统上,点对点模型是一个基于拉取(pull)或轮询(polling)的消息传送模型,这种模型从队列中请求消息,而不是自动地将消息推送到客户端。
特点:
(1)发送到队列的消息被一个而且仅仅一个接收者所接收;
(2)队列保留信息,只到被消费或超时,接收者可用离线,上线时可消费消息。
(3)一旦消息被消费,就不会在队列中;
2、Pub/Sub模型
在发布/订阅模型中,消息会被发布到一个名为主题(topic)的虚拟通道中。消息生产者称为发布者(Publisher),而消息消费者称为订阅者(subscriber)。与点对点模型不同,使用发布/订阅模型发布到一个主题的消息,能够由多个订阅者所接收。有时候,也称这项技术为广播(broadcasting)消息。每个订阅者会接收到每条消息的一个副本。总的来说,发布/订阅消息传送模型基本上是一个基于推送(push)的模型,其中消息自动地向消费者广播,无需请求或轮询主题来获得新消息。
特点:
(1) 每个消息可以有多个消费者
(2)发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
(3)为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
三、点对点模型示例
1、下载activemq,有window和linux两个版本,此处为linux版本
2、解压文件,得到以下目录:
3、启动:到bin目录下,有三种启动方式
1) 普通启动 ./activemq start
2) 启动并指定日志文件 ./activemq start >tmp/smlog
3) 后台启动方式nohup ./activemq start >/tmp/smlog
运行截图如下:
4、ActiveMQ自带了一套管理系统,访问http://ip:8161/admin/,输入用户名密码即可登录:
5、实现,登录管理器后我们可以新建队列、topic,但为了更好理解Activemq,我们在程序中实现新建队列、topic。
首先新建一个生产者类(ActivemqQueueProducer.java),代码如下:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
[/code]
为了同时演示同步消息接收和异步消息接收,我们新建两个类(ActivemqQueueConsumer.Java和ActivemqQueueConsumerAsyn.Java),同步和异步的区别在于:
同步:使用Message message = consumer.receive()接收消息
异步:实现接口MessageListener,注册监听器 consumer.setMessageListener(this); //(异步接收) ,实现 onMessage方法
同步ActivemqQueueConsumer.java
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
[/code]
异步ActivemqQueueConsumerAsyn.java
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
[/code]
测试类:TestActiveMqQueue.java
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
[/code]
6、结果
可以看到程序先向队列发送5条记录,如果同步接收类中的循环条件记录大于5和等于5,可看到如下结果:
可明显看出大于5时,消费者阻塞等待接收,而异步则不阻塞,可进行其它业务。
事务的控制语句为: session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);如果是false表示不支持事务,如果是true表示支持事务。如果支持事务则需submit提交,否则不入队列。可通过控制台观察结果
每一项的说明如下:
Number Of Consumers 消费者 这个是消费者端的消费者数量
Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量 。
7、遇到的问题
启动mq的时候没报错但也没成功,可能的原因:
(1) 注意jdk和activemq版本兼容问题;
(2) 注意jvm虚拟内存小的问题;
我们先来看一下同步和异步的概念,jms本身是异步的,但是activemq也可以以同步方式轮询访问生产者的数据。
异步:两个通信应用之间可以不用同时在线等待,任何一方只需各自处理自己的业务,比如发送方发送消息以后不用登录接收方的响应,可以接着处理其他的任务。也就是说发送方和接收方都是相互独立存在的,发送方只管方,接收方只能接收,无须去等待对方的响应。
同步:两个通信应用服务之间必须要进行同步,两个服务之间必须都是正常运行的。发送程序和接收程序都必须一直处于运行状态,并且随时做好相互通信的准备。发送程序首先向接收程序发起一个请求,称之为发送消息,发送程序紧接着就会堵塞当前自身的进程,不与其他应用进行任何的通信以及交互,等待接收程序的响应,待发送消息得到接收程序的返回消息之后会继续向下运行,进行下一步的业务处理。
jms是异步通信,发送方发送消息后就可以继续其它业务,而不用阻塞等等接收方响应。但接收方在接收消息上有两种模式:一种是同步接收消息,一种是异步接收消息。下面的示例中也会分别演示
同步接收消息:接收者主动接收消息,若消息队列中没有消息则阻塞等待,当然也有其它方法可以规定等待时间或是不等待。
异步接收消息:当消息队列有消息时会调用接收者的onMessage方法,接收者不用阻塞等待,可执行其它业务。
二、消息模型
jms支持两类消息传输模型:点对点模型(p2p)和发布/订阅模型(pub/sub)
1、p2p模型
点对点消息传送模型允许JMS客户端通过队列(queue)和这个虚拟通道来同步和异步发送、接收消息。消息生产者称为发送者(sender),消息消费者称为接收者(receiver)。传统上,点对点模型是一个基于拉取(pull)或轮询(polling)的消息传送模型,这种模型从队列中请求消息,而不是自动地将消息推送到客户端。
特点:
(1)发送到队列的消息被一个而且仅仅一个接收者所接收;
(2)队列保留信息,只到被消费或超时,接收者可用离线,上线时可消费消息。
(3)一旦消息被消费,就不会在队列中;
2、Pub/Sub模型
在发布/订阅模型中,消息会被发布到一个名为主题(topic)的虚拟通道中。消息生产者称为发布者(Publisher),而消息消费者称为订阅者(subscriber)。与点对点模型不同,使用发布/订阅模型发布到一个主题的消息,能够由多个订阅者所接收。有时候,也称这项技术为广播(broadcasting)消息。每个订阅者会接收到每条消息的一个副本。总的来说,发布/订阅消息传送模型基本上是一个基于推送(push)的模型,其中消息自动地向消费者广播,无需请求或轮询主题来获得新消息。
特点:
(1) 每个消息可以有多个消费者
(2)发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
(3)为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
三、点对点模型示例
1、下载activemq,有window和linux两个版本,此处为linux版本
2、解压文件,得到以下目录:
3、启动:到bin目录下,有三种启动方式
1) 普通启动 ./activemq start
2) 启动并指定日志文件 ./activemq start >tmp/smlog
3) 后台启动方式nohup ./activemq start >/tmp/smlog
运行截图如下:
4、ActiveMQ自带了一套管理系统,访问http://ip:8161/admin/,输入用户名密码即可登录:
5、实现,登录管理器后我们可以新建队列、topic,但为了更好理解Activemq,我们在程序中实现新建队列、topic。
首先新建一个生产者类(ActivemqQueueProducer.java),代码如下:
package com.css.sword.service; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class ActivemqQueueProducer { private Session session; private MessageProducer producer ; private Connection connection; public void initialize() throws JMSException { ConnectionFactory connectFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); connection = connectFactory.createConnection(); //session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//支持事务 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//不支持事务 Destination destination = session.createQueue("queue1"); producer = session.createProducer(destination); connection.start(); } public void sendText(String Message) { try { TextMessage text = session.createTextMessage(Message); producer.send(text); System.out.println("Sending message:"+text.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void submit() throws JMSException { session.commit(); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
[/code]
为了同时演示同步消息接收和异步消息接收,我们新建两个类(ActivemqQueueConsumer.Java和ActivemqQueueConsumerAsyn.Java),同步和异步的区别在于:
同步:使用Message message = consumer.receive()接收消息
异步:实现接口MessageListener,注册监听器 consumer.setMessageListener(this); //(异步接收) ,实现 onMessage方法
同步ActivemqQueueConsumer.java
package com.css.sword.service; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; /** * <p>Title:ActivemqQueueConsumer</p> * <p>Description: 模拟同步接收消息</p> * @author yuanxj * @date 2016-1-20 */ public class ActivemqQueueConsumer { private String name = ""; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; ActivemqQueueConsumer(String name){ this.name=name; } public void initialize() throws JMSException { ConnectionFactory connectFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); Connection connection = connectFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("queue1"); consumer = session.createConsumer(destination); connection.start(); } public void recive() { try { initialize(); System.out.println("Consumer("+name+"):->Begin listening..."); int count=0; while(count<10) { Message message = consumer.receive(); //主动接收消息(同步) System.out.println("consumer recive:"+((TextMessage)message).getText()); count++; System.out.println(count); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void submit() throws JMSException { session.commit(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
[/code]
异步ActivemqQueueConsumerAsyn.java
package com.css.sword.service; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * <p>Title:ActivemqQueueConsumerAsyn</p> * <p>Description: 异步接收信息</p> * @author yuanxj * @date 2016-1-20 */ public class ActivemqQueueConsumerAsyn implements MessageListener{ private String name = ""; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; ActivemqQueueConsumerAsyn(String name){ this.name=name; } public void initialize() throws JMSException {ConnectionFactory connectFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); Connection connection = connectFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("queue1"); consumer = session.createConsumer(destination); connection.start(); } public void recive() { try { initialize(); System.out.println("Consumer("+name+"):->Begin listening..."); // 开始监听 consumer.setMessageListener(this); //(异步接收) } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void onMessage(Message arg0) { // TODO Auto-generated method stub try{ if(arg0 instanceof TextMessage) { TextMessage txtMsg = (TextMessage) arg0; System.out.println("consumer("+name+")异步 recive:"+txtMsg.getText()); Thread.sleep(500); } }catch(Exception e) { e.printStackTrace(); } } public void submit() throws JMSException { session.commit(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
[/code]
测试类:TestActiveMqQueue.java
package com.css.sword.service; import javax.jms.JMSException; public class TestActiveMqQueue { public static void main(String[] args) throws InterruptedException, JMSException { ActivemqQueueProducer producer = new ActivemqQueueProducer(); ActivemqQueueConsumer consumer = new ActivemqQueueConsumer("1"); ActivemqQueueConsumerAsyn consumer1 = new ActivemqQueueConsumerAsyn("2"); producer.initialize(); Thread.sleep(500); for(int i=0;i<5;i++) { producer.sendText("Hello, world!"+i); } //producer.submit();//如果开启事务需使用 // producer.close(); System.out.println("consumer1开始监听"); consumer.recive(); consumer.close(); System.out.println("consumer1接收完毕!"); for(int i=0;i<10;i++) { producer.sendText("Hello, world!"+i); } //producer.submit();//如果开启事务需使用 System.out.println("consumer2开始监听"); consumer1.recive(); System.out.println("consumer2接收完毕!"); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
[/code]
6、结果
可以看到程序先向队列发送5条记录,如果同步接收类中的循环条件记录大于5和等于5,可看到如下结果:
可明显看出大于5时,消费者阻塞等待接收,而异步则不阻塞,可进行其它业务。
事务的控制语句为: session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);如果是false表示不支持事务,如果是true表示支持事务。如果支持事务则需submit提交,否则不入队列。可通过控制台观察结果
每一项的说明如下:
Number Of Consumers 消费者 这个是消费者端的消费者数量
Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量 。
7、遇到的问题
启动mq的时候没报错但也没成功,可能的原因:
(1) 注意jdk和activemq版本兼容问题;
(2) 注意jvm虚拟内存小的问题;
相关文章推荐
- ActiveMQ生产者以同步和异步方式发送消息
- 如何利用Oracle golden gate 分发数据库同步消息至ActiveMQ
- 同步/异步与阻塞/非阻塞的区别消息
- Spring framework(10):集成 JMS 异步消息队列(ActiveMQ)
- 同步和异步消息机制
- activemq集合JMS处理异步消息
- 3 ActiveMQ 特性 - 消费者(同步和异步调度)
- (十七)SpringBoot之使用异步消息服务jms之ActiveMQ
- 同步方式的消息发送和异步方式的消息发送
- 循序渐进做项目系列(2):最简单的C/S程序——消息异步调用与消息同步调用
- MSMQ(3)创建、同步异步接收消息
- 使用apache的activemq集合JMS处理异步消息
- 简单消息,异步消息,同步消息辨析
- JMS 中消息的 同步消费 和 异步消费(listener的方式)
- 回调函数,同步调用,异步调用,事件,消息循环
- ActiveMQ事务、异步发送、消息确认概念
- 消息的同步发送,异步发送以及消息发送的可靠性
- 回调函数,同步调用,异步调用,事件,消息循环
- spring整合JMS实现同步收发消息(基于ActiveMQ的实现)
- Objective-C 编程语言(13)远程消息---Obj-C 对远程消息的支持(同步异步消息)