您的位置:首页 > 编程语言 > Java开发

spring的RabbitTemplate 发送Message源码导读

2014-09-25 22:06 375 查看
1,首先业务方法调用RabbitTemplate的convertAndSend方法(RabbitTemplate继承RabbitAccessor,实现了RabbitOperations和MessageListener接口):

/**
 * Convenience overload that supplies no correlation data: it forwards its
 * arguments to the overload taking a {@code CorrelationData}, passing
 * {@code null} for that argument.
 */
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
    // A typed null selects the CorrelationData overload, exactly as a cast would.
    final CorrelationData noCorrelationData = null;
    convertAndSend(exchange, routingKey, object, noCorrelationData);
}


 

 

 

2,convertAndSend调用自己的重载方法:

/**
 * Passes {@code message} through {@code convertMessageIfNecessary}, applies the
 * given post processor to the result, and hands the post-processed message to
 * {@code send} together with the exchange, routing key and correlation data.
 */
public void convertAndSend(String exchange, String routingKey, final Object message,
        final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
    final Message converted = convertMessageIfNecessary(message);
    final Message postProcessed = messagePostProcessor.postProcessMessage(converted);
    send(exchange, routingKey, postProcessed, correlationData);
}


3,convertAndSend调用send方法:

/**
 * Wraps a {@code doSend} call for the given arguments in a
 * {@code ChannelCallback} and runs that callback through {@code execute}.
 */
public void send(final String exchange, final String routingKey,
        final Message message, final CorrelationData correlationData)
        throws AmqpException {
    final ChannelCallback<Object> publishAction = new ChannelCallback<Object>() {

        @Override
        public Object doInRabbit(Channel channel) throws Exception {
            doSend(channel, exchange, routingKey, message, correlationData);
            // The callback has no result to hand back.
            return null;
        }
    };
    execute(publishAction);
}


4,send调用execute方法进行消息发送:

/**
 * Runs the callback through {@code doExecute}. When a retry template is set,
 * the call is made inside that template; otherwise it is made directly.
 * Runtime exceptions coming out of the retry template are rethrown unchanged,
 * and any other exception is converted by {@code RabbitExceptionTranslator}.
 */
@Override
public <T> T execute(final ChannelCallback<T> action) {
    if (this.retryTemplate == null) {
        // No retry template: invoke the callback directly.
        return this.doExecute(action);
    }

    final RetryCallback<T, Exception> retryableAction = new RetryCallback<T, Exception>() {

        @Override
        public T doWithRetry(RetryContext context) throws Exception {
            return RabbitTemplate.this.doExecute(action);
        }

    };

    try {
        return this.retryTemplate.execute(retryableAction);
    }
    catch (RuntimeException e) {
        // Already unchecked: propagate as-is.
        throw e;
    }
    catch (Exception e) {
        throw RabbitExceptionTranslator.convertRabbitAccessException(e);
    }
}


5,execute方法调用doExecute

/**
 * Obtains a channel from the transactional resource holder, registers this
 * template as a listener on it when a confirm or return callback is set, and
 * then invokes the callback with that channel.
 *
 * <p>If the callback throws, the holder is rolled back when the channel is
 * locally transacted and the exception is rethrown after conversion. The
 * holder's resources are released in every case.
 */
private <T> T doExecute(ChannelCallback<T> action) {
    Assert.notNull(action, "Callback object must not be null");
    final RabbitResourceHolder holder = getTransactionalResourceHolder();
    final Channel rabbitChannel = holder.getChannel();

    final boolean callbacksConfigured = this.confirmCallback != null || this.returnCallback != null;
    if (callbacksConfigured) {
        addListener(rabbitChannel);
    }

    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Executing callback on RabbitMQ Channel: " + rabbitChannel);
        }
        return action.doInRabbit(rabbitChannel);
    }
    catch (Exception failure) {
        if (isChannelLocallyTransacted(rabbitChannel)) {
            holder.rollbackAll();
        }
        throw convertRabbitAccessException(failure);
    }
    finally {
        ConnectionFactoryUtils.releaseResources(holder);
    }
}


