您的位置:首页 > 其它

activeMQ性能优化--对象池管理connection

2016-12-18 13:13 232 查看
activeMQ的某个应用场景,消费者和服务器只需建立一个长连接,而生产者的情况集中在服务器,需要对服务器端的生产者连接进行优化。
首先maven引入jar包依赖

[java]
view plain
copy





<dependency>  
       <groupId>org.activemq</groupId>  
       <artifactId>activemq-all</artifactId>  
       <version>5.9.0</version>  
   </dependency>  
   <dependency>  
       <groupId>org.apache.activemq</groupId>  
       <artifactId>activemq-pool</artifactId>  
       <version>5.9.0</version>  
       <exclusions>  
           <exclusion>  
               <groupId>org.apache.geronimo.specs</groupId>  
               <artifactId>geronimo-jms_1.1_spec</artifactId>  
           </exclusion>  
       </exclusions>  
   </dependency>  

下面是实现代码

[java]
view plain
copy





import org.apache.activemq.ActiveMQConnectionFactory;  
import org.apache.activemq.pool.PooledConnection;  
import org.apache.activemq.pool.PooledConnectionFactory;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
  
import javax.jms.*;  
  
public class MQProductHelper {  
  public static final Logger LOG = LoggerFactory.getLogger(MQProductHelper.class);  
  private static PooledConnectionFactory poolFactory;  
  
  /** 
   * 获取单例的PooledConnectionFactory 
   *  @return 
   */  
  private static synchronized PooledConnectionFactory getPooledConnectionFactory() {  
    LOG.info("getPooledConnectionFactory");  
    if (poolFactory != null) return poolFactory;  
    LOG.info("getPooledConnectionFactory create new");  
    IConfigService configService = ServiceManager.getService(IConfigService.class);  
    String userName = configService.getConfig("MQ_USER_NAME", ShopConstant.BC_SHOP_ID);  
    String password = configService.getConfig("MQ_USER_PASS", ShopConstant.BC_SHOP_ID);  
    String url = configService.getConfig("MQ_BROKER_URL", ShopConstant.BC_SHOP_ID);  
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);  
    poolFactory = new PooledConnectionFactory(factory);        
    // 池中借出的对象的最大数目  
    poolFactory.setMaxConnections(100);  
    poolFactory.setMaximumActiveSessionPerConnection(50);        
    //后台对象清理时,休眠时间超过了3000毫秒的对象为过期  
    poolFactory.setTimeBetweenExpirationCheckMillis(3000);  
    LOG.info("getPooledConnectionFactory create success");  
    return poolFactory;  
  }  
  
  /** 
   * 1.对象池管理connection和session,包括创建和关闭等 
   * 2.PooledConnectionFactory缺省设置MaxIdle为1, 
   *  官方解释Set max idle (not max active) since our connections always idle in the pool.   * 
   *  @return   * @throws JMSException 
   */  
  public static Session createSession() throws JMSException {  
    PooledConnectionFactory poolFactory = getPooledConnectionFactory();  
    PooledConnection pooledConnection = (PooledConnection) poolFactory.createConnection();  
    //false 参数表示 为非事务型消息,后面的参数表示消息的确认类型(见4.消息发出去后的确认模式)  
    return pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  }  
  
  public static void produce(String subject, String msg) {  
    LOG.info("producer send msg: {} ", msg);  
    if (StringUtil.isEmpty(msg)) {  
      LOG.warn("发送消息不能为空。");  
      return;  
    }  
    try {  
      Session session = createSession();  
      LOG.info("create session");  
      TextMessage textMessage = session.createTextMessage(msg);  
      Destination destination = session.createQueue(subject);  
      MessageProducer producer = session.createProducer(destination);  
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
      producer.send(textMessage);  
      LOG.info("create session success");  
    } catch (JMSException e) {  
      LOG.error(e.getMessage(), e);  
    }  
  }  
  
  public static void main(String[] args) {  
    MQProductHelper.produce("test.subject", "hello");  
  }  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq 线程池