您的位置:首页 > 编程语言 > Java开发

spring的RabbitTemplate 接收Message源码导读

2014-09-25 23:29 417 查看
1,首先调用类SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法。内部类的主要属性如下

// The consumer driven by this runnable: run() starts it, polls it, and finally stops or restarts it.
private final BlockingQueueConsumer consumer;

// Startup latch; run() counts it down after the consumer has started and again once processing has ended.
private final CountDownLatch start;

// Set by run() when a fatal startup failure is caught (fatal missing queues or FatalListenerStartupException).
private volatile FatalListenerStartupException startupException;


2,内部类的run方法如下

// Body of AsyncMessageProcessingConsumer.run(): start the consumer, process messages
// while it is active, then cancel or restart it depending on how processing ended.

// Set by the handlers below when the failure should also stop the container (checked after the try).
boolean aborted = false;

// Number of consecutive loop passes where `continuable` was false; only maintained
// when maxConcurrentConsumers is set.
int consecutiveIdles = 0;

// Number of consecutive loop passes where `continuable` was true; only maintained
// when maxConcurrentConsumers is set.
int consecutiveMessages = 0;

try {

// Startup phase: redeclare elements if necessary, start the consumer, then release the startup latch.
try {
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
this.consumer.start();
this.start.countDown();
}
catch (QueuesNotAvailableException e) {
// Both branches rethrow to the outer handler; only the non-fatal branch releases
// the latch and reports the startup failure here.
if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
throw e;
}
else {
this.start.countDown();
handleStartupFailure(e);
throw e;
}
}
catch (FatalListenerStartupException ex) {
// Rethrown untouched; the outer handler records it as the startup exception.
throw ex;
}
catch (Throwable t) {
// Any other startup failure: release the latch, report it, and let the outer handlers classify it.
this.start.countDown();
handleStartupFailure(t);
throw t;
}

if (SimpleMessageListenerContainer.this.transactionManager != null) {
/*
* Register the consumer's channel so it will be used by the transaction manager
* if it's an instance of RabbitTransactionManager.
*/
ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory());
}