6,doExecute回调send方法中创建的匿名ChannelCallback的doInRabbit方法(即下面send方法里的回调)

/**
 * Wraps a {@code doSend} call for the given arguments in a
 * {@code ChannelCallback} and runs that callback through {@code execute}.
 */
public void send(final String exchange, final String routingKey,
        final Message message, final CorrelationData correlationData)
        throws AmqpException {
    final ChannelCallback<Object> publishAction = new ChannelCallback<Object>() {

        @Override
        public Object doInRabbit(Channel channel) throws Exception {
            doSend(channel, exchange, routingKey, message, correlationData);
            // The callback has no result to hand back.
            return null;
        }
    };
    execute(publishAction);
}


7,回调的doInRabbit方法调用doSend方法

/**
 * Publishes {@code message} on {@code channel}.
 *
 * <p>A {@code null} exchange or routing key is replaced by the value
 * configured on this template. The debug log is written after that
 * substitution so it reports the values actually passed to
 * {@code basicPublish} rather than {@code null}.
 *
 * <p>When a confirm callback is set and the channel is a
 * {@code PublisherCallbackChannel}, a {@code PendingConfirm} is registered
 * under the channel's next publish sequence number before publishing. The
 * message is published as mandatory only when a return callback is set and
 * the template's mandatory flag is on; in that case this template's uuid is
 * stored in the message headers under
 * {@code PublisherCallbackChannel.RETURN_CORRELATION}.
 *
 * @param channel the channel to publish on
 * @param exchange the target exchange, or {@code null} to use the configured one
 * @param routingKey the routing key, or {@code null} to use the configured one
 * @param message the message whose properties and body are published
 * @param correlationData data attached to the pending confirm; passed through as given
 * @throws Exception if any of the channel operations fails
 */
protected void doSend(Channel channel, String exchange, String routingKey, Message message,
        CorrelationData correlationData) throws Exception {
    if (exchange == null) {
        // try to send to configured exchange
        exchange = this.exchange;
    }

    if (routingKey == null) {
        // try to send to configured routing key
        routingKey = this.routingKey;
    }

    // Log only once the effective exchange and routing key are known.
    if (logger.isDebugEnabled()) {
        logger.debug("Publishing message on exchange [" + exchange + "], routingKey = [" + routingKey + "]");
    }

    if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {
        PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
        // Register the pending confirm before publishing, keyed by the sequence number
        // the channel reports as next.
        publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),
                new PendingConfirm(correlationData, System.currentTimeMillis()));
    }

    boolean mandatory = this.returnCallback != null && this.mandatory;
    MessageProperties messageProperties = message.getMessageProperties();
    if (mandatory) {
        messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION, this.uuid);
    }

    BasicProperties convertedMessageProperties = this.messagePropertiesConverter
            .fromMessageProperties(messageProperties, encoding);
    channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());

    // Check if commit needed
    if (isChannelLocallyTransacted(channel)) {
        // Transacted channel created by this template -> commit.
        RabbitUtils.commitIfNecessary(channel);
    }
}


8,doSend中的channel是一个代理对象,对它的basicPublish调用会先进入CachingConnectionFactory内部的CachedChannelInvocationHandler的invoke方法

