ActiveMQ安装和简单使用
2017-06-29 21:16
330 查看
1、安装
Windows
解压apache-activemq-5.11.1-bin.zip 文件 双击apache-activemq-5.11.1\bin\win64\activemq.bat文件在浏览器中输入 http://localhost:8161/ 出现如下图所示 表示mq启动成功
2、第一个activemq代码
生产者
public class Producer {public static void main(String[] args) throws Exception {
//创建连接工厂
//第一个参数用户名
//第二个参数密码
//第三个参数mq的的地址 在activemq.xml可以找到
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://127.0.0.1:61616");
//创建客户端连接
Connection connection=connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话
//第一个参数表示 是否开启实物 如果false表示不开启 如果是true 在发送完数据的时候要执行session.commit();
//第二个参数表示签收模式 Session.AUTO_ACKNOWLEDGE 表示自动签收
//Session.CLIENT_ACKNOWLEDGE 表示手动签收 消费端每消费一条数据时都要向mq发送一个确认签收 多用这种方式
//Session.DUPS_OK_ACKNOWLEDGE 不一定签不签收 这个容易造成重复消费
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建目的地(消息发送到哪里) 在PTP中 Destination被称作queue 在发布订阅模式中Destination被称作topic
//第一个参数表示给要发送到哪里起个名字
Destination queue = session.createQueue("aq");
//创建生产者
//如果是发布订阅模式 用 session.createTopic(arg0)
MessageProducer producer = session.createProducer(queue);
//设置消息的持久策略
//DeliveryMode.NON_PERSISTENT 不持久
//DeliveryMode.PERSISTENT 持久
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//生产消息
TextMessage textMsg=session.createTextMessage();
for(int i=0;i<10;i++){
textMsg.setText("生产第 "+i+" 条数据");
//如果上面开启事物 这里要提交事物session.commit();
// producer.send(arg0, arg1, arg2, arg3, arg4); 第一个参数目的地 第二个参数 要发送的消息 第三个参数 持久策略 地四个参数 是优先级 第五个参数消息在mq中的存活时间
producer.send(textMsg);
}
if(connection!=null){
connection.close();
}
}
}
消费者
public class Consoumer {public static void main(String[] args) throws Exception {
//创建连接工厂
//第一个参数用户名
//第二个参数密码
//第三个参数mq的的地址 在activemq.xml可以找到
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://127.0.0.1:61616");
//创建客户端连接
Connection connection=connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话
//第一个参数表示 是否开启实物 如果false表示不开启 如果是true 在发送完数据的时候要执行session.commit();
//第二个参数表示签收模式 Session.AUTO_ACKNOWLEDGE 表示自动签收
//Session.CLIENT_ACKNOWLEDGE 表示手动签收 消费端每消费一条数据时都要向mq发送一个确认签收 多用这种方式
//Session.DUPS_OK_ACKNOWLEDGE 不一定签不签收 这个容易造成重复消费
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建目的地(消息发送到哪里) 在PTP中 Destination被称作queue 在发布订阅模式中Destination被称作topic
//第一个参数表示给要发送到哪里起个名字
Destination queue = session.createQueue("aq");
//创建消费者
MessageConsumer consumer = session.createConsumer(queue);
while(true){
//接收消息
//consumer.receive 可以传时间毫秒数表示 等待几毫秒
//consumer.receiveNoWait() 表示不等待
TextMessage textMsg=(TextMessage) consumer.receive();
// 如果生产者的签收策略是 CLIENT_ACKNOWLEDGE 这里 一定要手工签收textMsg.acknowledge(); 否则mq不认为这条数据已经被消费
if (textMsg==null)break;
String text = textMsg.getText();
System.out.println("收到的消息是 "+text);
}
if(connection!=null){
connection.close();
}
}
}
3、activemq安全机制
在activemq.xml 中的broker节点中加入以下内容 表示只要在这里配置的用户才可以生产或消费数据<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="aa" password="bb" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
4、 JMS Selectors 消费者可以过滤从queue中获取消息
JMS Selectors用于在订阅中,基于消息属性对消息进行过滤。JMS Selectors由SQL92语法定义。以下是个Selectors的例子:Java代码
在生产数据的时候设置过滤条件的属性字段一定要用property如 setStringProperty("aa", "bb");
myMessage.setStringProperty("JMSType ", "car");
myMessage.setStringProperty("weight ", "5000");
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");
表示只消费 JMSType 属性值是car并且weight 大于2500的数据
在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,例如: LIKE '12%3' ('123' true,'12993' true,'1234' false) LIKE 'l_se' ('lose' true,'loose' false) LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false) 需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换
5、数据持久的 设置在mysql数据库中
把mysql-connector-java-5.1.26.jar 放到 apache-activemq-5.11.1\lib下在activemq.xml 中加入如下配置 在broker节点外面
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<!-- activemq这张表要保证数据库里已经有了-->
<property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
修改 <persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
6、activemq 主从集群环境搭建
一、部署方案,ActiveMQ集群环境准备:(1)首先我们下载apache-activemq-5.11.1-bin.tar.gz
(2)Zookeeper方案
主机IP 消息端口 通信端口
node22 2181 2888:3888
node33 2181 2888:3888
node44 2181 2888:3888
(3)ActiveMQ方案
主机IP 集群通信端口 消息端口 控制台端口
node22 62621 51511 8161
node33 62621 51511 8161
node44 62621 51511 8161
二、:首先搭建zookeeper环境 参考 https://my.oschina.net/xiaozhou18/blog/787132
三、:搭建activemq环境
(1)在node22节点下,创建/usr/local/java/activemq-
cluster文件夹,解压apache-activemq-5.11.1-bin.tar.gz文件
(2)修改activemq-cluster/apache-activemq-5.11.1/conf/activemq.xml配置文件
第一处修改:brokerName=”activemq-cluster”(三个节点都需要修改)
第二处修改:先注释掉适配器中的kahadb
第三处修改:添加新的leveldb配置如下(三个节点都需要修改)
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb" 数据存放目录
replicas="3" 有几台activemq 就配置几
bind="tcp://0.0.0.0:62621" 集群通信地址
zkAddress="node22:node33:node44:2181" zk集群地址
hostname="node22" 每台机器的主机名
zkPath="/activemq/leveldb-stores" zk的挂载节点
/>
</persistenceAdapter>
(3)把配置好的activemq-cluster文件夹复制到其他三台机器上 并修改activemq.xml对应的配置
四、:测试启动activemq集群:
第一步:启动zookeeper集群,命令:zkServer.sh start
第二步:启动三台mq集群:顺序启动mq:命令如下:
/usr/local/java/activemq-cluster/apache-activemq-5.11.1/bin/activemq start(关闭stop)
第三步:查看三台机器日志信息:
tail -100f /usr/local/java/activemq-cluster/apache-activemq-5.11.1/data/activemq.log
如果不报错,我们的集群启动成功,可以使用控制台查看
五:集群的brokerUrl配置进行修改即可:
failover:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)?Randomize=false
集群操作只需要改 工厂连接即可
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"failover:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)?Randomize=false");
六:负载均衡配置如下:
集群1链接集群2:
<networkConnectors>
<networkConnector
uri="static:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)"
duplex="false"/>
</networkConnectors>
集群2链接集群1:
<networkConnectors>
<networkConnector
uri="static:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)"
duplex="false"/>
</networkConnectors>
相关文章推荐
- ActiveMQ安装和简单使用
- 简单安装l7驱动与手机伴侣MPT(Mobile Phone Tools)使用步骤
- 数据库服务器 之 PostgreSQL安装和简单使用
- CKEditor 3.0(FCKEditor3.0)的简单安装配置使用
- vs2005下安装boost::regex 及简单的使用(转)
- PostgreSQL安装和简单使用
- jUDDI 简单安装使用
- Qt串口类库QExtSerialPort的安装与简单使用
- CKEditor 3.0(FCKEditor3.0)的简单安装配置使用
- PostgreSQL 安装和简单使用第1/2页
- CKEditor 3.0的简单安装配置使用
- Ant安装设置,简单使用测试
- 使用WiX制作简单MSI安装程序
- 使用WiX制作简单MSI安装程序
- CKEditor 3.0(FCKEditor3.0)的简单安装配置使用
- CLAPACK的安装与简单使用
- activemq-cpp安装及使用(3)
- Rails插件安装、使用(2)——FCKEditor的简单安装
- Cacti的插件安装及简单使用
- 最简单的 cvs 安装与使用