// Always better to stop receiving as soon as possible if
// transactional
boolean continuable = false;
while (isActive(this.consumer) || continuable) {
try {
// Will come back false when the queue is drained
continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();
// The counters below drive consumer scaling and are skipped when no maximum is configured.
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (continuable) {
consecutiveIdles = 0;
// Post-increment: the comparison uses the count before this pass is added.
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
else {
consecutiveMessages = 0;
// Post-increment: the comparison uses the count before this pass is added.
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
// (the catch body is intentionally empty, so the loop simply moves on to the next pass)
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
/*
*  These will normally be wrapped by an LEFE if thrown by the
*  listener, but we will also honor it if thrown by an
*  error handler.
*/
}
}

}
catch (InterruptedException e) {
logger.debug("Consumer thread interrupted, processing stopped.");
// Restore the interrupt flag for code further up the stack.
Thread.currentThread().interrupt();
aborted = true;
}
catch (QueuesNotAvailableException ex) {
// When missingQueuesFatal is false nothing happens here: `aborted` stays false and the
// cancel-or-restart decision after the try is made on isActive(consumer) alone.
if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
}
catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
catch (FatalListenerExecutionException ex) {
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
catch (ShutdownSignalException e) {
// Not treated as an abort: a normal shutdown is logged at debug level, anything else is logged as a consumer exception.
if (RabbitUtils.isNormalShutdown(e)) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
}
}
else {
this.logConsumerException(e);
}
}
catch (AmqpIOException e) {
// An IOException caused by a ShutdownSignalException whose message contains "in exclusive use"
// is only logged as a warning; every other AmqpIOException is logged as a consumer exception.
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
logger.warn(e.getCause().getCause().toString());
}
else {
this.logConsumerException(e);
}
}
catch (Error e) {
logger.error("Consumer thread error, thread abort.", e);
aborted = true;
}
catch (Throwable t) {
// Anything else is logged without setting `aborted`.
this.logConsumerException(t);
}
finally {
// Mirrors the registerConsumerChannel call made after startup.
if (SimpleMessageListenerContainer.this.transactionManager != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}

// In all cases count down to allow container to progress beyond startup
start.countDown();

// Consumer no longer active, or processing aborted: stop it and drop it from the container's
// consumer collection. Otherwise the consumer is restarted.
if (!isActive(consumer) || aborted) {
logger.debug("Cancelling " + this.consumer);
try {
this.consumer.stop();
synchronized (consumersMonitor) {
if (SimpleMessageListenerContainer.this.consumers != null) {
SimpleMessageListenerContainer.this.consumers.remove(this.consumer);
}
}
}
catch (AmqpException e) {
logger.info("Could not cancel message consumer", e);
}
if (aborted) {
logger.error("Stopping container from aborted consumer");
// Per the log message above, this stop() call stops the container.
stop();
}
}
else {
logger.info("Restarting " + this.consumer);
restart(this.consumer);
}

3,run方法调用类BlockingQueueConsumer的start方法,BlockingQueueConsumer的属性如下

// Fields of BlockingQueueConsumer as quoted by the article; the methods that use them are
// not part of this excerpt, so notes about intent are marked as assumptions.

// Blocking queue of Delivery objects. Presumably filled by the InternalConsumer below - TODO confirm.
private final BlockingQueue<Delivery> queue;

// When this is non-null the connection has been closed (should never happen in normal operation).
private volatile ShutdownSignalException shutdown;

// Presumably the names of the queues to consume from - TODO confirm.
private final String[] queues;

private final int prefetchCount;

private final boolean transactional;

private Channel channel;

private RabbitResourceHolder resourceHolder;

// Assigned in start() as a new InternalConsumer built on the channel.
private InternalConsumer consumer;

// Atomic flag, initially false.
private final AtomicBoolean cancelled = new AtomicBoolean(false);

// NOTE(review): no unit is visible in this excerpt - confirm against the setter/usages.
private volatile long shutdownTimeout;

// Atomic flag, initially false.
private final AtomicBoolean cancelReceived = new AtomicBoolean(false);

private final AcknowledgeMode acknowledgeMode;

private final ConnectionFactory connectionFactory;

private final MessagePropertiesConverter messagePropertiesConverter;

// start() adds this consumer to the counter.
private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;

// Starts out as an empty map.
private final Map<String, Object> consumerArgs = new HashMap<String, Object>();

private final boolean exclusive;

// LinkedHashSet, so iteration follows insertion order; cleared in start().
private final Set<Long> deliveryTags = new LinkedHashSet<Long>();

// NOTE(review): the name is spelled "Requeu" (missing "e") in the source.
private final boolean defaultRequeuRejected;

// Latch with an initial count of 1.
private final CountDownLatch suspendClientThread = new CountDownLatch(1);

// Wrapped with Collections.synchronizedSet.
private final Collection<String> consumerTags = Collections.synchronizedSet(new HashSet<String>());

// Wrapped with Collections.synchronizedSet.
private final Set<String> missingQueues = Collections.synchronizedSet(new HashSet<String>());

// Fixed at 60000 - presumably milliseconds; TODO confirm.
private final long retryDeclarationInterval = 60000;

private long lastRetryDeclaration;


4,调用ConnectionFactoryUtils获取connection

/**
 * Returns a resource holder carrying a usable channel for the given connection factory.
 * <p>
 * A holder already bound for the connection factory is returned as-is when the resource
 * factory can obtain a channel from it. Otherwise a connection and channel are obtained
 * (reusing a channel registered in {@link ConsumerChannelRegistry} when there is one),
 * added to the holder, and a newly created holder is bound to the transaction.
 *
 * @param connectionFactory the connection factory the resources belong to; must not be null
 * @param resourceFactory callback used to look up and create connections and channels; must not be null
 * @return the holder that contains the channel
 * @throws AmqpIOException wrapping any IOException raised while obtaining the resources
 */
private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionFactory connectionFactory,
        ResourceFactory resourceFactory) {

    Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
    Assert.notNull(resourceFactory, "ResourceFactory must not be null");

    final RabbitResourceHolder boundHolder = (RabbitResourceHolder) TransactionSynchronizationManager
            .getResource(connectionFactory);
    // Fast path: a holder is already bound and the factory can hand out a channel from it.
    if (boundHolder != null && resourceFactory.getChannel(boundHolder) != null) {
        return boundHolder;
    }

    final boolean isNewHolder = (boundHolder == null);
    final RabbitResourceHolder holder = isNewHolder ? new RabbitResourceHolder() : boundHolder;

    Connection conn = resourceFactory.getConnection(holder);
    Channel ch = null;
    try {
        /*
         * Inside a listener container a channel may already be registered for this
         * consumer with the same connection factory; prefer it over creating one.
         */
        ch = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);
        if (ch == null) {
            if (conn == null) {
                conn = resourceFactory.createConnection();
                holder.addConnection(conn);
            }
            ch = resourceFactory.createChannel(conn);
        }
        holder.addChannel(ch, conn);

        // Only a holder created here still needs to be bound to the transaction.
        if (isNewHolder) {
            bindResourceToTransaction(holder, connectionFactory,
                    resourceFactory.isSynchedLocalTransactionAllowed());
        }

        return holder;
    }
    catch (IOException ex) {
        RabbitUtils.closeChannel(ch);
        RabbitUtils.closeConnection(conn);
        throw new AmqpIOException(ex);
    }
}


 

5,调用类CachingConnectionFactory的createConnection

/**
 * Returns a connection according to the configured cache mode, all under the connection monitor.
 * <p>
 * In CHANNEL mode a single shared connection proxy is created on first use and then
 * returned every time. In CONNECTION mode an open connection is taken from the idle
 * connections when one is available (closed ones found on the way are discarded and
 * de-registered); otherwise a new connection proxy is created and registered.
 *
 * @return the connection, or {@code null} when the cache mode is neither CHANNEL nor CONNECTION
 * @throws AmqpException declared by the signature
 */
@Override
public final Connection createConnection() throws AmqpException {
    synchronized (this.connectionMonitor) {
        if (this.cacheMode == CacheMode.CHANNEL) {
            if (this.connection == null) {
                this.connection = new ChannelCachingConnectionProxy(super.createBareConnection());
                // the listener must see the connection only after the field has been assigned
                getConnectionListener().onCreate(this.connection);
            }
            return this.connection;
        }

        if (this.cacheMode != CacheMode.CONNECTION) {
            return null;
        }

        // Look for an open idle connection, discarding closed ones as they are found.
        ChannelCachingConnectionProxy cached = null;
        while (cached == null && !this.idleConnections.isEmpty()) {
            ChannelCachingConnectionProxy polled = this.idleConnections.poll();
            if (polled == null || polled.isOpen()) {
                cached = polled;
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Removing closed connection '" + polled + "'");
                }
                polled.notifyCloseIfNecessary();
                this.openConnections.remove(polled);
                this.openConnectionNonTransactionalChannels.remove(polled);
                this.openConnectionTransactionalChannels.remove(polled);
            }
        }

        if (cached != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Obtained connection '" + cached + "' from cache");
            }
            return cached;
        }

        // Nothing reusable: create a connection and register it with empty channel lists.
        ChannelCachingConnectionProxy created = new ChannelCachingConnectionProxy(super.createBareConnection());
        getConnectionListener().onCreate(created);
        if (logger.isDebugEnabled()) {
            logger.debug("Adding new connection '" + created + "'");
        }
        this.openConnections.add(created);
        this.openConnectionNonTransactionalChannels.put(created, new LinkedList<ChannelProxy>());
        this.openConnectionTransactionalChannels.put(created, new LinkedList<ChannelProxy>());
        return created;
    }
}

6,调用类ConnectionFactory的newConnection方法

/**
 * Opens a connection to one of the given broker addresses.
 * <p>
 * With automatic recovery enabled, an {@code AutorecoveringConnection} is created over
 * all addresses and initialised. Otherwise the addresses are tried in array order and
 * the first connection that starts successfully is returned.
 *
 * @param executor passed to {@code params(executor)} to build the connection parameters
 * @param addrs candidate broker addresses, tried in order
 * @return the started connection
 * @throws IOException the failure from the last address tried, or a generic
 *         "failed to connect" error when no attempt produced an exception
 */
public Connection newConnection(ExecutorService executor, Address[] addrs)
        throws IOException
{
    final FrameHandlerFactory frameHandlers = createFrameHandlerFactory();
    final ConnectionParams connParams = params(executor);

    if (isAutomaticRecoveryEnabled()) {
        // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
        AutorecoveringConnection recovering = new AutorecoveringConnection(connParams, frameHandlers, addrs);
        recovering.init();
        return recovering;
    }

    // Remember only the most recent failure; earlier ones are overwritten.
    IOException failure = null;
    for (int i = 0; i < addrs.length; i++) {
        try {
            AMQConnection plain = new AMQConnection(connParams, frameHandlers.create(addrs[i]));
            plain.start();
            return plain;
        } catch (IOException e) {
            failure = e;
        }
    }

    if (failure == null) {
        throw new IOException("failed to connect");
    }
    throw failure;
}

7,调用AMQConnection的start方法

/**
 * Starts this connection: sends the protocol header, launches the main loop thread and
 * carries out the connection negotiation (connection.start, SASL challenge/response,
 * connection.tune / tune-ok, connection.open).
 * <p>
 * On every failure path visible here the frame handler is closed before the exception
 * propagates; failures during the tune/open step also shut down the heartbeat sender.
 *
 * @throws IOException if sending the header or a negotiation step fails; a
 *         ShutdownSignalException caught here is rethrown via {@code AMQChannel.wrap}
 */
public void start()
throws IOException
{
initializeConsumerWorkService();
initializeHeartbeatSender();
this._running = true;
// Make sure that the first thing we do is to send the header,
// which should cause any socket errors to show up for us, rather
// than risking them pop out in the MainLoop
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
new AMQChannel.SimpleBlockingRpcContinuation();
// We enqueue an RPC continuation here without sending an RPC
// request, since the protocol specifies that after sending
// the version negotiation header, the client (connection
// initiator) is to wait for a connection.start method to
// arrive.
_channel0.enqueueRpc(connStartBlocker);
try {
// The following two lines are akin to AMQChannel's
// transmit() method for this pseudo-RPC.
_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}

// start the main loop going
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
// after this point clear-up of MainLoop is triggered by closing the frameHandler.

AMQP.Connection.Start connStart = null;
AMQP.Connection.Tune connTune = null;
try {
// Blocks until the reply for the continuation enqueued above is available.
connStart =
(AMQP.Connection.Start) connStartBlocker.getReply().getMethod();

_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

Version serverVersion =
new Version(connStart.getVersionMajor(),
connStart.getVersionMinor());

if (!Version.checkVersion(clientVersion, serverVersion)) {
throw new ProtocolVersionMismatchException(clientVersion,
serverVersion);
}

// The server's mechanism list is split on single spaces.
String[] mechanisms = connStart.getMechanisms().toString().split(" ");
SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
if (sm == null) {
throw new IOException("No compatible authentication mechanism found - " +
"server offered [" + connStart.getMechanisms() + "]");
}

LongString challenge = null;
LongString response = sm.handleChallenge(null, this.username, this.password);

// First pass sends StartOk (challenge is still null); each later pass sends SecureOk
// answering the server's latest challenge. The loop ends once the server replies with Tune.
do {
Method method = (challenge == null)
? new AMQP.Connection.StartOk.Builder()
.clientProperties(_clientProperties)
.mechanism(sm.getName())
.response(response)
.build()
: new AMQP.Connection.SecureOk.Builder().response(response).build();

try {
Method serverResponse = _channel0.rpc(method).getMethod();
if (serverResponse instanceof AMQP.Connection.Tune) {
connTune = (AMQP.Connection.Tune) serverResponse;
} else {
// Any reply other than Tune is cast to Secure and treated as a new challenge.
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
response = sm.handleChallenge(challenge, this.username, this.password);
}
} catch (ShutdownSignalException e) {
// A Close with reply code ACCESS_REFUSED is reported as a definite authentication failure;
// any other shutdown during this exchange is reported as a possible one.
Method shutdownMethod = e.getReason();
if (shutdownMethod instanceof AMQP.Connection.Close) {
AMQP.Connection.Close shutdownClose =  (AMQP.Connection.Close) shutdownMethod;
if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
throw new AuthenticationFailureException(shutdownClose.getReplyText());
}
}
throw new PossibleAuthenticationFailureException(e);
}
} while (connTune == null);
} catch (ShutdownSignalException sse) {
_frameHandler.close();
throw AMQChannel.wrap(sse);
} catch(IOException ioe) {
_frameHandler.close();
throw ioe;
}