/**
 * Invocation handler entry point for the channel proxy.
 *
 * <p>A handful of methods are answered by the handler itself, selected by method name:
 * {@code equals}, {@code hashCode}, {@code toString}, {@code close},
 * {@code getTargetChannel} and {@code isOpen}. Every other method is invoked
 * reflectively on the target channel, which is (re)created through
 * {@code createBareChannel} when it is missing or no longer open.
 *
 * @param proxy the proxy instance the method was called on
 * @param method the method being invoked
 * @param args the call arguments
 * @return the handler's own answer for the intercepted methods, otherwise the
 * result of the target method
 * @throws UnsupportedOperationException if {@code txSelect} is called while this
 * handler's {@code transactional} flag is false
 * @throws Throwable the exception thrown by the target method, unwrapped from
 * the {@code InvocationTargetException}
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (methodName.equals("txSelect") && !this.transactional) {
throw new UnsupportedOperationException("Cannot start transaction on non-transactional channel");
}
if (methodName.equals("equals")) {
// Only consider equal when proxies are identical.
return (proxy == args[0]);
}
else if (methodName.equals("hashCode")) {
// Use hashCode of Channel proxy.
return System.identityHashCode(proxy);
}
else if (methodName.equals("toString")) {
return "Cached Rabbit Channel: " + this.target;
}
else if (methodName.equals("close")) {
// Handle close method: don't pass the call on.
if (active) {
synchronized (this.channelList) {
// Close logically only when no physical close is required and the channel list
// holds fewer entries than the configured cache size.
if (!RabbitUtils.isPhysicalCloseRequired() && this.channelList.size() < getChannelCacheSize()) {
logicalClose((ChannelProxy) proxy);
// Remain open in the channel list.
return null;
}
}
}

// If we get here, we're supposed to shut down.
physicalClose();
return null;
}
else if (methodName.equals("getTargetChannel")) {
// Handle getTargetChannel method: return underlying Channel.
return this.target;
}
else if (methodName.equals("isOpen")) {
// Handle isOpen method: we are closed if the target is closed
return this.target != null && this.target.isOpen();
}
try {
// Drop a target that is missing or no longer open so it is recreated below.
if (this.target == null || !this.target.isOpen()) {
this.target = null;
}
synchronized (targetMonitor) {
if (this.target == null) {
this.target = createBareChannel(theConnection, transactional);
}
// Delegate the call to the (possibly just recreated) target channel.
return method.invoke(this.target, args);
}
}
catch (InvocationTargetException ex) {
if (this.target == null || !this.target.isOpen()) {
// Basic re-connection logic...
this.target = null;
// NOTE(review): target was set to null just above, so this message appears to
// always print "null" for the channel - confirm whether that is intended.
if (logger.isDebugEnabled()) {
logger.debug("Detected closed channel on exception.  Re-initializing: " + target);
}
synchronized (targetMonitor) {
if (this.target == null) {
this.target = createBareChannel(theConnection, transactional);
}
}
}
// Surface the target method's own exception rather than the reflection wrapper.
throw ex.getTargetException();
}
}


9,调用rabbitmq的ChannelN类的basicPublish方法(继承AMQChannel,实现接口com.rabbitmq.client.Channel)

/**
 * Builds a {@code Basic.Publish} command from the arguments and transmits it.
 *
 * <p>While {@code nextPublishSeqNo} is positive, the current sequence number is
 * added to {@code unconfirmedSet} and the counter is incremented before the
 * command is sent. A {@code null} {@code props} is replaced by
 * {@code MessageProperties.MINIMAL_BASIC}.
 */
public void basicPublish(String exchange, String routingKey,
        boolean mandatory, boolean immediate,
        BasicProperties props, byte[] body)
        throws IOException
{
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }

    final BasicProperties effectiveProps = (props == null) ? MessageProperties.MINIMAL_BASIC : props;

    final AMQCommand publishCommand = new AMQCommand(
            new Basic.Publish.Builder()
                    .exchange(exchange)
                    .routingKey(routingKey)
                    .mandatory(mandatory)
                    .immediate(immediate)
                    .build(),
            effectiveProps, body);
    transmit(publishCommand);
}


10,调用AMQChannel的transmit方法

/**
 * Calls {@code ensureIsOpen} and then {@code quiescingTransmit} for the given
 * command, both while holding {@code _channelMutex}.
 */
public void transmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        // The open check and the transmission happen under the same lock.
        ensureIsOpen();
        quiescingTransmit(c);
    }
}

