您的位置:首页 > 其它

大XML文件解析入库的一个方法

2011-04-02 16:35 495 查看
本来想用ORACLE的外部表导入XML文件的数据,但是太麻烦,而且XMLTYPE对于大数据量似乎性能很差。在Google上找到一个分段XML入库的例子:http://forums.oracle.com/forums/thread.jspa?threadID=461009&tstart=0

即把XML文件分解成N个DOM树写入数据库,作者的思路很好(就是不太爱写注释)。我在他的基础上写了一个用SAX解析XML文件、把数据直接入库(而不是把DOM树入库)的方法,导入500M的数据在我的PC上需要10多分钟。

整个思路是这样:一个线程负责解析XML文件,当然采用SAX方式,因为DOM方式会让内存吃不消。另外一个线程池负责向数据库写入。解析线程把解析出来的数据暂存在workQueue里,写入线程不断读取,并写入数据库。当workQueue队列里的数据超过一定阈值时,解析线程等待,这是因为如果全部解析出来,封装成的对象也会很大,可能导致内存不足。基本上类似于生产者与消费者模式。

刚写完,测试了几例,还算稳定,欢迎大家补充完善。

OK,先贴代码。

SourceProcessor.java:

/*
 * XML parsing thread (uses Oracle's SAX parser from xmlparserv2.jar).
 *
 * SAX also reports the whitespace between the end of one element and the start
 * of the next as character data, so characters() would otherwise pick up a lot
 * of extra blanks. The workaround used here: text is collected only while a
 * field element inside a record is open (the "useable" flag), and every
 * collected value is trimmed.
 *
 * Completed records (each a List<String> of field values in document order)
 * are buffered and handed to the SaxProcessor work queue in batches.
 */
public class SourceProcessor extends Thread implements ContentHandler {

    /** Number of records buffered before a batch is handed to the work queue. */
    private static final int BATCH_SIZE = 100;

    /** Completed records that have not yet been handed to the work queue. */
    private List<List<String>> cacheList = new ArrayList<List<String>>();

    private SaxProcessor saxProcessor;

    /** Record element name, read once from the "Element" setting (see recordElement()). */
    private String recordElementName = null;

    private String targetFilename = null;

    /** True while the parser is inside a record element. */
    private boolean recordStart = false;

    /** True while character data belongs to a field of the current record. */
    private boolean useable = false;

    /** Field values collected so far for the current record. */
    private List<String> curDatas = new ArrayList<String>();

    /** Text collected so far for the current field. */
    private StringBuilder curData = new StringBuilder();

    public boolean isRecordStart() {
        return recordStart;
    }

    public void setRecordStart(boolean recordStart) {
        this.recordStart = recordStart;
    }

    public SourceProcessor(String threadName) {
        super(threadName);
    }

    public String getTargetFilename() {
        return targetFilename;
    }

    public void setTargetFilename(String targetFilename) {
        this.targetFilename = targetFilename;
    }

    /**
     * Returns the name of the element that delimits one record. It is looked up
     * once instead of on every SAX event, because getSetting() walks the
     * configuration DOM.
     */
    private String recordElement() {
        if (recordElementName == null) {
            recordElementName = this.saxProcessor.getSetting("Element", "Record");
        }
        return recordElementName;
    }

    @Override
    public void characters(char[] ch, int start, int length)
            throws SAXException {
        // Inter-element whitespace arrives here too; only keep field text.
        if (this.useable) {
            curData.append(ch, start, length);
        }
    }

    /** Flushes the last partial batch and tells the controller parsing is over. */
    @Override
    public void endDocument() throws SAXException {
        if (cacheList.size() > 0) {
            this.saxProcessor.addToQueue(cacheList);
        }
        this.saxProcessor.setParsingComplete();
        System.out.println("over");
    }

    /**
     * Closes either a whole record (queued once BATCH_SIZE records are buffered)
     * or one field of the current record (its trimmed text becomes the next value).
     */
    @Override
    public void endElement(String uri, String localName, String name)
            throws SAXException {
        if (localName.equals(recordElement())) {
            // One record is complete.
            cacheList.add(curDatas);
            this.curDatas = new ArrayList<String>();
            // Leave record mode so elements that appear between records are
            // not collected into the next record.
            this.recordStart = false;
            this.useable = false;
            if (cacheList.size() >= BATCH_SIZE) {
                this.saxProcessor.addToQueue(cacheList);
                cacheList = new ArrayList<List<String>>();
            }
        } else if (this.recordStart) {
            curDatas.add(curData.toString().trim());
            curData.setLength(0);
            this.useable = false;
        }
    }

    @Override
    public void endPrefixMapping(String prefix) throws SAXException {
    }

    @Override
    public void ignorableWhitespace(char[] ch, int start, int length)
            throws SAXException {
    }

    @Override
    public void processingInstruction(String target, String data)
            throws SAXException {
    }

    @Override
    public void setDocumentLocator(Locator locator) {
    }

    @Override
    public void skippedEntity(String name) throws SAXException {
    }

    @Override
    public void startDocument() throws SAXException {
    }

    /** Enters record mode on a record element; any element inside a record starts a field. */
    @Override
    public void startElement(String uri, String localName, String name,
            Attributes atts) throws SAXException {
        if (localName.equals(recordElement())) {
            this.recordStart = true;
        } else if (recordStart) {
            // Field text may be collected from here on.
            this.useable = true;
        }
    }

    @Override
    public void startPrefixMapping(String prefix, String uri)
            throws SAXException {
    }

    public SaxProcessor getSaxProcessor() {
        return saxProcessor;
    }

    public void setSaxProcessor(SaxProcessor saxProcessor) {
        this.saxProcessor = saxProcessor;
        // The record element name comes from the processor's settings.
        this.recordElementName = null;
    }

    /**
     * Parses targetFilename with this object as the content handler.
     *
     * The input stream is always closed. Parsing is always reported as complete
     * to the SaxProcessor, even when parsing fails. Otherwise writers blocked in
     * getNextData() and the controller in waitForCompletion() would wait forever.
     * Records buffered before a failure are not flushed.
     */
    @Override
    public void run() {
        FileInputStream in = null;
        try {
            SAXParser parser = new SAXParser();
            parser.setAttribute(SAXParser.STANDALONE, Boolean.valueOf(true));
            parser.setValidationMode(SAXParser.NONVALIDATING);
            parser.setContentHandler(this);
            this.saxProcessor.setParserActive();
            in = new FileInputStream(this.targetFilename);
            parser.parse(in);
        } catch (ProcessingCompleteException pce) {
            pce.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (java.io.IOException e) {
                    e.printStackTrace();
                }
            }
            // Idempotent: endDocument() already did this on a successful parse.
            if (this.saxProcessor != null) {
                this.saxProcessor.setParsingComplete();
            }
        }
    }

    public boolean isUseable() {
        return useable;
    }

    public void setUseable(boolean useable) {
        this.useable = useable;
    }

}

DatabaseWriter.java:

/*
 * Database writer task. Repeatedly takes a batch of parsed records from the
 * SaxProcessor work queue and inserts each record into testMT. It commits once
 * at least commitCharge records have been inserted since the last commit, and
 * commits the remainder when processing is complete. The statement and the
 * connection it was given are closed when the task ends.
 */
public class DatabaseWriter extends Thread {

    /** Number of bind variables (columns) per inserted record. */
    private static final int COLUMN_COUNT = 4;

    private Connection connection;

    private SaxProcessor processor;

    private String threadName;

    private int commitCharge;

    private int recordCount = 0;

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public SaxProcessor getProcessor() {
        return processor;
    }

    public void setProcessor(SaxProcessor processor) {
        this.processor = processor;
    }