// Tune phase: settle channel-max, frame-max and heartbeat from the requested values and the
// server's Tune values, send TuneOk, then open the virtual host.
try {
int channelMax =
negotiateChannelMax(this.requestedChannelMax,
connTune.getChannelMax());
_channelManager = instantiateChannelManager(channelMax, threadFactory);

int frameMax =
negotiatedMaxValue(this.requestedFrameMax,
connTune.getFrameMax());
this._frameMax = frameMax;

int heartbeat =
negotiatedMaxValue(this.requestedHeartbeat,
connTune.getHeartbeat());

setHeartbeat(heartbeat);

_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
.channelMax(channelMax)
.frameMax(frameMax)
.heartbeat(heartbeat)
.build());
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
.virtualHost(_virtualHost)
.build());
} catch (IOException ioe) {
// Unlike the earlier handlers, the heartbeat sender is shut down here as well.
_heartbeatSender.shutdown();
_frameHandler.close();
throw ioe;
} catch (ShutdownSignalException sse) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw AMQChannel.wrap(sse);
}

// We can now respond to errors having finished tailoring the connection
this._inConnectionNegotiation = false;

return;
}


8,默认queue以及Channel的值

Queue [name=app_queue, durable=true, autoDelete=false, exclusive=false, arguments=null]
Cached Rabbit Channel: AMQChannel(amqp://ad@192.168.120.12:5672/,1)


9,调用RabbitAdmin完成相关交换器(Exchange)、队列(Queue)和绑定(Binding)等的声明

10,ConnectionFactoryUtils.getTransactionalResourceHolder返回后,BlockingQueueConsumer的start方法继续创建消费者,代码如下

// Excerpt from BlockingQueueConsumer.start(): create the internal consumer on the channel,
// clear the tracked delivery tags, and add this consumer to the active-object counter.
this.consumer = new InternalConsumer(channel);
this.deliveryTags.clear();
this.activeObjectCounter.add(this);


11,SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法中调用

// Receive loop: keep polling while the consumer is active or the previous pass
// reported that more work may be available.
while (isActive(this.consumer) || continuable) {
    try {
        // receiveAndExecute comes back false once the queue is drained
        continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();

        // The idle/active counters only matter when a maximum consumer count is configured.
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers == null) {
            continue;
        }

        if (!continuable) {
            // Idle pass: reset the active streak and extend the idle streak.
            consecutiveMessages = 0;
            if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
                considerStoppingAConsumer(this.consumer);
                consecutiveIdles = 0;
            }
        }
        else {
            // Active pass: reset the idle streak and extend the active streak.
            consecutiveIdles = 0;
            if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                considerAddingAConsumer();
                consecutiveMessages = 0;
            }
        }
    }
    catch (ListenerExecutionFailedException ignored) {
        // Listener failure: carry on with the next pass instead of re-throwing.
    }
    catch (AmqpRejectAndDontRequeueException ignored) {
        /*
         * Usually arrives wrapped in a ListenerExecutionFailedException when thrown by
         * the listener, but is honoured here as well when thrown by an error handler.
         */
    }
}


 

12,后续调用业务监听器的onMessage方法,调用过程见下图



 



 

注意:消费者默认属性

 

prefetchCount=1

acknowledgeMode=AcknowledgeMode.AUTO(由容器在监听器正常返回后自动确认;注意它不同于RabbitMQ原生的autoAck,后者对应AcknowledgeMode.NONE)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: