Sending Email Through a Message Queue with ActiveMQ
2016-08-17 14:05
Create a new project named yncp.mq.mailservice and write the following classes and configuration files.
Mail class
package com.yncp.mq.entity;

/**
 * Mail entity.
 *
 * @author Administrator
 */
public class Mail {

    /** Sender */
    private String from;
    /** Recipient */
    private String to;
    /** Subject */
    private String subject;
    /** Mail body */
    private String content;

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
Consumer session-aware listener class
package com.yncp.mq.listener;

import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.yncp.mq.entity.Mail;
import com.yncp.mq.mail.MailTest;

@Component
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<Message> {

    private static final Log log = LogFactory.getLog(ConsumerSessionAwareMessageListener.class);

    @Autowired
    private MailTest mailTest;

    public synchronized void onMessage(Message message, Session session) {
        try {
            // Only text messages carry the JSON payload; ignore anything else.
            if (!(message instanceof TextMessage)) {
                log.warn("==>unexpected message type: " + message.getClass().getName());
                return;
            }
            final String ms = ((TextMessage) message).getText();
            log.info("==>receive message:" + ms);

            // Deserialize the JSON text back into a Mail object.
            Mail mail = JSONObject.parseObject(ms, Mail.class);
            if (mail == null) {
                return;
            }
            try {
                mailTest.sendMail(mail);
            } catch (Exception e) {
                log.error("==>MailException:", e);
            }
        } catch (Exception e) {
            log.error("==>", e);
        }
    }
}
Mail-sending template class
package com.yncp.mq.mail;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import com.yncp.mq.entity.Mail;

@Component("mailTest")
public class MailTest {

    @Autowired
    private JavaMailSender mailSender;
    @Autowired
    private SimpleMailMessage simpleMailMessage;
    @Autowired
    private ThreadPoolTaskExecutor threadPool;

    /**
     * Sends a mail asynchronously on the thread pool.
     *
     * @param mail the mail to send
     */
    public void sendMail(final Mail mail) {
        threadPool.execute(new Runnable() {
            public void run() {
                // Copy the shared template so that concurrent sends do not
                // overwrite each other's fields. The copy keeps the default
                // "from" address configured in spring-mail.xml.
                SimpleMailMessage message = new SimpleMailMessage(simpleMailMessage);
                message.setTo(mail.getTo());
                message.setSubject(mail.getSubject());
                message.setText(mail.getContent());
                mailSender.send(message);
            }
        });
    }
}
spring-context.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd"
       default-autowire="byName" default-lazy-init="false">

    <!-- Configure beans through annotations -->
    <context:annotation-config />

    <!-- Packages to scan -->
    <context:component-scan base-package="com.yncp.mq" />

    <!-- Load the property files -->
    <context:property-placeholder location="classpath:mq.properties,classpath:mail.properties" />

    <aop:aspectj-autoproxy proxy-target-class="true" />

    <import resource="spring-mq.xml" />
    <import resource="spring-mail.xml" />
</beans>
spring-mail.xml configuration
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/cache
           http://www.springframework.org/schema/cache/spring-cache-3.2.xsd">

    <!-- Spring's high-level abstraction for sending email -->
    <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
        <property name="host" value="${mail.host}" />
        <property name="username" value="${mail.username}" />
        <property name="password" value="${mail.password}" />
        <property name="port" value="${mail.port}" />
        <property name="javaMailProperties">
            <props>
                <prop key="mail.smtp.timeout">${mail.smtp.timeout}</prop>
                <prop key="mail.smtp.auth">${mail.smtp.auth}</prop>
                <prop key="mail.smtp.starttls.enable">${mail.smtp.starttls.enable}</prop>
                <prop key="mail.smtp.socketFactory.port">${mail.port}</prop>
                <prop key="mail.smtp.socketFactory.class">javax.net.ssl.SSLSocketFactory</prop>
                <prop key="mail.smtp.socketFactory.fallback">false</prop>
            </props>
        </property>
    </bean>

    <!-- Message template that carries the default sender address -->
    <bean id="simpleMailMessage" class="org.springframework.mail.SimpleMailMessage">
        <property name="from">
            <value>${mail.default.from}</value>
        </property>
    </bean>

    <!-- Thread pool -->
    <bean id="threadPool"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- Minimum number of threads kept in the pool -->
        <property name="corePoolSize" value="5" />
        <!-- Idle time allowed for pool threads, in seconds -->
        <property name="keepAliveSeconds" value="30000" />
        <!-- Maximum number of threads in the pool -->
        <property name="maxPoolSize" value="50" />
        <!-- Capacity of the buffer queue used by the pool -->
        <property name="queueCapacity" value="100" />
    </bean>
</beans>
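The interaction between corePoolSize, maxPoolSize and queueCapacity is easy to misread, so here is a minimal sketch using the JDK's ThreadPoolExecutor directly. It assumes ThreadPoolTaskExecutor backs the pool with a bounded LinkedBlockingQueue when queueCapacity is positive; the class name PoolSizingSketch and its helper method are made up for illustration. With these settings, threads beyond the core 5 are only created once all 100 queue slots are full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class PoolSizingSketch {

    /**
     * Submits the given number of blocking tasks to a pool sized like the
     * threadPool bean (core 5, max 50, queue 100) and returns
     * {current thread count, number of queued tasks}.
     */
    static int[] submitBlockingTasks(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 50, 30000, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();     // queued tasks still run, then threads exit
        }
    }

    public static void main(String[] args) {
        int[] a = submitBlockingTasks(105);
        int[] b = submitBlockingTasks(106);
        // prints "105 tasks -> threads=5, queued=100"
        System.out.println("105 tasks -> threads=" + a[0] + ", queued=" + a[1]);
        // prints "106 tasks -> threads=6, queued=100"
        System.out.println("106 tasks -> threads=" + b[0] + ", queued=" + b[1]);
    }
}
```

In other words, a burst of up to 105 slow sends is handled by only 5 threads; the pool grows toward 50 only after the queue fills up.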
spring-mq.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd"
       default-autowire="byName" default-lazy-init="false">

    <!-- The ConnectionFactory that actually creates connections, supplied by the JMS vendor -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ broker address -->
        <property name="brokerURL" value="${mq.brokerURL}" />
        <property name="userName" value="${mq.userName}"></property>
        <property name="password" value="${mq.password}"></property>
    </bean>

    <!-- ActiveMQ provides a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory
         into it pools Connections, Sessions and MessageProducers, which greatly reduces
         resource consumption. Requires the activemq-pool dependency. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="${mq.pool.maxConnections}" />
    </bean>

    <!-- Spring's ConnectionFactory that manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target ConnectionFactory that actually creates JMS connections -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>

    <!-- Spring's JMS helper class for sending and receiving messages -->
    <!-- Queue template -->
    <bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="${queueName}"></property>
    </bean>

    <!-- The sessionAwareQueue destination -->
    <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>${queueName}</value>
        </constructor-arg>
    </bean>

    <!-- A MessageListener that has access to the session -->
    <bean id="consumerSessionAwareMessageListener"
          class="com.yncp.mq.listener.ConsumerSessionAwareMessageListener"></bean>

    <bean id="sessionAwareListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="sessionAwareQueue" />
        <property name="messageListener" ref="consumerSessionAwareMessageListener" />
    </bean>
</beans>
mail.properties
mail.host=smtp.qq.com
mail.port=465
# A QQ mailbox with the SMTP service enabled
mail.username=
# The authorization code
mail.password=
mail.smtp.auth=true
mail.smtp.timeout=30000
mail.smtp.starttls.enable=true
mail.default.from=1000@qq.com
mail.replyTo=1000@qq.com
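One detail worth checking when filling in this file: a properties file has no trailing "//" comments. The sketch below uses java.util.Properties directly, on the assumption that Spring's property placeholder follows the same parsing rules; the class name PropertiesCommentSketch and the sample values are invented for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class, not part of the original project.
class PropertiesCommentSketch {

    /** Parses text written in the java.util.Properties file format. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Text after "//" is NOT a comment: it becomes part of the value.
        Properties inline = parse("mail.password=//auth code\n");
        System.out.println("[" + inline.getProperty("mail.password") + "]"); // prints "[//auth code]"

        // A line starting with "#" is a real comment, so the value stays empty.
        Properties separate = parse("# auth code\nmail.password=\n");
        System.out.println("[" + separate.getProperty("mail.password") + "]"); // prints "[]"
    }
}
```

So any explanatory note belongs on its own "#" line, otherwise it would be sent to the SMTP server as part of the user name or password.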
mq.properties
mq.brokerURL=tcp\://127.0.0.1\:61616
mq.userName=admin
mq.password=admin
mq.pool.maxConnections=10
#queueName
queueName=mq.maintest
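The backslashes in mq.brokerURL can look odd at first. As a small sketch, again using java.util.Properties on the assumption that Spring parses the file the same way, the escaped colons load as plain colons, and the unescaped form yields the same value because only the first separator ends the key. The class name BrokerUrlSketch is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

// Hypothetical demo class, not part of the original project.
class BrokerUrlSketch {

    /** Loads properties text and returns the value of mq.brokerURL. */
    static String loadBrokerUrl(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("mq.brokerURL");
    }

    public static void main(String[] args) {
        // "\\:" in Java source is the two characters "\:" in the file.
        String escaped = loadBrokerUrl("mq.brokerURL=tcp\\://127.0.0.1\\:61616\n");
        String plain = loadBrokerUrl("mq.brokerURL=tcp://127.0.0.1:61616\n");
        System.out.println(escaped);               // prints "tcp://127.0.0.1:61616"
        System.out.println(escaped.equals(plain)); // prints "true"

        // The loaded value is an ordinary URI with a host and a port.
        URI uri = URI.create(escaped);
        System.out.println(uri.getHost() + " " + uri.getPort()); // prints "127.0.0.1 61616"
    }
}
```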
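Test class (starts the mail service):
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MQMailTest {

    public static void main(String[] args) {
        try {
            // Loading the context starts the listener container,
            // which then waits for messages on the queue.
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:spring-context.xml");
            context.start();
        } catch (Exception e) {
            // Report the startup failure instead of exiting silently.
            e.printStackTrace();
            System.exit(1);
        }
    }
}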
Create a new project named yncp.mq.mailclient, which submits the mail requests.
MQProducer class
package com.yncp.mq.client;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject;
import com.yncp.mq.entity.Mail;

@Service("mqProducer")
public class MQProducer {

    @Autowired
    private JmsTemplate activeMqJmsTemplate;

    /**
     * Sends a message.
     *
     * @param mail the mail to serialize as JSON and put on the queue
     */
    public void sendMessage(final Mail mail) {
        activeMqJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(JSONObject.toJSONString(mail));
            }
        });
    }
}
MQClientTest
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.yncp.mq.client.MQProducer;
import com.yncp.mq.entity.Mail;

public class MQClientTest {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = null;
        int status = 0;
        try {
            context = new ClassPathXmlApplicationContext("classpath:spring-context.xml");
            context.start();
            MQProducer mqProducer = (MQProducer) context.getBean("mqProducer");

            // Send a mail
            Mail mail = new Mail();
            mail.setTo("xxx@qq.com");                      // recipient
            mail.setSubject("hello");                      // subject
            mail.setContent("hello,this is first email");  // body
            mqProducer.sendMessage(mail);
        } catch (Exception e) {
            // Report the failure instead of exiting silently.
            e.printStackTrace();
            status = 1;
        } finally {
            if (context != null) {
                context.close();
            }
        }
        System.exit(status);
    }
}
spring-context.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd"
       default-autowire="byName" default-lazy-init="false">

    <!-- Configure beans through annotations -->
    <context:annotation-config />

    <!-- Packages to scan -->
    <context:component-scan base-package="com.yncp.mq" />

    <!-- Load the property file -->
    <context:property-placeholder location="classpath:mq.properties" />

    <!-- proxy-target-class defaults to "false"; setting it to "true" uses CGLib dynamic proxies -->
    <aop:aspectj-autoproxy proxy-target-class="true" />

    <import resource="spring-mq.xml" />
</beans>
spring-mq.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd"
       default-autowire="byName" default-lazy-init="false">

    <!-- The ConnectionFactory that actually creates connections, supplied by the JMS vendor -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ broker address -->
        <property name="brokerURL" value="${mq.brokerURL}" />
        <property name="userName" value="${mq.userName}"></property>
        <property name="password" value="${mq.password}"></property>
    </bean>

    <!-- ActiveMQ provides a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory
         into it pools Connections, Sessions and MessageProducers, which greatly reduces
         resource consumption. Requires the activemq-pool dependency. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="${mq.pool.maxConnections}" />
    </bean>

    <!-- Spring's ConnectionFactory that manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target ConnectionFactory that actually creates JMS connections -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>

    <!-- Spring's JMS helper class for sending and receiving messages -->
    <!-- Queue template -->
    <bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="${queueName}"></property>
    </bean>
</beans>
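Run MQClientTest to send the email.
If the server side (the mail service) is not running when MQClientTest is run, ActiveMQ shows one pending message on the queue.
Once the server is started, the broker delivers that message to it, and the server sends the email after receiving the message.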
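The pending-message behaviour described above can be pictured with a small in-memory stand-in. This is only a sketch of the store-and-forward idea under the assumption that a plain BlockingQueue plays the role of the broker queue; it does not use ActiveMQ, and the class name PendingMessageSketch and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: an in-memory stand-in for the broker queue.
class PendingMessageSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** What the client does: put a JSON text message on the queue. */
    void send(String json) {
        queue.add(json);
    }

    /** Number of messages waiting because no consumer has taken them yet. */
    int pendingCount() {
        return queue.size();
    }

    /** Simulates the mail service coming online and receiving everything pending. */
    List<String> startConsumer() {
        List<String> received = new ArrayList<>();
        queue.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        PendingMessageSketch broker = new PendingMessageSketch();

        // The client runs while the mail service is down.
        broker.send("{\"subject\":\"hello\",\"to\":\"xxx@qq.com\"}");
        System.out.println("pending=" + broker.pendingCount()); // prints "pending=1"

        // The mail service starts later and receives the stored message.
        List<String> received = broker.startConsumer();
        System.out.println("received=" + received.size());      // prints "received=1"
        System.out.println("pending=" + broker.pendingCount()); // prints "pending=0"
    }
}
```

The real broker adds what this sketch leaves out, such as network transport, acknowledgements and persistence, but the order of events is the same: the producer finishes first, and the consumer picks the message up whenever it connects.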