    DatabaseWriter(SaxProcessor processor, String threadName,
            Connection connection) {
        this.connection = connection;
        this.processor = processor;
        this.threadName = threadName;
    }

    /** @param commitCharge number of inserted records after which to commit */
    public void setParameters(int commitCharge) {
        this.commitCharge = commitCharge;
    }

    @Override
    public void run() {
        PreparedStatement stat = null;
        try {
            try {
                connection.setAutoCommit(false);
                // NOTE(review): Oracle generally ignores the APPEND hint for
                // INSERT ... VALUES, and "nologging" in this position appears to
                // be parsed as a table alias rather than a logging clause.
                stat = connection
                        .prepareStatement("insert /*+ append*/ into testMT nologging values(?,?,?,?)");
            } catch (SQLException e1) {
                e1.printStackTrace();
            }

            while (!this.processor.processingComplete()) {
                List datas = this.processor.getNextData(this.threadName);
                System.out.println(this.threadName + " run!!");
                // Batches are drained even if the statement could not be
                // prepared, so the parser is never left blocked on a full queue.
                if (datas != null && stat != null) {
                    insertBatch(stat, datas);
                }
                if (recordCount >= commitCharge) {
                    try {
                        connection.commit();
                        this.recordCount = 0;
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }

            try {
                if (!connection.isClosed()) {
                    connection.commit();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } finally {
            closeResources(stat);
        }
    }

    /**
     * Inserts every record of one batch. A record with fewer than COLUMN_COUNT
     * fields is reported and skipped instead of throwing an unchecked exception
     * that would end this writer. A failing row is reported and the rest of the
     * batch continues.
     */
    private void insertBatch(PreparedStatement stat, List datas) {
        for (int i = 0; i < datas.size(); i++) {
            List record = (List) datas.get(i);
            if (record == null || record.size() < COLUMN_COUNT) {
                System.err.println(this.threadName
                        + ": skipping record with fewer than " + COLUMN_COUNT
                        + " fields: " + record);
                continue;
            }
            try {
                for (int col = 0; col < COLUMN_COUNT; col++) {
                    stat.setString(col + 1, (String) record.get(col));
                }
                stat.execute();
                this.recordCount++;
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes the statement (if prepared) and this writer's connection, reporting failures. */
    private void closeResources(PreparedStatement stat) {
        if (stat != null) {
            try {
                stat.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                if (!connection.isClosed()) {
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

}

SaxProcessor.java

/*
 * Import controller. Starts the SAX reader thread and the database writer
 * tasks, and coordinates them through workQueue as a bounded producer/consumer
 * buffer. The reader blocks in addToQueue() while the queue holds
 * 2 * writer-count batches. Writers block in getNextData() while the queue is
 * empty and parsing is still active.
 */
public class SaxProcessor extends ConnectionProvider {

    /* Work queue: parsed batches waiting for a writer (accessed under this object's monitor). */
    private Vector workQueue = new Vector();

    /*
     * Writer thread pool. The original author built a pool on a Hashtable; JDK
     * 1.5+ ships one, so there is no need to reinvent it. Because the task queue
     * is unbounded, the pool never grows beyond its core size.
     * createDatabaseWriters() therefore raises the core size to the writer count.
     */
    BlockingDeque<Runnable> queue = new LinkedBlockingDeque<Runnable>();

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1,
            TimeUnit.MINUTES, queue);

    private int threadCount;

    Thread saxReader;

    private boolean parserActive = false;

    private void setWriterCount(int count) {
        this.threadCount = count;
    }

    private int getWriterCount() {
        return this.threadCount;
    }

    protected synchronized void setParserActive() {
        this.parserActive = true;
    }

    /** Marks parsing as finished and wakes every thread waiting on this monitor. */
    protected synchronized void setParsingComplete() {
        this.parserActive = false;
        notifyAll();
    }

    public synchronized boolean parsingComplete() {
        return !this.parserActive;
    }

    /** True once parsing has finished and every queued batch has been taken. */
    public synchronized boolean processingComplete() {
        return parsingComplete() && this.workQueue.size() == 0;
    }

    private boolean listQueueFull() {
        return this.workQueue.size() >= (2 * getWriterCount());
    }

    /*
     * Appends one batch to the work queue, blocking while the queue is full,
     * then notifies all waiting threads.
     */
    protected synchronized void addToQueue(List data) throws SAXException {
        // Re-check after every wake-up: wait() may return spuriously, and
        // notifyAll() also wakes this thread when no space was freed.
        while (listQueueFull()) {
            try {
                wait();
            } catch (InterruptedException ie) {
                ie.printStackTrace();
                // Preserve the interrupt and stop waiting; the batch is still queued.
                Thread.currentThread().interrupt();
                break;
            }
        }
        this.workQueue.addElement(data);
        notifyAll();
    }

    /**
     * Takes the next batch, waiting while the queue is empty and parsing is
     * still active. Returns null when parsing is complete and nothing is left.
     */
    public synchronized List getNextData(String thread) {
        while (!parsingComplete() && (this.workQueue.size() == 0)) {
            try {
                wait();
            } catch (InterruptedException ignored) {
                // Deliberately ignored: re-asserting the interrupt would make the
                // next wait() throw at once and turn this loop into a busy spin.
            }
        }
        List data = null;
        if (this.workQueue.size() > 0) {
            data = (List) this.workQueue.remove(0);
            // Space was freed: wake a reader blocked in addToQueue().
            notifyAll();
        }
        return data;
    }

    public SaxProcessor() throws SQLException, IOException, SAXException {
        super();
    }

    /** Runs one import: starts the reader, starts the writers and waits for parsing to end. */
    public void doSomething(String[] args) {
        try {
            // At least one writer is needed; with zero, listQueueFull() is always
            // true and the reader would block forever.
            setWriterCount(Math.max(1,
                    Integer.parseInt(getSetting("ThreadCount", "4"))));
            this.saxReader = createSourceProcessor();
            this.setParserActive();
            this.saxReader.start();
            createDatabaseWriters();
            waitForCompletion();
        } catch (Exception e) {
            e.printStackTrace();
            this.setParsingComplete();
        }
    }

    /** Blocks until parsing is complete, then stops the pool from accepting new tasks. */
    private synchronized void waitForCompletion() {
        while (!parsingComplete()) {
            try {
                wait();
            } catch (InterruptedException ignored) {
                // Keep waiting; re-asserting the interrupt would spin this loop.
            }
        }
        // shutdown() lets already-submitted writers finish draining the queue.
        this.executor.shutdown();
    }

    /** Submits one DatabaseWriter per configured writer, each with its own connection. */
    private void createDatabaseWriters() throws SQLException {
        DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance();
        df.applyPattern("000000");
        int commitCharge = Integer.parseInt(getSetting("CommitCharge", "50"));
        ensurePoolSize(getWriterCount());
        for (int i = 0; i < getWriterCount(); i++) {
            System.out.println(getWriterCount());
            String threadName = "Writer_" + df.format(i + 1);
            Connection conn = getNewConnection();
            conn.setAutoCommit(false);
            DatabaseWriter writer = new DatabaseWriter(this, threadName, conn);
            writer.setParameters(commitCharge);
            this.executor.execute(writer);
        }
    }

    /**
     * Raises the pool size so every writer gets its own thread. With an
     * unbounded task queue the pool never grows beyond its core size, so writers
     * beyond the core size would otherwise stay queued until processing is over.
     * The maximum is raised first because the core size may not exceed it.
     */
    private void ensurePoolSize(int writers) {
        if (writers > this.executor.getMaximumPoolSize()) {
            this.executor.setMaximumPoolSize(writers);
        }
        if (writers > this.executor.getCorePoolSize()) {
            this.executor.setCorePoolSize(writers);
        }
    }

    private Thread createSourceProcessor() throws SQLException {
        String threadName = "SaxReader";
        SourceProcessor saxReader = new SourceProcessor(threadName);
        saxReader.setSaxProcessor(this);
        saxReader.setTargetFilename(getSetting("SourceXML", "DIR"));
        return saxReader;
    }

    protected synchronized void printXML(Document xml, PrintWriter pw)
            throws IOException {
        ((XMLDocument) xml).print(pw);
    }

    /* Entry point: loads the settings, opens the initial connection and runs the import. */
    public static void main(String[] args) {
        try {
            SaxProcessor app = new SaxProcessor();
            app.initializeConnection();
            app.doSomething(args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Vector getWorkQueue() {
        return workQueue;
    }

    public void setWorkQueue(Vector workQueue) {
        this.workQueue = workQueue;
    }

}

ConnectionProvider.java

/*
 * Configuration reader and connection factory. Loads an XML settings file
 * whose root's direct child elements are the settings (see getSetting), and
 * opens Oracle JDBC connections from them.
 */
public class ConnectionProvider {

    public static final boolean DEBUG = true;

    protected OracleConnection connection;

    protected XMLDocument connectionDefinition;

    public static final String CONNECTION = "Connection";

    public static final String DRIVER = "Driver";

    public static final String HOSTNAME = "Hostname";

    public static final String PORT = "Port";

    public static final String SID = "SID";

    public static final String SERVICENAME = "ServiceName";

    public static final String SERVERMODE = "Server";

    /** Alternate spelling of the server-mode element, as used in the sample connection.xml. */
    private static final String SERVERMODE_ELEMENT = "ServerMode";

    public static final String SCHEMA = "Schema";

    public static final String PASSWORD = "Password";

    public static final String POOL = "Pool";

    public static final String THIN_DRIVER = "thin";

    public static final String DEFAULT_CONNECTION_DEFINITION = "c:\\temp\\connection.xml";

    public static final String DEFAULT_DRIVER = THIN_DRIVER;

    public static final String DEFAULT_HOSTNAME = "localhost";

    public static final String DEFAULT_PORT = "1521";

    public static final String DEFAULT_SERVERMODE = "DEDICATED";

    public static final String TARGET_DIRECTORY = "targetDirectory";

    protected PrintStream log;

    public ConnectionProvider() {
    }

    /** Loads the default settings file and opens the initial connection, logging to System.out. */
    public void initializeConnection() throws SAXException, IOException,
            SQLException {
        this.initializeConnection(System.out);
    }

    public void initializeConnection(PrintStream log) throws SAXException,
            IOException, SQLException {
        DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
        this.log = log;
        loadConnectionSettings();
        this.connection = openConnection();
    }

    public ConnectionProvider getConnectionProvider() {
        return this;
    }

    /** Like initializeConnection(PrintStream), but reads settings from connectionLocation. (Name misspelling kept for existing callers.) */
    public void initalizeConnection(String connectionLocation, PrintStream log)
            throws SAXException, IOException, SQLException {
        DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
        this.log = log;
        loadConnectionSettings(connectionLocation);
        this.connection = openConnection();
    }

    public void setLogger(PrintStream log) {
        this.log = log;
    }

    private void setConnectionSettings(XMLDocument doc) {
        this.connectionDefinition = doc;
    }

    /* NOTE(review): renders the settings into a StringWriter that is then discarded, so nothing is shown. */
    private void dumpConnectionSettings() throws IOException {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        this.connectionDefinition.print(pw);
        pw.close();
        sw.close();
    }

    public OracleConnection getConnection() throws SQLException {
        return this.connection;
    }

    /** Closes conn only in pooled mode; otherwise it is the shared connection and stays open. */
    public void closeConnection(Connection conn) throws Exception {
        if (isPooled()) {
            conn.close();
        }
    }

    /**
     * Returns a connection for the given credentials in pooled mode, or the
     * shared connection otherwise.
     *
     * In pooled mode this method used to call itself, which could only end in a
     * StackOverflowError. NOTE(review): the intended pooling mechanism is not
     * visible here. This now opens a separate connection with the given
     * credentials, which the caller releases via closeConnection(). Confirm.
     */
    public Connection getConnection(String schema, String passwd)
            throws Exception {
        if (isPooled()) {
            return DriverManager.getConnection(getDatabaseURL(), schema, passwd);
        } else {
            return this.connection;
        }
    }

    public String getSetting(String nodeName) {
        return getSetting(nodeName, null);
    }

    /**
     * Returns the text of the first direct child of the settings root named
     * nodeName, or defaultValue when that element is missing or its first child
     * is not text (e.g. an empty element, a comment or a nested element).
     */
    public String getSetting(String nodeName, String defaultValue) {
        XMLElement root = (XMLElement) this.connectionDefinition
                .getDocumentElement();
        NodeList children = root.getChildrenByTagName(nodeName);
        if (children.getLength() != 0) {
            Element element = (Element) children.item(0);
            org.w3c.dom.Node first = element.getFirstChild();
            // An unchecked cast to Text would throw ClassCastException for
            // anything other than (CDATA) text.
            if (first instanceof Text) {
                return ((Text) first).getData();
            }
        }
        return defaultValue;
    }

    protected String getDriver() {
        return getSetting(DRIVER, DEFAULT_DRIVER);
    }

    protected String getHostname() {
        return getSetting(HOSTNAME, DEFAULT_HOSTNAME);
    }

    protected String getPort() {
        return getSetting(PORT, DEFAULT_PORT);
    }

    /**
     * Returns the server mode for the OCI connect descriptor. The original key is
     * SERVERMODE ("Server"), but the sample configuration spells the element
     * "ServerMode", so that spelling is accepted as a fallback.
     */
    protected String getServerMode() {
        String mode = getSetting(SERVERMODE);
        return (mode != null) ? mode : getSetting(SERVERMODE_ELEMENT,
                DEFAULT_SERVERMODE);
    }

    protected String getServiceName() {
        return getSetting(SERVICENAME);
    }

    protected String getSID() {
        return getSetting(SID);
    }

    /** Pooled unless the Pool setting is absent or "false" (case-insensitive). */
    protected boolean isPooled() {
        String usePool = getSetting(POOL, Boolean.FALSE.toString());
        return !usePool.equalsIgnoreCase(Boolean.FALSE.toString());
    }

    protected String getSchema() {
        return getSetting(SCHEMA);
    }

    protected String getPassword() {
        return getSetting(PASSWORD);
    }

    /** Loads settings from the file named by a system property, or DEFAULT_CONNECTION_DEFINITION. */
    public void loadConnectionSettings() throws IOException, SAXException {
        String filename = System.getProperty(
                "com.oracle.st.xmldb.pm.ConnectionParameters",
                DEFAULT_CONNECTION_DEFINITION);
        loadConnectionSettings(filename);
    }

    /**
     * Parses the settings file into a DOM. The parser gets the raw bytes so it
     * honours the encoding declared in the XML prolog; a FileReader would use
     * the platform default charset. The file is always closed.
     */
    public void loadConnectionSettings(String filename) throws IOException,
            SAXException {
        if (DEBUG) {
            System.out
                    .println("Using connection Parameters from : " + filename);
        }
        java.io.InputStream in = new java.io.FileInputStream(new File(filename));
        XMLDocument doc;
        try {
            DOMParser parser = new DOMParser();
            parser.parse(in);
            doc = parser.getDocument();
        } finally {
            in.close();
        }
        setConnectionSettings(doc);
        if (DEBUG) {
            dumpConnectionSettings();
        }
    }

    /**
     * Builds the JDBC URL: "thin" (case-insensitive) gives a host:port:SID thin
     * URL; any other driver gives an OCI descriptor using the service name.
     */
    protected String getDatabaseURL() {
        String driver = getDriver();
        if (driver == null) {
            return null;
        }
        if (driver.equalsIgnoreCase(THIN_DRIVER)) {
            return "jdbc:oracle:thin:@" + getHostname() + ":" + getPort()
                    + ":" + getSID();
        }
        return "jdbc:oracle:oci8:@(description=(address=(host="
                + getHostname() + ")(protocol=tcp)(port=" + getPort()
                + "))(connect_data=(service_name=" + getServiceName()
                + ")(server=" + getServerMode() + ")))";
    }

    /**
     * Opens a connection with the configured schema and password. The password
     * is masked in everything written to the log.
     */
    private OracleConnection openConnection() throws SQLException {
        String user = getSchema();
        String password = getPassword();
        String url = getDatabaseURL();
        String connectionString = user + "/********@" + url;
        if (DEBUG) {
            this.log
                    .println("ConnectionProvider.establishConnection(): Connecting as "
                            + connectionString);
        }
        try {
            OracleConnection conn = (OracleConnection) DriverManager
                    .getConnection(url, user, password);
            if (DEBUG) {
                this.log
                        .println("ConnectionProvider.establishConnection(): Database Connection Established");
            }
            return conn;
        } catch (SQLException sqle) {
            this.log
                    .println("ConnectionProvider.establishConnection(): Failed to connect using "
                            + connectionString);
            sqle.printStackTrace(this.log);
            throw sqle;
        }
    }

    public OracleConnection getNewConnection() throws SQLException {
        return openConnection();
    }

    public XMLDocument getConnectionSettings() {
        return this.connectionDefinition;
    }

}

ProcessingCompleteException.java

/**
 * SAXException carrying the fixed message "Processing Complete". Throwing any
 * SAXException from a ContentHandler callback aborts the parse, so this type
 * can be used to end parsing early and be told apart from genuine errors.
 */
public class ProcessingCompleteException extends SAXException {

    /** Message carried by every instance. */
    private static final String MESSAGE = "Processing Complete";

    /** Creates the exception with the fixed message. */
    public ProcessingCompleteException() {
        super(MESSAGE);
    }
}

配置文件例子(connection.xml):

<?xml version="1.0" encoding="UTF-8"?>

<Connection>

<!-- Settings are looked up by element name among the direct children of this root element. -->

<!-- "thin" (case-insensitive) builds a host:port:SID thin URL; any other value builds an OCI descriptor using ServiceName. -->
<Driver>Thin</Driver>

<Hostname>10.1.199.250</Hostname>

<Port>1521</Port>

<!-- Only used by the OCI connect descriptor. -->
<ServiceName>orcl.xp.mark.drake.oracle.com</ServiceName>

<!-- Only used by the thin-driver URL. -->
<SID>idm</SID>

<ServerMode>DEDICATED</ServerMode>

<!-- Database user name and password used for every connection. -->
<Schema>dcaudit</Schema>

<Password>dcaudit</Password>

<!-- Path of the XML file to import. -->
<SourceXML>C:\temp\AuditDemo2.xml</SourceXML>

<!-- Name of the element that delimits one record in the source file (default "Record"); elements nested inside it supply the field values. -->
<Element>Record</Element>

<!-- NOTE(review): Table, ErrorTable and the schema-related settings below are not read by the code in this article; the target table (testMT) is hard-coded in DatabaseWriter. -->
<Table>RDF_DOCUMENT_TABLE</Table>

<ErrorTable>RDF_ERROR_TABLE</ErrorTable>

<schemaInstancePrefix>xsi</schemaInstancePrefix>

<schemaLocation/>

<noNamespaceSchemaLocation/>

<!-- Each writer commits after at least this many inserted records (default 50); the rest is committed at the end. -->
<CommitCharge>10</CommitCharge>

<!-- Number of database writer tasks (default 4). -->
<ThreadCount>2</ThreadCount>

</Connection>

下班了,不多说了,88 。

本文出自 “天下无贼” 博客,请务必保留此出处/article/4233347.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: