您的位置:首页 > 移动开发 > Android开发

Android中RabbitMQ

2016-01-20 16:58 806 查看
NND

搞了好几天 ,才搞明白怎么回事。。。。。

1、http://www.rabbitmq.com/java-client.html官网下载jar包,放在libs中;

3、http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.0/rabbitmq-java-client-javadoc-3.6.0/overview-summary.html 说明

2、开始代码,做了一个类,简单的发送和接收。。。

import com.rabbitmq.client.AMQP.BasicProperties;

public class Activity01 extends Activity {

    

    private Connection connection;

    private Channel channel;

    

    private Connection reConnection;

    private Channel reChannel;

    

    boolean recieveLoop;

    
private Thread rabbitServer;
private Thread heartServer;

ConnectionFactory factory;

    

    public void onCreate(Bundle savedInstanceState) {

        super.onCreate(savedInstanceState);

        setContentView(R.layout.main);

        

        try{

        ceshi();

        }catch(Exception ex){

        System.out.println("---问题---"+ex.getMessage());

        }

    }

    

    private void ceshi() throws IOException, TimeoutException{

   

         //创建一个链接

    factory = new ConnectionFactory();

         //服务器ip+port

         factory.setHost("192.168.2.xxx");

         factory.setPort(8191);

        

         //用户名  密码

         factory.setUsername("APPxxxxxx");

         factory.setPassword("92A0DC59AB696xxxxxxx");

         factory.setVirtualHost("/domi");

       

          //创建连接

         connection = factory.newConnection();

         //创建一个channel

         channel = connection.createChannel();

         

         reConnection = factory.newConnection();//reConnection 是另一个通道,负责接收信息
 reChannel = connection.createChannel();

         channel.exchangeDeclare("amq.direct", "direct",true);//声明一个转发器,发送时只要有这个就可以发送的服务器

         

         reChannel.exchangeDeclare("amq.direct", "direct",true);//声明一个转发器

         reChannel.queueDeclare("1.1111xxxx", true, false, true, null);//指定一个队列,接收时使用

         //queueDeclare(java.lang.String queue,
boolean durable, boolean exclusive, boolean autoDelete,java.util.Map<java.lang.String,java.lang.Object> arguments)//第四个参数,是自动删除

         reChannel.queueBind("1.1111xxx", "amq.direct", "1.1111.xxx");//绑定一个到转发器

         MakeRabbitRecvThread().start();//开启线程接收信息

         //发生数据

         ParameterClass pc=new ParameterClass();

String utime=pc.getTime();
String verify=pc.getVerify(utime);

Map<String,Object> szHeads = new HashMap<String,Object>();
szHeads.put("route","1.1111xxx");
szHeads.put("cmd", 10x);
szHeads.put("stamp", utime);//"20131122"
szHeads.put("devid",1);
szHeads.put("auth", verify);//"AB

BasicProperties bs=new BasicProperties.Builder().headers(szHeads).build();

channel.basicPublish("amq.direct", "service.xxx,bs, "fsd".getBytes());//发送

         
System.out.println("-------send to the server -----给服务器发送");

RabbitHeartThread().start();//开启心跳线程

}

    

    public Thread MakeRabbitRecvThread() {

   

    System.out.println("-----进入 线程  接收----");
recieveLoop = true;

try {
rabbitServer = new Thread(new Runnable() {
public void run() {

boolean autoAck = false;

try {
synchronized (this) {
if (reConnection == null) {
System.out.println("connection is null");
reConnection = factory.newConnection();
}
if (reChannel == null) {
reChannel = reConnection.createChannel();
reChannel.exchangeDeclare("amq.direct", "direct",true);

reChannel.queueDeclare("1.1111xxxx", true, false, true, null);
reChannel.queueBind("1.1111xxxx", "amq.direct", "1.1111xxxx");
}
}

} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}catch(Exception ex){

}

// //创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(reChannel);

if (reChannel == null) {
System.out.println("reChannel is null-------------");
}else{
System.out.println("reChannel is not null-------------");
}

try {
reChannel.basicConsume("1.1111xxxx", autoAck, consumer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("ioexception ---"+e.getMessage());
}

System.out.println("------- [x] Awaiting RPC requests");

while (recieveLoop) {

QueueingConsumer.Delivery delivery;
try {
System.out.println("---进入接收循环---enter the while circulate");
delivery = consumer.nextDelivery();

        BasicProperties props = delivery.getProperties();

System.out.println("BasicProperties");

System.out.println("delivery getBody:"+delivery.getBody()+"---exchange:"+delivery.getEnvelope().getExchange()+"---routing key:"+delivery.getEnvelope().getRoutingKey()+
"---");
System.out.println("delivery getBody:"+changeData(delivery.getBody(),delivery.getBody().length));

Map<String,Object> szH=new HashMap<String,Object>();

szH=props.getHeaders();

                                                         //接收时,对应发送时,带的Header参数;     

                 
if(delivery.getEnvelope().getExchange().equals("amq.direct")){
System.out.println("in the amq.direct----cmd:"+szH.get("cmd").toString());

if(szH.get("cmd").toString().equals("102")){

System.out.println("-----收到消息------route:"+szH.get("route").toString());

String a=changeData(delivery.getBody(),delivery.getBody().length);

System.out.println("After conversion:"+a);

}else if(szH.get("cmd").toString().equals("103")){//

}

}else{

}

} catch (InterruptedException ie) {
continue;
} catch (Exception ex) {
ex.printStackTrace();
break;
}
// (process the message components ...)
}
}

});
} catch (Exception ex) {
ex.printStackTrace();
}
return rabbitServer;
}

    
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: