您的位置:首页 > 其它

RocketMQ的安装与使用

2016-01-12 10:29 453 查看
1.下载jdk并安装

sudo tar xvzf jdk-7u65-linux-x64.gz -C /usr/local
cd /usr/local
ln -s jdk1.8.0_65/ jdk
cd bin
sudo ln -s /usr/local/jdk/bin/java


2.下载RocketMQ并安装

官网下不下来

备用下载地址:http://download.csdn.net/detail/wtwshui/9194133#comment

sudo tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local
cd /usr/local
sudo ln -s alibaba-rocketmq rocketmq


3.设置环境变量

export PATH=$PATH:/usr/local/bin
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH::$ROCKETMQ_HOME/bin
cd /usr/local/rocketmq/bin
sudo sh os.sh


4.开启服务

nohup sh mqnamesrv &
nohup sh  mqbroker -n "127.0.0.1:9876" &
5.验证

完成上述步骤后,如果出现了错误,请试着修改runserver.sh和runbroker.sh这二个文件中的jvm配置选项,具体的jvm的含义如下:

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"


-Xmx:设置JVM最大可用内存为Xmx M。

-Xms:设置JVM促使内存为Xms m。此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。

-Xmn:设置年轻代大小。整个JVM内存大小=年轻代大小 + 年老代大小 + 持久代大小。持久代一般固定大小为64m,所以增大年轻代后,将会减小年老代大小。此值对系统性能影响较大,Sun官方推荐配置为整个堆的3/8。

-Xss:设置每个线程的堆栈大小。JDK5.0以后每个线程堆栈大小为1M,以前每个线程堆栈大小为256K。更具应用的线程所需内存大小进行调整。在相同物理内存下,减小这个值能生成更多的线程。但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,经验值在3000~5000左右。

-XX:MaxPermSize:设置持久代大小。

使用jps命令列出所有的java进程

如果出现三个进程则表示安装配置的成功

6.编写代码

消费者:

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");

consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("QuickStartConsumer");
consumer.subscribe("QuickStart", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}
生产者:

public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("QuickStartProducer");
producer.start();

for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("QuickStart",// topic
"TagA",// tag
("Hello RocketMQ ,QuickStart" + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}

producer.shutdown();
}
}


如果运行上述的程序出现异常,请看看第五步是不是有问题,如果没有的话,使用下面命令关闭防火墙

sudo ufw disable


7.运行结果







8.参考网站:/article/4424450.html

http://www.uml.org.cn/zjjs/201504021.asp
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: