您的位置:首页 > 其它

启动一个异步线程去执行一个任务

2018-01-19 21:54 453 查看
1.业务场景: 浦发银行充值接口,他行卡充值不是实时将充值结果返回的,需要我们自己去手动去查。银行充值接口实现业务都是预计2小时到账,但如果需求修改发送充值接口后,立马去调用银行提供的状态查询接口,将状态查询回来时,需要启动一个异步线程将结果查询回来.

代码编辑如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
           http://www.springframework.org/schema/aop   
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd  
           http://www.springframework.org/schema/tx  
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd  
           http://www.springframework.org/schema/context  
           http://www.springframework.org/schema/context/spring-context-3.2.xsd"     default-autowire="byName" default-lazy-init="false">

    <!-- 使用spring提供的线程池 -->
    <bean id="executePool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 线程池维护线程的最小数量 -->
        <property name="corePoolSize" value="1"></property>
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name="maxPoolSize" value="5"></property>
        <!-- 线程池所使用的最大数量 -->
        <property name="keepAliveSeconds" value="30000" />
        <!-- 线程池所使用的缓冲队列 -->
        <property name="queueCapacity" value="1000" />
    </bean>

    <bean id="asyExecuteService" class="com.adai.asy.service.imp.AsyExecuteServiceImp" destroy-method="destory">
        <property name="executePool"  ref="executePool" />
    </bean>
    
    <context:annotation-config/>
    <context:component-scan base-package="com.adai"/>
</beans>
package com.adai.asy.service;

import java.util.Map;
/**
* 异步服务接口
* @author v-chenk25
* @since 2018-01-19 21:35
*
*/
public interface AsyExecuteService {
/**
* 异步服务
* @param taskId  服务id
* @param sendMap 请求参数
*/
public void execute(String taskId , Map<String,Object> sendMap);
}
package com.adai.asy.service;

import java.util.Map;

/**
* 异步接口
* @author v-chenk25
* @since 2018-01-19 21:35
*
*/
public interface AsyService {
/**
* 异步交易接口
* @param sendMap 请求参数
*/
public void execute(Map<String,Object> sendMap) ;
}
package com.adai.asy.service.imp;

import java.util.Date;
import java.util.Map;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.adai.asy.service.AsyExecuteService;
import com.adai.asy.service.AsyService;
import com.adai.constant.Dict;
import com.adai.util.Utils;
/**
* 异步服务实现类:这里需要注入线程池启动一个线程去执行任务
* @author v-chenk25
* @since 2018-01-19 21:35
*/
public  class AsyExecuteServiceImp implements AsyExecuteService , ApplicationContextAware{
private ApplicationContext context ;
private ThreadPoolTaskExecutor executePool ;
public void execute(String taskId, final Map<String, Object> sendMap) {
sendMap.put(Dict.TASKID, taskId) ;
final AsyService asyService = (AsyService) this.context.getBean(taskId) ;
this.executePool.execute(new Runnable() {
public void run() {
asyService.execute(sendMap);
}
});

}

public Map<String,Object> prepareReq(Map<String,Object> sendMap){
sendMap.put(Dict.ORDERID, Utils.getCurrentFormat(new Date(), Dict.YNRH)) ;
return sendMap;
}

public void destory() {
if(this.executePool != null) {
this.executePool.shutdown();
}
}

public void setApplicationContext(ApplicationContext context) throws BeansException {
this.context = context ;
}
public void setExecutePool(ThreadPoolTaskExecutor executePool) {
this.executePool = executePool;
}

}
package com.adai.asy.service.imp;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.adai.asy.service.AsyService;
import com.adai.constant.Dict;
/**
* 异步服务接口实现类
* 这里主要时为了打印每一个任务需要执行的时间,正真执行业务逻辑的方法是executeInnert
* @author v-chenk25
*
*/
public abstract class AsyServiceImp implements AsyService{
private Log log = LogFactory.getLog(AsyServiceImp.class) ;
public void execute(Map<String,Object> sendMap) {
long start = System.currentTimeMillis();
log.info("异步线程开始执行:"+sendMap.get(Dict.TASKID)+"任务");
executeInnert(sendMap) ;
long end = System.currentTimeMillis();
log.info("异步线程执行结束,任务执行时间为:"+(end-start));
}

/** 真正执行业务逻辑 **/
public abstract void executeInnert(Map<String,Object> sendMap);
}
package com.adai.asy.service.imp;

import java.util.Map;

import org.springframework.stereotype.Service;

import com.adai.asy.service.AsyService;
/**
* 需要异步执行任务的具体实现类
* @author v-chenk25
*
*/
@Service("myTaskAsyService1")
public class MyTaskAsyServiceImp extends AsyServiceImp implements AsyService  {

@Override
public void executeInnert(Map<String, Object> sendMap) {
//做自己的业务逻辑处理 , 比如发银行的接口,调用dubbo服务
System.out.println(this.getClass().getName());
System.out.println(sendMap);
}

}
package com.adai.constant;
/**
* 常量字典
* @author v-chenk25
*
*/
public class Dict {
/**交易流水号**/
public static final String ORDERID = "OrderId" ;
/** 日期格式 **/
public static final String YND = "yyyyMMdd" ;
/**时间戳格式**/
public static final String YNRH = "yyyyMMdd HH:mm:ss sss" ;
/**时间格式**/
public static final String HOUR = "HH:mm:ss" ;
/**交易id**/
public static final String TASKID = "taskId" ;

}
package com.adai.util;

import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 工具类
* @author v-chenk25
*
*/
public class Utils {

/**当前日期指定格式字符串**/
public static String getCurrentFormat(Date date ,String format) {
return new SimpleDateFormat(format).format(date);
}
}
package com.adai.asy.service.imp;

import java.util.Map;

import org.springframework.stereotype.Service;

import com.adai.asy.service.AsyService;
/**
* 需要异步执行任务的具体实现类
* @author v-chenk25
*
*/
@Service("myTaskAsyService2")
public class MyTaskAsyServiceImp2 extends AsyServiceImp imp
afc8
lements AsyService  {

@Override
public void executeInnert(Map<String, Object> sendMap) {
//做自己的业务逻辑处理
System.out.println(this.getClass().getName());
System.out.println(sendMap);
}

}
package com.adai;

import java.util.HashMap;
import java.util.Map;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.adai.asy.service.AsyExecuteService;

public class TestAsy {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"classpath:spring-activemq.xml"}) ;
context.start();
AsyExecuteService asyService = (AsyExecuteService) context.getBean("asyExecuteService") ;
for(int i=1 ; i<10 ;i++) {
Map<String,Object> sendMap = new HashMap<String,Object>() ;
sendMap.put(i+"", i) ;
asyService.execute("myTaskAsyService"+1, sendMap);
}
}
}
总结:设计思路为:使用一个线程池开启一个线程去执行一个任务,任务id就是spring bean的名字,通ApplicationContextAware 来获取spring容器的bean,然后调用execute方法执行具体的业务逻辑。为了扩展,所以要求需要异步执行的类需要现实统一的接口,也就是AsyService接口。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