11,调用quiescingTransmit方法

/**
 * Transmits the command while holding {@code _channelMutex}. For commands whose
 * method carries content, it first waits on the mutex for as long as
 * {@code _blockContent} is set.
 *
 * <p>An interrupt received while waiting does not abort the wait, but it is no
 * longer lost: it is recorded and the thread's interrupt status is restored
 * once the command has been handed off (or the method exits with an
 * exception), so callers can still observe it. Restoring it only at the end
 * keeps the wait loop and the transmission itself unaffected by the flag.
 *
 * @param c the command to transmit
 * @throws IOException if the transmission fails
 */
public void quiescingTransmit(AMQCommand c) throws IOException {
    boolean interrupted = false;
    try {
        synchronized (_channelMutex) {
            if (c.getMethod().hasContent()) {
                while (_blockContent) {
                    try {
                        _channelMutex.wait();
                    } catch (InterruptedException ignored) {
                        // Keep waiting, but remember the interrupt so it can be re-asserted.
                        interrupted = true;
                    }

                    // This is to catch a situation when the thread wakes up during
                    // shutdown. Currently, no command that has content is allowed
                    // to send anything in a closing state.
                    ensureIsOpen();
                }
            }
            c.transmit(this);
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}


12,调用AMQCommand的transmit方法

/**
 * Writes this command to the channel's connection as a sequence of frames and
 * then flushes the connection.
 *
 * <p>The method frame is written first. If the method has content, the content
 * header frame follows, and the body is then written in fragments of at most
 * {@code frameMax - EMPTY_FRAME_SIZE} bytes each; when the connection reports a
 * frame max of 0, the fragment size is the full body length. Frames are
 * written while holding the assembler's monitor.
 */
public void transmit(AMQChannel channel) throws IOException {
    final int channelNumber = channel.getChannelNumber();
    final AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
        final Method method = this.assembler.getMethod();
        connection.writeFrame(method.toFrame(channelNumber));

        if (method.hasContent()) {
            final byte[] body = this.assembler.getContentBody();

            connection.writeFrame(this.assembler.getContentHeader()
                    .toFrame(channelNumber, body.length));

            final int frameMax = connection.getFrameMax();
            final int bodyPayloadMax;
            if (frameMax == 0) {
                bodyPayloadMax = body.length;
            } else {
                bodyPayloadMax = frameMax - EMPTY_FRAME_SIZE;
            }

            int offset = 0;
            while (offset < body.length) {
                // The last fragment may be shorter than the per-frame payload limit.
                final int fragmentLength = Math.min(body.length - offset, bodyPayloadMax);
                connection.writeFrame(
                        Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength));
                offset += bodyPayloadMax;
            }
        }
    }

    connection.flush();
}


13,调用AMQConnection的writeFrame方法发送数据

/**
 * Hands the frame to the frame handler for writing and then signals activity
 * to the heartbeat sender.
 */
public void writeFrame(Frame f) throws IOException {
    _frameHandler.writeFrame(f);
    // Signalled only after the handler's write call has returned.
    _heartbeatSender.signalActivity();
}


14,调用SocketFrameHandler的writeFrame方法,最终通过下面的writeTo方法将帧写入DataOutputStream,完成发送

/**
 * Public API - writes this Frame to the given DataOutputStream.
 *
 * <p>The output is, in order: the type as one byte, the channel as a short,
 * the payload length as an int, the payload bytes, and finally
 * {@code AMQP.FRAME_END}. The payload comes from the accumulator when one is
 * present, otherwise from the payload array.
 */
public void writeTo(DataOutputStream os) throws IOException {
    os.writeByte(type);
    os.writeShort(channel);

    if (accumulator == null) {
        os.writeInt(payload.length);
        os.write(payload);
    } else {
        os.writeInt(accumulator.size());
        accumulator.writeTo(os);
    }

    os.write(AMQP.FRAME_END);
}

 

附图:



 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: