您的位置:首页 > 其它

com.rabbitmq.client.ShutdownSignalException

2016-12-18 22:44 489 查看
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:648)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:678)
at xx.mq.Producer.main(Producer.java:25)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 4 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
at java.lang.Thread.run(Thread.java:745)
Closed the channel and conn.




从后面的解决过程来看,出现上述异常的原因是用户“lulei”尚未被授予虚拟主机(vhost)的访问权限,因此连接在建立阶段就被服务器断开。解决方法:在 RabbitMQ 管理界面的用户列表中点击“lulei”,进入权限设置界面:



设置完成后:



再次运行程序,连接成功,消息正常发送。

附上代码。生产者:

package xx.mq;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Minimal RabbitMQ producer: connects to the broker, declares the queue and
 * publishes a single text message to it.
 */
public class Producer {

    /** Name of the queue the message is published to. */
    private final static String QUEUE_NAME = "hello world";

    /**
     * Publishes one message and then releases the channel and the connection.
     *
     * <p>Failures while connecting or publishing are printed to standard error
     * and do not propagate; failures while closing do propagate.
     *
     * @param argv command-line arguments (unused)
     * @throws java.io.IOException if closing the channel or connection fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void main(String[] argv) throws java.io.IOException, TimeoutException {

        Connection connection = null;
        Channel channel = null;
        try {
            /* Create the connection factory. */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*.*.*.*"); // real IP redacted
            factory.setUsername("lulei");
            factory.setPassword("123456");
            factory.setPort(5672);
            /* Open the connection. */
            connection = factory.newConnection();
            /* Open a channel on it. */
            channel = connection.createChannel();

            // Declare the queue: name, durable (survives a broker restart),
            // non-exclusive, not auto-deleted, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "hello world..."; // payload to send

            /* Publish through the default exchange, routed by queue name. */
            // Encode explicitly as UTF-8 instead of the platform default charset,
            // so the bytes match what the consumer decodes.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("Send message -> " + message);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            /* Close the channel, then the connection. */
            // The nested finally guarantees the connection is closed even when
            // closing the channel throws; otherwise the connection would leak.
            try {
                if (channel != null) {
                    channel.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }

            System.out.println("Closed the channel and conn.");
        }

    }

}


消费者:

package xx.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Minimal RabbitMQ consumer: connects to the broker, declares the queue and
 * prints every message delivered from it.
 */
public class Customer {

    /** Name of the queue messages are consumed from. */
    private final static String QUEUE_NAME = "hello world";

    /**
     * Opens a channel, declares the queue and subscribes a printing consumer
     * with automatic acknowledgement.
     *
     * @param argv command-line arguments (unused)
     */
    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {

        final Channel mqChannel = openChannel();

        // Declare the queue: name, durable (survives a broker restart),
        // non-exclusive, not auto-deleted, no extra arguments.
        mqChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");

        // Subscribe with auto-ack enabled (no explicit acknowledgement is sent);
        // consider carefully whether that is appropriate before reusing this.
        mqChannel.basicConsume(QUEUE_NAME, true, new PrintingConsumer(mqChannel));
    }

    /** Builds the connection factory, connects, and returns a new channel. */
    private static Channel openChannel() throws IOException, TimeoutException {
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("****");
        connectionFactory.setUsername("lulei");
        connectionFactory.setPassword("123456");
        connectionFactory.setPort(5672);

        final Connection brokerConnection = connectionFactory.newConnection();
        return brokerConnection.createChannel();
    }

    /** Consumer that decodes each delivery as UTF-8 text and prints it. */
    private static final class PrintingConsumer extends DefaultConsumer {

        PrintingConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            final String text = new String(body, "UTF-8");
            System.out.println("Received the message -> " + text);
        }
    }
}


更多信息:http://blog.csdn.net/roc1029/article/details/51249412
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