您的位置:首页 > 运维架构

UPOP数据统计系统中的批量导入数据功能

2012-07-05 16:43 501 查看
1.CommonUtils.java(工具类)

package com.unionpay.upa.importer;

import java.io.File;

import java.io.FileReader;

import java.io.IOException;

import java.io.LineNumberReader;

import java.io.PrintWriter;

import java.io.StringWriter;

import java.util.Locale;

import java.util.ResourceBundle;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

/**

* 工具类

* @author Jonathan

*

*/

public class CommonUtils {

/**

* 得到抛出异常的详细信息

* @param throwable抛出的异常

* @return

* @author huangdou

*/

public static String getTrace(Throwable throwable) {

StringWriter stringWriter = new StringWriter();

PrintWriter writer = new PrintWriter(stringWriter);

throwable.printStackTrace(writer);

StringBuffer buffer = stringWriter.getBuffer();

return buffer.toString();

}

/**

* 匹配一个字符串是否是数字类型

* @param str

* @return

*/

public static boolean isNumeric(String str) {

Pattern pattern = Pattern.compile("[0-9]*");

Matcher isNum = pattern.matcher(str);

if (!isNum.matches()) {

return false;

}

return true;

}

/**

* 得到文件总行数

* @param file

* @return

* @throws IOException

* @author huangdou

*/

public static int getTotalLines(File file) throws IOException {

FileReader in = new FileReader(file);

LineNumberReader reader = new LineNumberReader(in);

String line = reader.readLine();

int lines = 0;

while (line != null) {

lines++;

line = reader.readLine();

}

if (null != reader) {

reader.close();

}

if (null != in) {

in.close();

}

return lines;

}

/**

* 通过配置名字得到配置信息

* @param configNm

* @return

*/

public static String getSysConfigure(String configNm){

String strConfig="";

String BUNDLE_NAME = "sys_configure";

ResourceBundle RESOURCE_BUNDLE = ResourceBundle.getBundle(BUNDLE_NAME, Locale.getDefault());

strConfig = RESOURCE_BUNDLE.getString(configNm).trim();

return strConfig;

}

}

2.ConfigDataVO.java

package com.unionpay.upa.importer;

/**

* 配置文件信息VO

* @author jonathan

*

*/

public class ConfigDataVO {

//subFileNm 截取的文件名

private String subFileNm;

//subTblNm 截取的表名

private String subTblNm;

//tableNm 表名

private String tableNm;

//mappingNum 表字段的总数

private int mappingNum;

//tableFields 表字段的名称

private String[] tableFields;

//columnTypes 表字段的类型

private String[] columnTypes;

//columnLengths 表字段的长度

private int[] columnLengths;

//columnNullable 表字段是否为空

private String[] columnNullable;

//fieldMatchColunm 表字段对应的数据列数

private int[] fieldMatchColunm;

/**

* 构造器

* @param subFileNm

* @param subTblNm

* @param tableNm

* @param mappingNum

* @param tableFields

* @param columnTypes

* @param columnLengths

* @param columnNullable

* @param fieldMatchColunm

*/

public ConfigDataVO(String subFileNm, String subTblNm, String tableNm,

int mappingNum, String[] tableFields,

String[] columnTypes, int[] columnLengths, String[] columnNullable,

int[] fieldMatchColunm) {

super();

this.subFileNm = subFileNm;

this.subTblNm = subTblNm;

this.tableNm = tableNm;

this.mappingNum = mappingNum;

this.tableFields = tableFields;

this.columnTypes = columnTypes;

this.columnLengths = columnLengths;

this.columnNullable = columnNullable;

this.fieldMatchColunm = fieldMatchColunm;

}

public String getSubFileNm() {

return subFileNm;

}

public void setSubFileNm(String subFileNm) {

this.subFileNm = subFileNm;

}

public String getSubTblNm() {

return subTblNm;

}

public void setSubTblNm(String subTblNm) {

this.subTblNm = subTblNm;

}

public String getTableNm() {

return tableNm;

}

public void setTableNm(String tableNm) {

this.tableNm = tableNm;

}

public int getMappingNum() {

return mappingNum;

}

public void setMappingNum(int mappingNum) {

this.mappingNum = mappingNum;

}

public String[] getTableFields() {

return tableFields;

}

public void setTableFields(String[] tableFields) {

this.tableFields = tableFields;

}

public String[] getColumnTypes() {

return columnTypes;

}

public void setColumnTypes(String[] columnTypes) {

this.columnTypes = columnTypes;

}

public int[] getColumnLengths() {

return columnLengths;

}

public void setColumnLengths(int[] columnLengths) {

this.columnLengths = columnLengths;

}

public String[] getColumnNullable() {

return columnNullable;

}

public void setColumnNullable(String[] columnNullable) {

this.columnNullable = columnNullable;

}

public int[] getFieldMatchColunm() {

return fieldMatchColunm;

}

public void setFieldMatchColunm(int[] fieldMatchColunm) {

this.fieldMatchColunm = fieldMatchColunm;

}

}

3.连接池工具类DBConnection.java

package com.unionpay.upa.importer;

import java.sql.SQLException;

import org.apache.commons.dbcp.BasicDataSource;

/**

* @author 陈海洋

*

*/

public final class DBConnection {

// 数据库驱动

private static final String DRIVER =CommonUtils.getSysConfigure("db.driverClassName");

// 远程数据库地址

private static final String URL = CommonUtils.getSysConfigure("db.url");

// 连接数据库用户名

private static final String USERNAME = CommonUtils.getSysConfigure("db.username");

// 连接数据库密码

private static final String PASSWORD = CommonUtils.getSysConfigure("db.password");

// 初始化连接池数量

private static final String INITIALSIZE = CommonUtils.getSysConfigure("db.initialSize");

// 最大可活跃数

private static final String MAXACTIVE = CommonUtils.getSysConfigure("db.maxActive");

// 连接池最大空闲连接

private static final String MAXIDLE = CommonUtils.getSysConfigure("db.maxIdle");

// 连接池最大等待连接数量

private static final String MAXWAIT = CommonUtils.getSysConfigure("db.maxWait");

// Apache DBCP数据连接类

private static BasicDataSource ds ;

DBConnection() {

}

public static BasicDataSource getInstance() {

if(ds == null){

synchronized (DBConnection.class) {

if (ds == null) {

ds = new BasicDataSource();

}

// 驱动类名字

ds.setDriverClassName(DRIVER);

// 驱动的url

ds.setUrl(URL);

// 数据库的用户名和密码

ds.setUsername(USERNAME);

ds.setPassword(PASSWORD);

// 初始化连接数量

ds.setInitialSize(Integer.valueOf(INITIALSIZE));

// 最大可活跃数

ds.setMaxActive(Integer.valueOf(MAXACTIVE));

// 连接池最大空闲连接

ds.setMaxIdle(Integer.valueOf(MAXIDLE));

//最大等待时间

ds.setMaxWait(Integer.valueOf(MAXWAIT));

//设定连接在多少秒内被认为是放弃的连接,即可进行恢复利用

ds.setRemoveAbandonedTimeout(10);

//c. 输出回收的日志,可以详细打印出异常从而发现是在那里发生了泄漏

ds.setLogAbandoned(true);

}

}

return ds;

}

/**

* 关闭数据源

*/

public static void shutdownDataSource() throws SQLException {

BasicDataSource bds = (BasicDataSource) ds;

bds.close();

}

}

4.DownloadGzipFile.java(文件下载工具类)

package com.unionpay.upa.importer;

import java.io.File;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.InputStream;

import java.net.URL;

public class DownloadGzipFile {

public static void downloadFile(String remoteUrl,String localDir){

try {

String name = remoteUrl.substring(remoteUrl.trim().lastIndexOf("/"));

URL url = new URL(remoteUrl);

InputStream in = url.openConnection().getInputStream();

File file = new File(localDir + name);

FileOutputStream out = new FileOutputStream(file, true);

int counter = 0;

int ch;

byte[] buffer = new byte[1024];

while ((ch = in.read(buffer)) != -1) {

out.write(buffer, 0, ch);

counter += ch;

}

out.flush();

in.close();

out.close();

}catch (IOException e) {

e.printStackTrace();

}

}

}

5.解压文件工具类
package com.unionpay.upa.importer;

import java.io.BufferedInputStream;

import java.io.BufferedOutputStream;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.zip.GZIPInputStream;

import org.apache.commons.compress.archivers.ArchiveException;

import org.apache.commons.compress.archivers.ArchiveInputStream;

import org.apache.commons.compress.archivers.ArchiveStreamFactory;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;

/**

* 解压tar.gz文件包

*/

public class GZipUtil {

private BufferedOutputStream bufferedOutputStream;

String zipfileName = null;

public GZipUtil(String fileName) {

this.zipfileName = fileName;

}

/*

* 执行入口,rarFileName为需要解压的文件路径(具体到文件),destDir为解压目标路径

*/

public static void unTargzFile(String rarFileName, String destDir) {

GZipUtil gzip = new GZipUtil(rarFileName);

String outputDirectory = destDir;

File file = new File(outputDirectory);

if (!file.exists()) {

file.mkdir();

}

gzip.unzipOarFile(outputDirectory);

}

public void unzipOarFile(String outputDirectory) {

FileInputStream fis = null;

ArchiveInputStream in = null;

BufferedInputStream bufferedInputStream = null;

try {

fis = new FileInputStream(zipfileName);

GZIPInputStream is = new GZIPInputStream(new BufferedInputStream(

fis));

in = new ArchiveStreamFactory().createArchiveInputStream("tar", is);

bufferedInputStream = new BufferedInputStream(in);

TarArchiveEntry entry = (TarArchiveEntry) in.getNextEntry();

while (entry != null) {

String name = entry.getName();

String[] names = name.split("/");

String fileName = outputDirectory;

for (int i = 0; i < names.length; i++) {

String str = names[i];

fileName = fileName + File.separator + str;

}

if (name.endsWith("/")) {

mkFolder(fileName);

} else {

File file = mkFile(fileName);

bufferedOutputStream = new BufferedOutputStream(

new FileOutputStream(file));

int b;

while ((b = bufferedInputStream.read()) != -1) {

bufferedOutputStream.write(b);

}

bufferedOutputStream.flush();

bufferedOutputStream.close();

}

entry = (TarArchiveEntry) in.getNextEntry();

}

} catch (FileNotFoundException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

} catch (ArchiveException e) {

e.printStackTrace();

} finally {

try {

if (bufferedInputStream != null) {

bufferedInputStream.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

private void mkFolder(String fileName) {

File f = new File(fileName);

if (!f.exists()) {

f.mkdir();

}

}

private File mkFile(String fileName) {

File f = new File(fileName);

try {

f.createNewFile();

} catch (IOException e) {

e.printStackTrace();

}

return f;

}

public static List<String> readfile(String filepath) {

List<String> lst = new ArrayList<String>();

readfile(filepath, lst);

return lst;

}

/**

* 读取某个文件夹下的所有文件

*/

private static void readfile(String filepath, List<String> lst) {

File file = new File(filepath);

if (!file.isDirectory()) {

System.out.println("文件");

System.out.println("path=" + file.getPath());

System.out.println("absolutepath=" + file.getAbsolutePath());

System.out.println("name=" + file.getName());

} else if (file.isDirectory()) {

String[] filelist = file.list();

for (int i = 0; i < filelist.length; i++) {

File readfile = new File(filepath + File.separator + filelist[i]);

if (!readfile.isDirectory()) {

if (readfile.getName().matches(

"^\\w+\\.txt\\.\\d{4}-\\d{2}-\\d{2}|\\w+\\.txt$")) {

lst.add(readfile.getAbsolutePath());

}

} else if (readfile.isDirectory()) {

readfile(filepath + File.separator + filelist[i], lst);

}

}

}

}

/**

* 删除某个文件夹下的所有文件

*/

public static void deleteFile(String delpath) {

File file = new File(delpath);

if (!file.isDirectory()) {

file.delete();

} else if (file.isDirectory()) {

String[] filelist = file.list();

for (int i = 0; i < filelist.length; i++) {

File delfile = new File(delpath + File.separator + filelist[i]);

if (!delfile.isDirectory()) {

delfile.delete();

} else if (delfile.isDirectory()) {

deleteFile(delpath + File.separator + filelist[i]);

delfile.delete();

}

}

}

}

}

6.https协议导入证书工具类SSLConnetionUtils

package com.unionpay.upa.importer;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileOutputStream;

import java.security.KeyStore;

import java.security.cert.Certificate;

import java.security.cert.CertificateException;

import java.text.MessageFormat;

import java.util.Enumeration;

import javax.net.ssl.SSLContext;

import javax.net.ssl.SSLSession;

import javax.net.ssl.SSLSocket;

import javax.net.ssl.SSLSocketFactory;

import javax.net.ssl.TrustManager;

import javax.net.ssl.X509TrustManager;

public class SSLConnetionUtils {

private static final String upopHost = CommonUtils.getSysConfigure("upopHost");

private static final int HTTPS_PORT = Integer.parseInt(CommonUtils.getSysConfigure("https_port"));

// 要导入的证书的别名

private static final String certFileAlias = CommonUtils.getSysConfigure("certFileAlias");

// Java Trust Store密码,初始密码为changeit

private static final String trustStorePassword = CommonUtils.getSysConfigure("trustStorePassword");

// Java Trust Store的路径

private static final String JAVA_HOME_SYS = System.getProperty("java.home");

private static final String trustStoreFileRelativePath = MessageFormat.format(CommonUtils.getSysConfigure("trustStoreFileRelativePath"),File.separator,File.separator,File.separator);

/**

* 导入证书到java key store

*

* @param certPath

* 要导入的证书的路径

* @param jksPath

* Trust Store的路径

* @param jksPassword

* Trust Store密码

*/

public static void importCertificate(String jksPath, String jksPassword) {

try {

Certificate certificate = getUpopCertificate();

KeyStore ks = KeyStore.getInstance("JKS");

ks.load(null, jksPassword.toCharArray());

setRemainCertificates(ks, jksPath, jksPassword);

ks.setCertificateEntry(certFileAlias, certificate);

ks.store(new FileOutputStream(jksPath), jksPassword.toCharArray());

System.out.println("Import Certificate " + certFileAlias + " Successful!");

} catch (Exception e) {

throw new RuntimeException("Exception while importing certificate to Java Key Store for unionpaysecure certificate.",

e);

}

}

/**

* 导入证书到java key store

*

* @param certificate

*/

public static void importCertificate() {

try {

String jksPath = JAVA_HOME_SYS + trustStoreFileRelativePath;

KeyStore ks = KeyStore.getInstance("JKS");

ks.load(null, trustStorePassword.toCharArray());

setRemainCertificates(ks, jksPath, trustStorePassword);

Certificate certificate = getUpopCertificate();

ks.setCertificateEntry(certFileAlias, certificate);

ks.store(new FileOutputStream(jksPath), trustStorePassword.toCharArray());

System.out.println("Import Certificate " + certFileAlias + " Successful!");

} catch (Exception e) {

throw new RuntimeException("Exception while importing certificate to Java Key Store for unionpaysecure certificate.",

e);

}

}

/**

* 保留原来java key store中的证书

*

* @param ks

* @param jksPath

* @param jksPassword

*/

private static void setRemainCertificates(KeyStore ks, String jksPath, String jksPassword) {

try {

FileInputStream in = new FileInputStream(jksPath);

KeyStore ks4RemainCerts = KeyStore.getInstance("JKS");

Certificate c = null;

ks4RemainCerts.load(in, jksPassword.toCharArray());

Enumeration<String> e = ks4RemainCerts.aliases();

String alias;

while (e.hasMoreElements()) {

alias = (String) e.nextElement();

c = ks4RemainCerts.getCertificate(alias);

ks.setCertificateEntry(alias, c);

}

} catch (Exception e) {

throw new RuntimeException(

"Exception while setting remain certificates to Java Key Store for unionpaysecure certificate.", e);

}

}

/**

* 删除某个证书

*

* @param alias

* @param jksPath

* @param jksPassword

*/

public static void deleteCertificate(String alias, String jksPath, String jksPassword) {

try {

FileInputStream in = new FileInputStream(jksPath);

KeyStore ks = KeyStore.getInstance("JKS");

ks.load(in, jksPassword.toCharArray());

if (ks.containsAlias(alias)) {

ks.deleteEntry(alias);

ks.store(new FileOutputStream(jksPath), jksPassword.toCharArray());

}

} catch (Exception e) {

throw new RuntimeException("Exception while deleting certificate from Java Key Store.", e);

}

}

/**

* 删除某个证书

*

* @param alias

*/

public static void deleteCertificate(String alias) {

try {

String jksPath = JAVA_HOME_SYS + trustStoreFileRelativePath;

FileInputStream in = new FileInputStream(jksPath);

KeyStore ks = KeyStore.getInstance("JKS");

ks.load(in, trustStorePassword.toCharArray());

if (ks.containsAlias(alias)) {

ks.deleteEntry(alias);

ks.store(new FileOutputStream(jksPath), trustStorePassword.toCharArray());

}

} catch (Exception e) {

throw new RuntimeException("Exception while deleting certificate from Java Key Store.", e);

}

}

/**

* 打印证书信息

*

* @param jksPath

* @param jksPassword

*/

public static void printCertificateInfo(String jksPath, String jksPassword) {

try {

FileInputStream in = new FileInputStream(jksPath);

KeyStore ks = KeyStore.getInstance("JKS");

ks.load(in, jksPassword.toCharArray());

Enumeration<String> e = ks.aliases();

String alias;

while (e.hasMoreElements()) {

alias = (String) e.nextElement();

if (certFileAlias.equals(alias)) {

System.out.println("Your imported certificate " + alias);

Certificate c = ks.getCertificate(alias);

System.out.println("Content: " + c.toString());

}

}

} catch (Exception e) {

throw new RuntimeException("Exception while printing certificates in Java Key Store.", e);

}

}

/**

* 打印证书信息

*/

public static void printCertificateInfo() {

try {

String jksPath = JAVA_HOME_SYS + trustStoreFileRelativePath;

FileInputStream in = new FileInputStream(jksPath);

KeyStore ks = KeyStore.getInstance("JKS");

ks.load(in, trustStorePassword.toCharArray());

Enumeration<String> e = ks.aliases();

String alias;

while (e.hasMoreElements()) {

alias = (String) e.nextElement();

if (certFileAlias.equals(alias)) {

System.out.println("Your imported certificate " + alias);

Certificate c = ks.getCertificate(alias);

System.out.println("Content: " + c.toString());

}

}

} catch (Exception e) {

throw new RuntimeException("Exception while printing certificates in Java Key Store.", e);

}

}

/**

* 从银联在线支付网站获取证书

*

* @return

*/

public static Certificate getUpopCertificate() {

try {

TrustManager trm = new X509TrustManager() {

public java.security.cert.X509Certificate[] getAcceptedIssuers() {

return null;

}

@Override

public void checkClientTrusted(java.security.cert.X509Certificate[] arg0, String arg1)

throws CertificateException {

}

@Override

public void checkServerTrusted(java.security.cert.X509Certificate[] arg0, String arg1)

throws CertificateException {

}

};

SSLContext sc = SSLContext.getInstance("SSL");

sc.init(null, new TrustManager[] { trm }, null);

SSLSocketFactory factory = sc.getSocketFactory();

SSLSocket socket = (SSLSocket) factory.createSocket(upopHost, HTTPS_PORT);

socket.startHandshake();

SSLSession session = socket.getSession();

Certificate[] servercerts = session.getPeerCertificates();

socket.close();

return servercerts[0];

} catch (Exception e) {

throw new RuntimeException("Exception while get upop certificate.", e);

}

}

/**

* 主方法导入证书

* @param args

*/

public static void main(String[] args) {

// 如果网站证书存在删除证书

SSLConnetionUtils.deleteCertificate(certFileAlias);

// 导入网站证书

SSLConnetionUtils.importCertificate();

}

}

7.线程池工具类

package com.unionpay.upa.importer;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* 线程池

*

* @author jonathan

*

*/

public class ThreadPool {

// 线程池维护线程的最大数量

public final static int maximumPoolSize = Integer.parseInt(CommonUtils

.getSysConfigure("thread.maximumPoolSize"));

// 线程池维护线程的最少数量

final static int corePoolSize = Integer.parseInt(CommonUtils

.getSysConfigure("thread.corePoolSize"));;

// 线程池维护线程所允许的空闲时间

final static long keepAliveTime = Integer.parseInt(CommonUtils

.getSysConfigure("thread.keepAliveTime"));

// 线程池所使用的最大缓冲队列

public final static int maxBlockingQueue = Integer.parseInt(CommonUtils

.getSysConfigure("thread.maxBlockingQueue"));

// 线程池

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(

corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,

new ArrayBlockingQueue<Runnable>(maxBlockingQueue),

new ThreadPoolExecutor.CallerRunsPolicy());

/**

* 添加任务

*

* @param thread

*/

public static void addTask(Runnable thread) {

try {

// 执行任务

executor.execute(thread);

// 便于观察,等待一段时间

Thread.sleep(0);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

/**

* 得到当前运行的线程的数量

*

* @return

*/

public static int getCurrentQueueSize() {

return executor.getQueue().size();

}

/**

* 关闭线程池

*/

public static void existThreadPool() {

executor.shutdown();

}

/**

* 判断线程池中的线程是否都已经完成任务

*

* @return

*/

public static boolean isEndTask() {

if (executor.getActiveCount() == 0) {

// 如果任务执行完成,则关闭线程池

// existThreadPool();

return true;

}else{

return false;

}

}

}

8.线程接口

package com.unionpay.upa.importer;

import java.util.List;

public interface ThreadWork {

/**

* 数据导入的方法

* @param configDataVO 配置文件的信息vo

* @param linesList 读取的行集合

*/

void writer(ConfigDataVO configDataVO,List<String> addList);

}

9.线程接口实现类DBImportWork

package com.unionpay.upa.importer;

import java.util.ArrayList;

import java.util.List;

/**

* 线程读取文件导入数据

*

* @author huangdou

*

*/

public class DBImportWork implements ThreadWork {

/**

* 数据导入的方法

* @param configDataVO 配置文件的信息vo

* @param linesList 读取的行集合

*/

@Override

public void writer(ConfigDataVO configDataVO,List<String> addList) {

// 批量执行的条数

List<String> list = new ArrayList<String>();

// 拼接而成的sql

String sql = FileImportUtils.spliceSql(configDataVO.getTableNm(), configDataVO.getMappingNum(),configDataVO.getTableFields());

// 从log文件将数据导入到对应表中

// if (configDataVO.getSubFileNm().equals(configDataVO.getSubTblNm())) {

list.addAll(addList);

// 批量100条插入数据库

if (list.size()>0) {

FileImportUtils.importData(sql,configDataVO.getTableNm(), list, configDataVO.getColumnTypes(),configDataVO.getColumnLengths(), configDataVO.getColumnNullable(),configDataVO.getFieldMatchColunm());

}

// }

}

}

10.批量导入数据的工具类

package com.unionpay.upa.importer;

import java.math.BigDecimal;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.List;

import org.apache.commons.dbcp.BasicDataSource;

import org.apache.commons.lang.StringUtils;

import org.apache.log4j.Logger;

/**

* 批量导入数据的工具类

*

* @author huangdou

*

*/

public class FileImportUtils {

/**

* logger日志

*/

private static Logger logger = Logger.getLogger(FileImportUtils.class);

/**

* 拼接sql的方法

*

* @param database_table_name 表名

* @param columnNum 列编号

* @param dataColumn 列名

* @return

* @author huangdou

*/

public static String spliceSql(String database_table_name, int columnNum,

String[] dataColumn) {

String sql = "insert into " + database_table_name + "(";

String val = " values (";

for (int i = 0; i < columnNum; i++) {

String fieldName = dataColumn[i];

if (i == columnNum - 1) {

sql += fieldName + ")";

val += "?);";

} else {

sql += fieldName + ",";

val += "?,";

}

}

sql += val;

return sql;

}

/**

* 导入数据

*

* @param sql

* sql执行语句

* @param list

* 执行的数据

* @param tableColumnType

* 表字段的数据类型

* @param tableColumnLength

* 表字段的长度

* @param tableColumnNullable

* 表字段是否为空

* @param fieldMatchColunm

* 表字段对应的列数

* @param subTblNm

* 表名

*/

public static void importData(String sql, String tableNm,

List<String> list, String[] tableColumnType,

int[] tableColumnLength, String[] tableColumnNullable,

int[] fieldMatchColunm) {

// 获取连接对象

BasicDataSource dbcn = DBConnection.getInstance();

// 获取连接

Connection con = null;

PreparedStatement mstmt = null;

try {

con = dbcn.getConnection();

con.setAutoCommit(false);

mstmt = con.prepareStatement(sql);

// 五、验证数据的内容

List<String> addList = validateContent(list, tableColumnType,

tableColumnLength, tableColumnNullable);

// 六、开始设置值插入数据库

if (addList.size() > 0) {

for (String lineCon : addList) {

String readoneline[] = lineCon.split("\\|");

// 设置值

setValueForTable(mstmt, tableColumnType, fieldMatchColunm,

readoneline, addList);

mstmt.addBatch();

}

}

mstmt.executeBatch();

con.commit();

} catch (SQLException e) {

logger.error(CommonUtils.getTrace(e) + "\nSQLSTATE="

+ e.getSQLState() + "\nERROR=" + e.getErrorCode());

} finally {

try {

if (mstmt != null) {

mstmt.close();

}

if (con != null && !con.isClosed()) {

con.close();

}

} catch (Exception e2) {

e2.printStackTrace();

}

}

}

/**

* 为表字段设置值

*

* @param mstmt

* @param tableColumnType

* @param fieldMatchColunm

* @param readoneline

*/

private static void setValueForTable(PreparedStatement mstmt,

String[] tableColumnType, int[] fieldMatchColunm,

String[] readoneline, List<String> lineLists) {

try {

for (int j = 0; j < lineLists.size(); j++) {

for (int i = 0; i < readoneline.length; i++) {

if ("varchar".equalsIgnoreCase(tableColumnType[i])

|| "char".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setString(i + 1,

readoneline[fieldMatchColunm[i] - 1].trim());

} else if ("int".equalsIgnoreCase(tableColumnType[i])

|| "smallint".equalsIgnoreCase(tableColumnType[i])) {

if (StringUtils

.isEmpty(readoneline[fieldMatchColunm[i] - 1])) {

mstmt.setInt(

i + 1,

Integer.parseInt(readoneline[fieldMatchColunm[i] - 1] + 0));

} else {

mstmt.setInt(

i + 1,

Integer.parseInt(readoneline[fieldMatchColunm[i] - 1]));

}

} else if ("bigint".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setLong(

i + 1,

Long.parseLong(readoneline[fieldMatchColunm[i] - 1]));

} else if ("float".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setFloat(

i + 1,

Float.parseFloat(readoneline[fieldMatchColunm[i] - 1]));

} else if ("double".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setDouble(

i + 1,

Double.parseDouble(readoneline[fieldMatchColunm[i] - 1]));

} else if ("decimal".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setBigDecimal(

i + 1,

BigDecimal.valueOf(Double

.parseDouble(readoneline[fieldMatchColunm[i] - 1])));

} else if ("date".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setString(i + 1,

readoneline[fieldMatchColunm[i] - 1]);

} else if ("timestamp".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setString(i + 1,

readoneline[fieldMatchColunm[i] - 1]);

}

}

}

} catch (NumberFormatException e) {

e.printStackTrace();

} catch (SQLException e) {

e.printStackTrace();

}

}

/**

* 找到LogAction表里面最大的编号

*

* @return

* @author huangdou

*/

public static int findMaxLogActionId() {

// 获取连接对象

BasicDataSource dbcn = DBConnection.getInstance();

Connection con = null;

PreparedStatement mstmt = null;

int maxLogActionId = 0;

try {

// 获取连接

con = dbcn.getConnection();

con.setAutoCommit(false);

String sql = "select max(log_action_id) from TBL_UPA_LOG_ACTION";

mstmt = con.prepareStatement(sql);

ResultSet rs = mstmt.executeQuery(sql);

con.commit();

while (rs.next()) {

maxLogActionId = rs.getInt(1);

}

} catch (SQLException e) {

logger.error(CommonUtils.getTrace(e));

} finally {

try {

if(null!=mstmt){

mstmt.close();

}

if(null!=con&&!con.isClosed()){

con.close();

}

} catch (Exception e2) {

e2.printStackTrace();

}

}

return maxLogActionId;

}

/**

* 清空表数据

*

* @param tableNm

*/

public static void clearTableData(String tableNm) {

// 获取连接对象

BasicDataSource dbcn = DBConnection.getInstance();

Connection con = null;

PreparedStatement mstmt = null;

try {

// 获取连接

con = dbcn.getConnection();

con.setAutoCommit(false);

String sql = "TRUNCATE TABLE " + tableNm + ";";

mstmt = con.prepareStatement(sql);

mstmt.executeUpdate(sql);

con.commit();

} catch (SQLException e) {

logger.error(CommonUtils.getTrace(e));

} finally {

try {

if(null!=mstmt){

mstmt.close();

}

if(null!=con&&!con.isClosed()){

con.close();

}

} catch (Exception e2) {

e2.printStackTrace();

}

}

}

/**

* 验证字段的长度和数据类型是否匹配

*

* @param list

* @param tableColumnType

* @return

*/

private static List<String> validateContent(List<String> list,

String[] tableColumnType, int[] tableColumnLength,

String[] tableColumnNullable) {

List<String> addList = new ArrayList<String>();

for (String contents : list) {

String[] contentArr = contents.split("\\|");

if (validateColumnSums(contentArr, tableColumnType.length)

&& validateContentDetail(contentArr, tableColumnType,

tableColumnLength, tableColumnNullable)) {

addList.add(contents);

}

}

return addList;

}

/**

* 验证以|符号分隔后的数组长度是否与表字段数相等

*

* @param contentArr 数组

* @param clomunSums 字段数

* @return

*/

private static boolean validateColumnSums(String[] contentArr,

int clomunSums) {

if (contentArr.length != clomunSums) {

logger.error("Data sums ErrorID=" + contentArr[0]);

return false;

}

return true;

}

/**

* 验证字段的长度、数据类型是否正确

*

* @param contentArr

* 分隔后的数组

* @param tableColumnType

* 字段类型数组

* @param tableColumnLength

* 字段长度数组

* @param tableColumnNullable

* 字段是否为空的数组

* @return

*/

private static boolean validateContentDetail(String[] contentArr,

String[] tableColumnType, int[] tableColumnLength,

String[] tableColumnNullable) {

boolean flag = true;

for (int i = 0; i < tableColumnLength.length; i++) {

// 字段的长度要小于等于数据库定义的长度

if (!(tableColumnType[i].equals("timestamp"))

&& !(tableColumnType[i].equals("date"))) {

if (contentArr[i].length() > tableColumnLength[i]) {

logger.error("The Field length is too long!="

+ contentArr[i] + "ErrorID=" + contentArr[0]);

flag = false;

break;

}

}

if (!validateColumnType(contentArr[i], tableColumnType[i],

tableColumnNullable[i])) {

logger.error("Data type is invalid or data can not be empty!ErrorID="

+ contentArr[0]);

flag = false;

break;

}

}

return flag;

}

/**

* 验证所传数据与数据库类型是否正确

*

* @param content

* @param columnType

* @return

*/

private static boolean validateColumnType(String content,

String columnType, String columnNullable) {

boolean flag = true;

// 如果字段为非空,所传数据为空,则返回false

// if (StringUtils.isEmpty(content)&&columnNullable.equals("N")) {

// return false;

// }

// 验证数据类型

try {

if ("timestamp".equalsIgnoreCase(columnType)) {

try {

SimpleDateFormat df1 = new SimpleDateFormat(

"yyyy-MM-dd HH:mm:ss");

SimpleDateFormat df2 = new SimpleDateFormat(

"yyyy/MM/dd HH:mm:ss");

if (content.indexOf("-") > 0) {

df1.parse(content);

} else if (content.indexOf("/") > 0) {

df2.parse(content);

}

} catch (ParseException e) {

flag = false;

}

} else if ("date".equalsIgnoreCase(columnType)) {

try {

SimpleDateFormat df1 = new SimpleDateFormat("yyyy-MM-dd");

SimpleDateFormat df2 = new SimpleDateFormat("yyyy/MM/dd");

if (content.indexOf("-") > 0) {

df1.parse(content);

} else if (content.indexOf("/") > 0) {

df2.parse(content);

}

} catch (ParseException e) {

flag = false;

}

} else if ("int".equalsIgnoreCase(columnType)

|| "bigint".equalsIgnoreCase(columnType)) {

return CommonUtils.isNumeric(content);

}

} catch (NumberFormatException e) {

logger.error(CommonUtils.getTrace(e));

flag = false;

}

return flag;

}

}

package com.unionpay.upa.importer;

import java.math.BigDecimal;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.List;

import org.apache.commons.dbcp.BasicDataSource;

import org.apache.commons.lang.StringUtils;

import org.apache.log4j.Logger;

/**

* 批量导入数据的工具类

*

* @author huangdou

*

*/

public class FileImportUtils {

/**

* logger日志

*/

private static Logger logger = Logger.getLogger(FileImportUtils.class);

/**

* 拼接sql的方法

*

* @param database_table_name 表名

* @param columnNum 列编号

* @param dataColumn 列名

* @return

* @author huangdou

*/

public static String spliceSql(String database_table_name, int columnNum,

String[] dataColumn) {

String sql = "insert into " + database_table_name + "(";

String val = " values (";

for (int i = 0; i < columnNum; i++) {

String fieldName = dataColumn[i];

if (i == columnNum - 1) {

sql += fieldName + ")";

val += "?);";

} else {

sql += fieldName + ",";

val += "?,";

}

}

sql += val;

return sql;

}

/**

* 导入数据

*

* @param sql

* sql执行语句

* @param list

* 执行的数据

* @param tableColumnType

* 表字段的数据类型

* @param tableColumnLength

* 表字段的长度

* @param tableColumnNullable

* 表字段是否为空

* @param fieldMatchColunm

* 表字段对应的列数

* @param subTblNm

* 表名

*/

public static void importData(String sql, String tableNm,

List<String> list, String[] tableColumnType,

int[] tableColumnLength, String[] tableColumnNullable,

int[] fieldMatchColunm) {

// 获取连接对象

BasicDataSource dbcn = DBConnection.getInstance();

// 获取连接

Connection con = null;

PreparedStatement mstmt = null;

try {

con = dbcn.getConnection();

con.setAutoCommit(false);

mstmt = con.prepareStatement(sql);

// 五、验证数据的内容

List<String> addList = validateContent(list, tableColumnType,

tableColumnLength, tableColumnNullable);

// 六、开始设置值插入数据库

if (addList.size() > 0) {

for (String lineCon : addList) {

String readoneline[] = lineCon.split("\\|");

// 设置值

setValueForTable(mstmt, tableColumnType, fieldMatchColunm,

readoneline, addList);

mstmt.addBatch();

}

}

mstmt.executeBatch();

con.commit();

} catch (SQLException e) {

logger.error(CommonUtils.getTrace(e) + "\nSQLSTATE="

+ e.getSQLState() + "\nERROR=" + e.getErrorCode());

} finally {

try {

if (mstmt != null) {

mstmt.close();

}

if (con != null && !con.isClosed()) {

con.close();

}

} catch (Exception e2) {

e2.printStackTrace();

}

}

}

/**

* 为表字段设置值

*

* @param mstmt

* @param tableColumnType

* @param fieldMatchColunm

* @param readoneline

*/

private static void setValueForTable(PreparedStatement mstmt,

String[] tableColumnType, int[] fieldMatchColunm,

String[] readoneline, List<String> lineLists) {

try {

for (int j = 0; j < lineLists.size(); j++) {

for (int i = 0; i < readoneline.length; i++) {

if ("varchar".equalsIgnoreCase(tableColumnType[i])

|| "char".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setString(i + 1,

readoneline[fieldMatchColunm[i] - 1].trim());

} else if ("int".equalsIgnoreCase(tableColumnType[i])

|| "smallint".equalsIgnoreCase(tableColumnType[i])) {

if (StringUtils

.isEmpty(readoneline[fieldMatchColunm[i] - 1])) {

mstmt.setInt(

i + 1,

Integer.parseInt(readoneline[fieldMatchColunm[i] - 1] + 0));

} else {

mstmt.setInt(

i + 1,

Integer.parseInt(readoneline[fieldMatchColunm[i] - 1]));

}

} else if ("bigint".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setLong(

i + 1,

Long.parseLong(readoneline[fieldMatchColunm[i] - 1]));

} else if ("float".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setFloat(

i + 1,

Float.parseFloat(readoneline[fieldMatchColunm[i] - 1]));

} else if ("double".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setDouble(

i + 1,

Double.parseDouble(readoneline[fieldMatchColunm[i] - 1]));

} else if ("decimal".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setBigDecimal(

i + 1,

BigDecimal.valueOf(Double

.parseDouble(readoneline[fieldMatchColunm[i] - 1])));

} else if ("date".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setString(i + 1,

readoneline[fieldMatchColunm[i] - 1]);

} else if ("timestamp".equalsIgnoreCase(tableColumnType[i])) {

mstmt.setString(i + 1,

readoneline[fieldMatchColunm[i] - 1]);

}

}

}

} catch (NumberFormatException e) {

e.printStackTrace();

} catch (SQLException e) {

e.printStackTrace();

}

}

/**

* 找到LogAction表里面最大的编号

*

* @return

* @author huangdou

*/

public static int findMaxLogActionId() {

// 获取连接对象

BasicDataSource dbcn = DBConnection.getInstance();

Connection con = null;

PreparedStatement mstmt = null;

int maxLogActionId = 0;

try {

// 获取连接

con = dbcn.getConnection();

con.setAutoCommit(false);

String sql = "select max(log_action_id) from TBL_UPA_LOG_ACTION";

mstmt = con.prepareStatement(sql);

ResultSet rs = mstmt.executeQuery(sql);

con.commit();

while (rs.next()) {

maxLogActionId = rs.getInt(1);

}

} catch (SQLException e) {

logger.error(CommonUtils.getTrace(e));

} finally {

try {

if(null!=mstmt){

mstmt.close();

}

if(null!=con&&!con.isClosed()){

con.close();

}

} catch (Exception e2) {

e2.printStackTrace();

}

}

return maxLogActionId;

}

/**

* 清空表数据

*

* @param tableNm

*/

public static void clearTableData(String tableNm) {

// 获取连接对象

BasicDataSource dbcn = DBConnection.getInstance();

Connection con = null;

PreparedStatement mstmt = null;

try {

// 获取连接

con = dbcn.getConnection();

con.setAutoCommit(false);

String sql = "TRUNCATE TABLE " + tableNm + ";";

mstmt = con.prepareStatement(sql);

mstmt.executeUpdate(sql);

con.commit();

} catch (SQLException e) {

logger.error(CommonUtils.getTrace(e));

} finally {

try {

if(null!=mstmt){

mstmt.close();

}

if(null!=con&&!con.isClosed()){

con.close();

}

} catch (Exception e2) {

e2.printStackTrace();

}

}

}

/**

* 验证字段的长度和数据类型是否匹配

*

* @param list

* @param tableColumnType

* @return

*/

private static List<String> validateContent(List<String> list,

String[] tableColumnType, int[] tableColumnLength,

String[] tableColumnNullable) {

List<String> addList = new ArrayList<String>();

for (String contents : list) {

String[] contentArr = contents.split("\\|");

if (validateColumnSums(contentArr, tableColumnType.length)

&& validateContentDetail(contentArr, tableColumnType,

tableColumnLength, tableColumnNullable)) {

addList.add(contents);

}

}

return addList;

}

/**

* 验证以|符号分隔后的数组长度是否与表字段数相等

*

* @param contentArr 数组

* @param clomunSums 字段数

* @return

*/

private static boolean validateColumnSums(String[] contentArr,

int clomunSums) {

if (contentArr.length != clomunSums) {

logger.error("Data sums ErrorID=" + contentArr[0]);

return false;

}

return true;

}

/**

* 验证字段的长度、数据类型是否正确

*

* @param contentArr

* 分隔后的数组

* @param tableColumnType

* 字段类型数组

* @param tableColumnLength

* 字段长度数组

* @param tableColumnNullable

* 字段是否为空的数组

* @return

*/

private static boolean validateContentDetail(String[] contentArr,

String[] tableColumnType, int[] tableColumnLength,

String[] tableColumnNullable) {

boolean flag = true;

for (int i = 0; i < tableColumnLength.length; i++) {

// 字段的长度要小于等于数据库定义的长度

if (!(tableColumnType[i].equals("timestamp"))

&& !(tableColumnType[i].equals("date"))) {

if (contentArr[i].length() > tableColumnLength[i]) {

logger.error("The Field length is too long!="

+ contentArr[i] + "ErrorID=" + contentArr[0]);

flag = false;

break;

}

}

if (!validateColumnType(contentArr[i], tableColumnType[i],

tableColumnNullable[i])) {

logger.error("Data type is invalid or data can not be empty!ErrorID="

+ contentArr[0]);

flag = false;

break;

}

}

return flag;

}

/**

* 验证所传数据与数据库类型是否正确

*

* @param content

* @param columnType

* @return

*/

private static boolean validateColumnType(String content,

String columnType, String columnNullable) {

boolean flag = true;

// 如果字段为非空,所传数据为空,则返回false

// if (StringUtils.isEmpty(content)&&columnNullable.equals("N")) {

// return false;

// }

// 验证数据类型

try {

if ("timestamp".equalsIgnoreCase(columnType)) {

try {

SimpleDateFormat df1 = new SimpleDateFormat(

"yyyy-MM-dd HH:mm:ss");

SimpleDateFormat df2 = new SimpleDateFormat(

"yyyy/MM/dd HH:mm:ss");

if (content.indexOf("-") > 0) {

df1.parse(content);

} else if (content.indexOf("/") > 0) {

df2.parse(content);

}

} catch (ParseException e) {

flag = false;

}

} else if ("date".equalsIgnoreCase(columnType)) {

try {

SimpleDateFormat df1 = new SimpleDateFormat("yyyy-MM-dd");

SimpleDateFormat df2 = new SimpleDateFormat("yyyy/MM/dd");

if (content.indexOf("-") > 0) {

df1.parse(content);

} else if (content.indexOf("/") > 0) {

df2.parse(content);

}

} catch (ParseException e) {

flag = false;

}

} else if ("int".equalsIgnoreCase(columnType)

|| "bigint".equalsIgnoreCase(columnType)) {

return CommonUtils.isNumeric(content);

}

} catch (NumberFormatException e) {

logger.error(CommonUtils.getTrace(e));

flag = false;

}

return flag;

}

}

11.读写文件的线程

package com.unionpay.upa.importer;

import java.io.File;

import java.io.FileReader;

import java.io.IOException;

import java.io.LineNumberReader;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import java.util.Queue;

import java.util.concurrent.ArrayBlockingQueue;

import org.apache.log4j.Logger;

/**

* 读写文件的线程

*

* @author huangdou

*

*/

public class ReadAndWriterThread extends Thread {

/**

* logger

*/

private static Logger logger = Logger.getLogger(ReadAndWriterThread.class);

// 线程池是否开始做任务

private boolean isStarted = false;

// 下载的

private File downFile;

// 下载文件的日期

private String yesterDayDate;

// 线程结果处理

private static ThreadWork threadWork = new DBImportWork();

// 每次必须读取的行数

private int readSums = Integer.parseInt(CommonUtils

.getSysConfigure("readfile.number"));

// 新建queue队列(先进先出)

private Queue<String> queue = new ArrayBlockingQueue<String>(

Integer.parseInt(CommonUtils.getSysConfigure("queue.size")));

/**

* 构造器

*

* @param fileUrlList

*/

public ReadAndWriterThread(List<String> fileUrlList, String yesDate) {

queue.addAll(fileUrlList);

downFile = new File(queue.poll());

yesterDayDate = yesDate;

}

/**

* 预处理一个文件

*/

private void pretreatmentQueue() {

String str = queue.poll();

if (null != str) {

downFile = new File(str);

// 开始处理文件

startProcess();

} else {

interrupt();

logger.info("<<<<<<<<读取完成,主线程终止>>>>>>>>");

}

}

/**

* 返回线程池中的任务是否已经完成

*

* @return

*/

public boolean isComplete() {

if(isStarted==true&&ThreadPool.isEndTask()){

return true;

}

return false;

}

@Override

public void run() {

// 一、读文件

try {

logger.info("<<<<<start readFile="+downFile.getName());

// 构建LineNumberReader对象

LineNumberReader reader = new LineNumberReader(new FileReader(

downFile));

// 读取配置文件

ConfigDataVO configDataVO = readConfigFile(downFile);

// 把读取的数据放在list

List<String> linesList = new ArrayList<String>();

// 读取每行数据

String lineCon = reader.readLine();

// 得到logAction中Id最大的记录

String maxIdLine = null;

// 得到数据库中已经存在的最大记录的下一条记录

if (downFile.getName().trim().toLowerCase().equals("logaction.txt")) {

maxIdLine = readAppointedLineNumber(downFile);

if (maxIdLine.equals("0")||maxIdLine.indexOf("|")>0) {

dealLogic(reader, lineCon, linesList, configDataVO);

}else{

// 当一个文件读取完之后,接着读第二个文件

pretreatmentQueue();

}

} else {

dealLogic(reader, lineCon, linesList, configDataVO);

}

if (null != reader) {

reader.close();

}

logger.info("<<<<<Import data end!>>>>>");

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 读取文件时的处理逻辑

* @param reader

* @param lineCon

* @param linesList

* @param configDataVO

*/

private void dealLogic(LineNumberReader reader, String lineCon,

List<String> linesList, ConfigDataVO configDataVO) {

// 是否在运行的标志

try {

boolean runflag = true;

while (runflag) {

while (lineCon != null

&& (ThreadPool.getCurrentQueueSize() >= ThreadPool.maxBlockingQueue * 0.9 ? false

: true)) {

linesList.add(lineCon);

if (linesList.size() % readSums == 0

&& linesList.size() > 0 && configDataVO != null) {

linesList = prepareTask(configDataVO, linesList);

}

lineCon = reader.readLine();

}

// 如果最后一个list的大小没有满足readSums,则也加入任务管理处理

if (linesList.size() > 0 && configDataVO != null) {

linesList = prepareTask(configDataVO, linesList);

}

// 如果文件读取完毕,则结束运行

if (lineCon == null) {

runflag = false;

// 当一个文件读取完之后,接着读第二个文件

pretreatmentQueue();

}

}

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

/**

* 开始读取并处理文件

*/

private void startProcess() {

// 执行任务

run();

}

/**

* 准备任务

*

* @param configDataVO

* @param linesList

* @return

*/

private List<String> prepareTask(ConfigDataVO configDataVO,

List<String> linesList) {

List<String> addList = new ArrayList<String>();

addList.addAll(linesList);

ThreadPool.addTask(creatTask(configDataVO, addList));

isStarted=true;

return new ArrayList<String>();

}

/**

* 创建任务

*

* @param configDataVO

* 配置文件信息对象

* @param addList

* 要添加的记录数

* @return

*/

private static Runnable creatTask(final ConfigDataVO configDataVO,

final List<String> addList) {

return new Runnable() {

public void run() {

try {

// 导入数据库

threadWork.writer(configDataVO, addList);

} catch (Exception ex) {

logger.error(CommonUtils.getTrace(ex));

}

}

};

}

/**

* 读取指定行数的文件

*

* @param sourceFile

* @param lineNumber

* @return

* @throws IOException

*/

private String readAppointedLineNumber(File sourceFile) throws IOException {

FileReader in = new FileReader(sourceFile);

LineNumberReader reader = new LineNumberReader(in);

int maxLogActionId = FileImportUtils.findMaxLogActionId();

String str = String.valueOf(maxLogActionId);

String lineCon = reader.readLine();

while (null != lineCon) {

String[] contentArr = lineCon.split("\\|");

int logActionId = Integer.parseInt(contentArr[0]);

// 如果在文件里面已经有插入该条数据的话,则从下一行开始读

if (maxLogActionId == logActionId) {

int lineNum = reader.getLineNumber();

logger.info("当前行号为:" + lineNum);

int nextNum = lineNum + 1;

if (nextNum < 0

|| nextNum > CommonUtils.getTotalLines(sourceFile)) {

logger.info("该行号:" + nextNum + "不在文件的行数范围之内!");

break;

} else if (nextNum < CommonUtils.getTotalLines(sourceFile)) {

reader.setLineNumber(nextNum);

lineCon = reader.readLine();

str = lineCon;

break;

}

}

// 读取每行数据

lineCon = reader.readLine();

}

return str;

}

/**

* 读配置文件

*

* @param dfile

* @return

*/

public ConfigDataVO readConfigFile(File dfile) {

// 二、读config文件

// 得到几个表的配置信息

Properties configProp = new Properties();

Properties prop = null;

// 得到文件的名字

String subFileNm = dfile.getName().trim()

.substring(0, dfile.getName().indexOf(".")).toLowerCase();

try {

if (subFileNm.equals("linkvisit")) {

configProp.load(ReadAndWriterThread.class.getClassLoader()

.getResourceAsStream("link_visit_config.properties"));

prop = configProp;

} else if (subFileNm.equals("visit")) {

configProp.load(ReadAndWriterThread.class.getClassLoader()

.getResourceAsStream("visit_config.properties"));

prop = configProp;

} else if (subFileNm.equals("logaction")) {

configProp.load(ReadAndWriterThread.class.getClassLoader()

.getResourceAsStream("log_action_config.properties"));

prop = configProp;

} else if (subFileNm.equals("upopbslinkvisit")) {

configProp.load(ReadAndWriterThread.class.getClassLoader()

.getResourceAsStream(

"back_stage_link_visit_config.properties"));

prop = configProp;

} else if (subFileNm.equals("upopbsvisit")) {

configProp.load(ReadAndWriterThread.class.getClassLoader()

.getResourceAsStream(

"back_stage_visit_config.properties"));

prop = configProp;

}

} catch (IOException e) {

logger.error("属性文件读取错误", e);

}

// 三、读取具体config配置信息开始

if (null != prop) {

// 得到数据库表名

String tableNm = prop.getProperty("batch.insert.table." + subFileNm

+ ".name");

// 得到数据库表的总列数

int mappingNum = Integer.parseInt(prop

.getProperty("batch.insert.table." + subFileNm

+ ".column.no"));

// 得到该表字段数目

String[] tableFields = new String[mappingNum];

// 得到该表字段类型数组

String[] columnTypes = new String[mappingNum];

// 得到该表字段是否为空数组

String[] columnNullable = new String[mappingNum];

// 得到该表字段长度数组

int[] columnLengths = new int[mappingNum];

// 获取数据库字段所对应的列

int[] fieldMatchColunm = new int[mappingNum];

for (int i = 1; i <= mappingNum; i++) {

tableFields[i - 1] = prop.getProperty("batch.insert.table."

+ subFileNm + ".field." + i + ".name");

columnTypes[i - 1] = prop.getProperty("batch.insert.table."

+ subFileNm + ".field." + i + ".type");

columnNullable[i - 1] = prop.getProperty("batch.insert.table."

+ subFileNm + ".field." + i + ".nullable");

columnLengths[i - 1] = Integer.parseInt(prop

.getProperty("batch.insert.table." + subFileNm

+ ".field." + i + ".length"));

fieldMatchColunm[i - 1] = Integer.parseInt(prop

.getProperty("batch.insert.table." + subFileNm

+ ".field." + i + ".order"));

}

// 文件名和表名匹配

String subTblNm = "";

if (tableNm.equals("TBL_UPA_LOG_ACTION")) {

subTblNm = tableNm.replaceAll("_", "").substring(6)

.toLowerCase();

} else {

subTblNm = tableNm.substring(tableNm.indexOf("G") + 1)

.replaceAll("_", "").toLowerCase();

tableNm = tableNm + yesterDayDate;

// 清空数据库表中的数据

FileImportUtils.clearTableData(tableNm);

}

// 读取配置信息结束

// 设置值

ConfigDataVO configDataVO = new ConfigDataVO(subFileNm, subTblNm,

tableNm, mappingNum, tableFields, columnTypes,

columnLengths, columnNullable, fieldMatchColunm);

return configDataVO;

} else {

return null;

}

}

}

数据导入接口

package com.unionpay.upa.importer;

import java.sql.SQLException;

public interface Importer {

/**

* 导入数据

* @param date yyyy-MM-dd

*/

public void run(String date);

/**

* 是否导入完毕

*

* @return

*/

public boolean isComplete();

/**

* 批量导入完成之后删除文件

*/

public void deleteFile();

/**

* 关闭数据库连接池,释放资源

* @throws SQLException

*/

public void shutDownDS() throws SQLException;

}

数据导入实现:

package com.unionpay.upa.importer;

import java.io.File;

import java.sql.SQLException;

import java.text.MessageFormat;

import java.util.List;

import org.apache.log4j.Logger;

/**

* 导入数据的实现接口

*

* @author huangdou

*

*/

public class ImporterImpl implements Importer {

/**

* logger日志

*/

private static Logger logger = Logger.getLogger(ImporterImpl.class);

// 下载文件地址

private static final String REMOTE_GZIPFILEPATH = CommonUtils.getSysConfigure("gzip.remote.url");

// 解压文件的本地路径

private static final String TARGETDIR = CommonUtils.getSysConfigure("ungzip.url");

//主线程

private ReadAndWriterThread thread ;

/**

* 真正运行的代码

*/

public synchronized void run(String date) {

//设置参数

String remoteUrl=MessageFormat.format(REMOTE_GZIPFILEPATH, date);

//得到压缩文件的路径

String zipFilePath=TARGETDIR+File.separator+remoteUrl.substring(remoteUrl.lastIndexOf("/")+1);

// 从远程服务器下载gzip log文件

DownloadGzipFile.downloadFile(remoteUrl, TARGETDIR);

logger.info("<<<<<下载文件end>>>>>");

//解压tar.gz后缀的压缩文件

GZipUtil.unTargzFile(zipFilePath, TARGETDIR);

logger.info("<<<<<解压文件end>>>>>");

// 获取压缩后文件的路径

List<String> urlList = GZipUtil.readfile(TARGETDIR);

// 循环导入各个表的数据

thread = new ReadAndWriterThread(urlList, date);

thread.start();

}

/**

* 判断线程任务是否完成

*/

public boolean isComplete() {

return thread.isComplete();

}

/**

* 批量导入完成之后删除文件

*/

@Override

public void deleteFile() {

// TODO Auto-generated method stub

GZipUtil.deleteFile(TARGETDIR);

}

/**

* 关闭数据库连接池,释放资源

*/

@Override

public void shutDownDS() throws SQLException {

// TODO Auto-generated method stub

DBConnection.shutdownDataSource();

}

}

11.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: