您的位置:首页 > 其它

使用bcp进行sybase 的批量导入导出

2018-03-19 15:27 363 查看
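1. 使用 Runtime runtime = Runtime.getRuntime();
    Process process = runtime.exec(...);
    执行 export LANG=C;export SYBASE=/tmp/sybase/;export PATH=$PATH:$SYBASE;bcp ... 这类由多条命令拼成的命令串时,
    要按照以下方式执行:process = runTime.exec(new String[]{"/bin/bash", "-c", command});
    原因:Runtime.exec(String) 只是按空白把字符串拆成"程序名 + 参数"后直接启动进程,并不经过 shell,所以 export、分号、$PATH 这些 shell 语法不会被解释;交给 /bin/bash -c 之后,整条命令才由 shell 来解析执行。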

2. 使用 shell 命令 mkfifo + fileName 生成 fifo(命名管道)文件
    (1) fifo 的一端在另一端打开之前会一直阻塞。为了防止 Java 中获取输入流时阻塞、导致程序无法继续执行,
        先要生成输出流或者先执行产生输出流的命令,即生成 fifo 文件后,先用一个线程执行 bcp in;
    (2) 然后使用另一个线程执行 bcp out。

具体实现如下(导入步骤 SybaseImp 与导出步骤 SybaseExp 两个类):

package org.pentaho.di.trans.steps.sybaseimp;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;

import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.vfs.KettleVFS;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.apache.commons.vfs2.FileObject;

public class SybaseImp extends BaseStep implements StepInterface {
private static Class<?> PKG = SybaseImp.class;
private SybaseImpMeta meta;
protected SybaseImpData data;

// -------------------------- construction
// ------------------------------------------------------
/**
 * Creates the import step; every argument is handed unchanged to the {@link BaseStep} constructor.
 */
public SybaseImp(
        StepMeta stepMeta,
        StepDataInterface stepDataInterface,
        int copyNr,
        TransMeta transMeta,
        Trans trans) {
    super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}

// -------------------------- load one data file into a table
// ----------------------------------------------------------
/**
 * Runs the whole import in a single call: optionally waits for the export plugin's output, then
 * executes every generated shell command.
 *
 * <p>Waiting is driven by two flags on the meta, each compared case-insensitively with "Y":
 * <ul>
 *   <li>{@code toWait} is "Y" and {@code isFIFO} is "Y": wait until the data file exists;</li>
 *   <li>{@code toWait} is "Y" and {@code isFIFO} is anything else: wait until the ".flag" file
 *       next to the data file exists, then delete that flag file;</li>
 *   <li>{@code toWait} is anything else: the step is used on its own and starts immediately.</li>
 * </ul>
 *
 * @param smi step meta, cast to {@link SybaseImpMeta}
 * @param sdi step data, cast to {@link SybaseImpData}
 * @return always {@code false}
 * @throws KettleException if the wait is interrupted, a command cannot be executed, or a command
 *         exits with a non-zero code
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    meta = (SybaseImpMeta) smi;
    data = (SybaseImpData) sdi;

    if ("Y".equalsIgnoreCase(meta.getToWait()) && !waitForExport(meta)) {
        // The step was stopped while waiting; the files were already cleaned up.
        return false;
    }

    String[] commands = generateCommandFromMeta(meta);

    // Any non-zero exit code fails the step.
    for (int i = 0; i < commands.length; i++) {
        int exitValue = executeCommand(commands[i]);
        if (exitValue != 0) {
            stopAll();
            setErrors(1);
            setOutputDone();
            // The command carries the clear-text password after "-P"; keep it out of the message.
            String loggable = commands[i].replaceAll("( -P )\\S+", "$1******");
            throw new KettleException("Error while run SybaseImp : " + loggable);
        }
    }
    setOutputDone();
    return false;
}

/**
 * Polls every 300 ms until the awaited file exists: the data file in FIFO mode, the ".flag" file
 * otherwise. The flag file is deleted once it has been seen.
 *
 * @return {@code true} when the awaited file appeared, {@code false} when the step was stopped
 *         first (the data file and, in flag mode, the flag file are deleted in that case)
 * @throws KettleException if the polling thread is interrupted
 */
private boolean waitForExport(SybaseImpMeta meta) throws KettleException {
    File dataFile = new File(meta.getFileName());
    File flagFile = "Y".equalsIgnoreCase(meta.getIsFIFO())
            ? null
            : new File(flagFileNameFor(meta.getFileName()));
    File awaited = flagFile == null ? dataFile : flagFile;

    while (true) {
        // A stop request wins over an existing file, as before.
        if (isStopped()) {
            if (dataFile.exists()) {
                dataFile.delete();
            }
            if (flagFile != null && flagFile.exists()) {
                flagFile.delete();
            }
            return false;
        }
        if (awaited.exists()) {
            break;
        }
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            // Swallowing the interrupt would keep this loop spinning after a shutdown request.
            Thread.currentThread().interrupt();
            throw new KettleException("Interrupted while waiting for file : " + awaited.getPath(), e);
        }
    }

    if (flagFile != null) {
        flagFile.delete();
    }
    return true;
}

/**
 * Replaces everything from the last '.' of {@code dataFileName} with ".flag"; a name without any
 * '.' simply gets ".flag" appended instead of failing in {@code substring}.
 */
private static String flagFileNameFor(String dataFileName) {
    int dot = dataFileName.lastIndexOf('.');
    String base = dot < 0 ? dataFileName : dataFileName.substring(0, dot);
    return base + ".flag";
}

/**
 * Runs {@code command} through {@code /bin/bash -c}, forwards the process's stdout and stderr to
 * the step log via two {@link StreamLogger} threads, and waits for the process to finish.
 *
 * <p>When the stdout logger captured a row count, it is stored as the step's lines-output and
 * lines-written counters and the generated interfaces file is deleted.
 *
 * @param command complete shell command line
 * @return the exit code of the process
 * @throws KettleException if the process cannot be started, the wait is interrupted, or the
 *         captured row count cannot be processed; the original exception is kept as the cause
 */
private int executeCommand(String command) throws KettleException {
    // The bcp command line carries the clear-text password after "-P"; never write it to the log.
    String loggable = command.replaceAll("( -P )\\S+", "$1******");
    logBasic("execute command :" + loggable);

    Process process = null;
    int exitValue = -1;
    try {
        process = Runtime.getRuntime().exec(new String[] {"/bin/bash", "-c", command});
        StreamLogger streamLogger = new StreamLogger(process.getInputStream(), "OUTPUT");
        StreamLogger errorStreamLogger = new StreamLogger(process.getErrorStream(), "ERROR");
        errorStreamLogger.start();
        streamLogger.start();
        exitValue = process.waitFor();
        long cur = System.currentTimeMillis();
        // Join both readers so no trailing output is lost and rows is safely visible here.
        streamLogger.join();
        errorStreamLogger.join();
        logBasic(">>>>>wait for :" + (System.currentTimeMillis() - cur));
        String rows = streamLogger.rows;
        if (null != rows) {
            long copied = Long.parseLong(rows);
            super.setLinesOutput(copied);
            super.setLinesWritten(copied);
            deleteInterfacesFile(meta);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag for the caller and do not leave the child running unattended.
        Thread.currentThread().interrupt();
        if (process != null) {
            process.destroy();
        }
        throw new KettleException(
                BaseMessages.getString(PKG, "SybaseImp.Execption.execute", loggable), e);
    } catch (Exception e) {
        throw new KettleException(
                BaseMessages.getString(PKG, "SybaseImp.Execption.execute", loggable), e);
    }
    logBasic("execute command :" + (exitValue == 0 ? "succeed" : "failed"));
    return exitValue;
}

/**
 * Deletes the file named sybasePath + schema + "_" + table + "_imp" if it exists.
 * The path is the plain concatenation of those values; no separator is inserted.
 */
private void deleteInterfacesFile(SybaseImpMeta meta) {
    File interfacesFile = new File(
            meta.getSybasePath() + meta.getSchemaName() + "_" + meta.getTableName() + "_imp");
    if (!interfacesFile.exists()) {
        return;
    }
    interfacesFile.delete();
}

/**
 * Builds the single shell command line to execute: the {@code EXPORT_LANG} statement, an export of
 * SYBASE, an export that appends the bcp path to PATH, and the bcp call itself, joined by ';'.
 *
 * @return a one-element array holding that command line
 */
private String[] generateCommandFromMeta(SybaseImpMeta meta) {
    StringBuilder script = new StringBuilder();
    script.append(SybaseImpMeta.EXPORT_LANG).append(";");
    script.append("export SYBASE=").append(meta.getSybasePath().trim()).append(";");
    script.append("export PATH=$PATH:").append(meta.getBcpPath().trim()).append(";");
    // Building the bcp call also writes the interfaces file, so it stays last.
    script.append(generateBcpFromMeta(meta));
    return new String[] {script.toString()};
}

private String generateBcpFromMeta(SybaseImpMeta meta) {
//bcp64 zxs.dbo.zxs in "zxs.txt" -c -t "|" -b 10000 -Y -U sa -P 123456 -S KFMOXIN01 -J cp936
StringBuilder bcpCommand = new StringBuilder("bcp64 ");
bcpCommand.append(meta.getDatabaseName());
bcpCommand.append(".");
bcpCommand.append(meta.getSchemaName());
bcpCommand.append(".");
bcpCommand.append(meta.getTableName());
bcpCommand.append(" in \"");
bcpCommand.append(meta.getFileName());
bcpCommand.append("\" -c -t \"");
bcpCommand.append(meta.getDelimiter());
bcpCommand.append("\" -b ");
bcpCommand.append(meta.getBatchSize());
bcpCommand.
b8a2
append(SybaseImpMeta.KEEP_SAME);
bcpCommand.append(" -U ");
bcpCommand.append(meta.getUserName());
bcpCommand.append(" -P ");
bcpCommand.append(Encr.decryptPassword(meta.getPassword()));
bcpCommand.append(" -S ");
bcpCommand.append(meta.getServerName());
bcpCommand.append(" -J ");
bcpCommand.append(meta.getCharacterSet());
bcpCommand.append(" -m ");
bcpCommand.append(meta.getMaxerrors());
bcpCommand.append(" -I ");
bcpCommand.append(mkInterfaceFile(meta));
return bcpCommand.toString();
}

/**
 * Writes the interfaces file handed to bcp with {@code -I} and returns its path.
 *
 * <p>The path is sybasePath + schema + "_" + table + "_imp" (plain concatenation, no separator
 * inserted). An existing file at that path is replaced. Write failures are reported through the
 * step's error log; the path is returned either way.
 *
 * @return the path of the interfaces file
 */
private String mkInterfaceFile(SybaseImpMeta meta) {
    /*
     * Example layout given by the original author:
     * KFMOXIN01
     *     master tcp ether 192.168.12.94 5000
     *     query tcp ether 192.168.12.94 5000
     */
    StringBuilder context = new StringBuilder();
    context.append(meta.getServerName());
    context.append("\n\t");
    // NOTE(review): the example above shows the literal "master" here, but the code writes the
    // database name - confirm which one is intended.
    context.append(meta.getDatabaseName());
    context.append(" tcp ether ");
    context.append(meta.getIp());
    context.append(" ");
    context.append(meta.getPort());
    context.append("\n\tquery tcp ether ");
    context.append(meta.getIp());
    context.append(" ");
    context.append(meta.getPort());
    context.append("\n");

    String filename = meta.getSchemaName() + "_" + meta.getTableName() + "_imp";
    String interfaces = meta.getSybasePath() + filename;
    File f = new File(interfaces);
    if (f.exists() && !f.delete()) {
        logError("Could not remove existing interfaces file : " + interfaces);
    }
    // FileOutputStream creates the file; try-with-resources closes it even when the write fails.
    try (OutputStream os = new FileOutputStream(f)) {
        os.write(context.toString().getBytes());
        os.flush();
    } catch (IOException e) {
        logError("Could not write interfaces file : " + interfaces, e);
    }
    return interfaces;
}

private final class StreamLogger extends Thread {
private InputStream input;
private String type;
String rows = null;
StreamLogger( InputStream is, String type ) {
this.input = is;
this.type = type + ">";
}

public void run() {
try {
final BufferedReader br = new BufferedReader(new InputStreamReader(input));
String line;
while ((line = br.readLine()) != null) {
//48 rows copied.
if (line.contains("rows copied")) {
rows =line.split("rows copied")[0].trim();
System.out.println(">>>>>>>>>>>>>>rows is :" + rows + " loaded <<<<<<<<<<<");
}
if (log.isBasic()) {
logBasic(type + line);
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
}

}

}

/**
 * Stores the typed meta and data objects on the step, then delegates to {@code super.init}.
 *
 * @return whatever {@code super.init(smi, sdi)} returns
 */
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
    this.meta = (SybaseImpMeta) smi;
    this.data = (SybaseImpData) sdi;
    return super.init(smi, sdi);
}

/**
 * Disposes the step and, when {@code eraseFiles} is "Y" (case-insensitive) and a file name is
 * configured, deletes the data file through Kettle VFS. Failures are logged, never thrown.
 */
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
    meta = (SybaseImpMeta) smi;
    data = (SybaseImpData) sdi;
    super.dispose(smi, sdi);

    String fileName = meta.getFileName();
    if (!"Y".equalsIgnoreCase(meta.getEraseFiles()) || fileName == null) {
        return;
    }

    FileObject fileObject = null;
    try {
        fileObject = KettleVFS.getFileObject(environmentSubstitute(fileName), getTransMeta());
        fileObject.delete();
    } catch (Exception ex) {
        // fileObject is still null when resolving the name failed; fall back to the configured
        // name instead of passing null to KettleVFS.getFilename.
        String shownName = fileObject != null ? KettleVFS.getFilename(fileObject) : fileName;
        logError("Error deleting data file '" + shownName + "': " + ex.getMessage());
    } finally {
        if (fileObject != null) {
            try {
                fileObject.close();
            } catch (Exception closeEx) {
                logError("Error closing data file '" + fileName + "': " + closeEx.getMessage());
            }
        }
    }
}
}

package org.pentaho.di.trans.steps.sybaseexp;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;

import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

public class SybaseExp extends BaseStep implements StepInterface {
private static Class<?> PKG = SybaseExp.class;
private SybaseExpMeta meta;
protected SybaseExpData data;

// -------------------------- construction
// ------------------------------------------------------
/**
 * Creates the export step; every argument is handed unchanged to the {@link BaseStep} constructor.
 */
public SybaseExp(
        StepMeta stepMeta,
        StepDataInterface stepDataInterface,
        int copyNr,
        TransMeta transMeta,
        Trans trans) {
    super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}

// -------------------------- unload one table data to file
// ----------------------------------------------------------
/**
 * Runs the whole export in a single call: deletes a previous data file, creates the FIFO when
 * {@code isFIFO} is "Y" (case-insensitive), executes every generated shell command, and - when not
 * in FIFO mode - creates the ".flag" file afterwards.
 *
 * @param smi step meta, cast to {@link SybaseExpMeta}
 * @param sdi step data, cast to {@link SybaseExpData}
 * @return always {@code false}
 * @throws KettleException if a command cannot be executed or exits with a non-zero code, or the
 *         flag file cannot be created
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    meta = (SybaseExpMeta) smi;
    data = (SybaseExpData) sdi;

    // delete history data file
    deleteHistoryDataFile(meta);

    // Check for a stop request before generating the commands: generating them already writes
    // the interfaces file, which would otherwise be left behind.
    if (isStopped()) {
        return false;
    }
    String[] commands = generateCommandFromMeta(meta);

    boolean fifo = "Y".equalsIgnoreCase(meta.getIsFIFO());
    if (fifo) {
        createFifoFile(meta);
    }

    // Any non-zero exit code fails the step.
    for (int i = 0; i < commands.length; i++) {
        int exitValue = executeCommand(commands[i]);
        if (exitValue != 0) {
            stopAll();
            setErrors(1);
            setOutputDone();
            // The command carries the clear-text password after "-P"; keep it out of the message.
            String loggable = commands[i].replaceAll("( -P )\\S+", "$1******");
            throw new KettleException("Error while run SybaseExp : " + loggable);
        }
    }

    // SybaseImp waits for the flag file whenever isFIFO is not "Y", so create it for every
    // non-FIFO value (including null/empty), not only for a literal "N".
    if (!fifo) {
        createFlagFile(meta);
    }
    setOutputDone();
    return false;
}

/**
 * Logs the configured data file name and deletes that file if it is present.
 */
private void deleteHistoryDataFile(SybaseExpMeta meta) {
    String stalePath = meta.getFileName();
    logBasic("if file :" + stalePath + "  exist delete ");
    File stale = new File(stalePath);
    if (!stale.exists()) {
        return;
    }
    stale.delete();
}

/**
 * Creates an empty flag file next to the data file: everything from the last '.' of the data file
 * name is replaced with ".flag" (a name without any '.' gets ".flag" appended). An existing flag
 * file is deleted first.
 *
 * @throws KettleException if the file cannot be created; the I/O error is kept as the cause
 */
private void createFlagFile(SybaseExpMeta meta) throws KettleException {
    String dataFile = meta.getFileName();
    int dot = dataFile.lastIndexOf('.');
    // Without a '.' the old substring(0, 0) + "flag" produced a bare "flag" in the working directory.
    String flagFile = (dot < 0 ? dataFile : dataFile.substring(0, dot)) + ".flag";
    logBasic("create file :" + flagFile);
    File f = new File(flagFile);
    if (f.exists()) {
        f.delete();
    }
    try {
        f.createNewFile();
    } catch (IOException e) {
        throw new KettleException("Error while create flag file : " + flagFile, e);
    }
}

/**
 * Runs {@code command} through {@code /bin/bash -c}, forwards the process's stdout and stderr to
 * the step log via two {@link StreamLogger} threads, and waits for the process to finish. Used
 * both for the bcp command and for {@code mkfifo}.
 *
 * <p>When the stdout logger captured a row count, it is stored as the step's lines-input and
 * lines-read counters and the generated interfaces file is deleted.
 *
 * @param command complete shell command line
 * @return the exit code of the process
 * @throws KettleException if the process cannot be started, the wait is interrupted, or the
 *         captured row count cannot be processed; the original exception is kept as the cause
 */
private int executeCommand(String command) throws KettleException {
    // The bcp command line carries the clear-text password after "-P"; never write it to the log.
    String loggable = command.replaceAll("( -P )\\S+", "$1******");
    logBasic("execute command  :" + loggable);

    Process process = null;
    int exitValue = -1;
    try {
        process = Runtime.getRuntime().exec(new String[] {"/bin/bash", "-c", command});
        StreamLogger streamLogger = new StreamLogger(process.getInputStream(), "OUTPUT");
        StreamLogger errorStreamLogger = new StreamLogger(process.getErrorStream(), "ERROR");
        errorStreamLogger.start();
        streamLogger.start();
        exitValue = process.waitFor();
        long cur = System.currentTimeMillis();
        // Join both readers so no trailing output is lost and rows is safely visible here.
        streamLogger.join();
        errorStreamLogger.join();
        logBasic(">>>>>wait for :" + (System.currentTimeMillis() - cur));
        String rows = streamLogger.rows;
        if (null != rows) {
            long copied = Long.parseLong(rows);
            super.setLinesInput(copied);
            super.setLinesRead(copied);
            deleteInterfacesFile(meta);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag for the caller and do not leave the child running unattended.
        Thread.currentThread().interrupt();
        if (process != null) {
            process.destroy();
        }
        throw new KettleException(
                BaseMessages.getString(PKG, "SybaseExp.Execption.execute", loggable), e);
    } catch (Exception e) {
        throw new KettleException(
                BaseMessages.getString(PKG, "SybaseExp.Execption.execute", loggable), e);
    }
    logBasic("execute command  :" + (exitValue == 0 ? "succeed" : "failed"));
    return exitValue;
}

/**
 * Deletes the file named sybasePath + schema + "_" + table + "_exp" if it exists.
 * The path is the plain concatenation of those values; no separator is inserted.
 */
private void deleteInterfacesFile(SybaseExpMeta meta) {
    File interfacesFile = new File(
            meta.getSybasePath() + meta.getSchemaName() + "_" + meta.getTableName() + "_exp");
    if (!interfacesFile.exists()) {
        return;
    }
    interfacesFile.delete();
}

/**
 * Builds the single shell command line to execute: the {@code EXPORT_LANG} statement, an export of
 * SYBASE, an export that appends the bcp path to PATH, and the bcp call itself, joined by ';'.
 *
 * @return a one-element array holding that command line
 */
private String[] generateCommandFromMeta(SybaseExpMeta meta) {
    StringBuilder script = new StringBuilder();
    script.append(SybaseExpMeta.EXPORT_LANG).append(";");
    script.append("export SYBASE=").append(meta.getSybasePath().trim()).append(";");
    script.append("export PATH=$PATH:").append(meta.getBcpPath().trim()).append(";");
    // Building the bcp call also writes the interfaces file, so it stays last.
    script.append(generateBcpFromMeta(meta));
    return new String[] {script.toString()};
}
/**
 * Creates the named pipe for the data file by running {@code mkfifo} on the configured file name.
 *
 * @throws KettleException if {@code mkfifo} cannot be executed or exits with a non-zero code;
 *         continuing without the pipe would let bcp write an ordinary file instead
 */
private void createFifoFile(SybaseExpMeta meta) throws KettleException {
    // Quote the path the same way the bcp command does, so names containing spaces still work.
    String createFifoCommand = "mkfifo \"" + meta.getFileName() + "\"";
    int exitValue = executeCommand(createFifoCommand);
    if (exitValue != 0) {
        throw new KettleException("Error while creating fifo file : " + meta.getFileName()
                + " (mkfifo exit code " + exitValue + ")");
    }
}
/**
 * Builds the {@code bcp64 ... out} command line from the step settings and, as a side effect,
 * writes the interfaces file that is passed with {@code -I}.
 *
 * <p>Example given by the original author:
 * {@code bcp64 zxs.dbo.zxs out "zxs.txt" -c -t"|" -T32000 -Usa -P123456 -S KFMOXIN01 -Jcp936}.
 * The command built here has the form
 * {@code bcp64 db.schema.table out "file" -c -t "delim" -U user -P password -S server -J charset -I interfaces}.
 *
 * @return the bcp command line; it contains the decrypted password in clear text
 */
private String generateBcpFromMeta(SybaseExpMeta meta) {
    StringBuilder cmd = new StringBuilder("bcp64 ");

    // fully qualified table: database.schema.table
    cmd.append(meta.getDatabaseName()).append(".")
            .append(meta.getSchemaName()).append(".")
            .append(meta.getTableName());

    // target file and field delimiter, both double-quoted
    cmd.append(" out \"").append(meta.getFileName());
    cmd.append("\" -c -t \"").append(meta.getDelimiter());

    // credentials, server and character set
    cmd.append("\" -U ").append(meta.getUserName());
    cmd.append(" -P ").append(Encr.decryptPassword(meta.getPassword()));
    cmd.append(" -S ").append(meta.getServerName());
    cmd.append(" -J ").append(meta.getCharacterSet());

    // interfaces file is written here and referenced with -I
    cmd.append(" -I ").append(mkInterfaceFile(meta));
    return cmd.toString();
}

/**
 * Writes the interfaces file handed to bcp with {@code -I} and returns its path.
 *
 * <p>The path is sybasePath + schema + "_" + table + "_exp" (plain concatenation, no separator
 * inserted). An existing file at that path is replaced. Write failures are reported through the
 * step's error log; the path is returned either way.
 *
 * @return the path of the interfaces file
 */
private String mkInterfaceFile(SybaseExpMeta meta) {
    /*
     * Example layout given by the original author:
     * KFMOXIN01
     *     master tcp ether 192.168.12.94 5000
     *     query tcp ether 192.168.12.94 5000
     */
    StringBuilder context = new StringBuilder();
    context.append(meta.getServerName());
    context.append("\n\t");
    // NOTE(review): the example above shows the literal "master" here, but the code writes the
    // database name - confirm which one is intended.
    context.append(meta.getDatabaseName());
    context.append(" tcp ether ");
    context.append(meta.getIp());
    context.append(" ");
    context.append(meta.getPort());
    context.append("\n\tquery tcp ether ");
    context.append(meta.getIp());
    context.append(" ");
    context.append(meta.getPort());
    context.append("\n");

    String filename = meta.getSchemaName() + "_" + meta.getTableName() + "_exp";
    String interfaces = meta.getSybasePath() + filename;
    File f = new File(interfaces);
    if (f.exists() && !f.delete()) {
        logError("Could not remove existing interfaces file : " + interfaces);
    }
    // FileOutputStream creates the file; try-with-resources closes it even when the write fails.
    try (OutputStream os = new FileOutputStream(f)) {
        os.write(context.toString().getBytes());
        os.flush();
    } catch (IOException e) {
        logError("Could not write interfaces file : " + interfaces, e);
    }
    return interfaces;
}

private final class StreamLogger extends Thread {
private InputStream input;
private String type;
String rows = null;
StreamLogger( InputStream is, String type ) {
this.input = is;
this.type = type + ">";
}

public void run() {
try {
final BufferedReader br = new BufferedReader(new InputStreamReader(input));
String line;
int count=0;
while ((line = br.readLine()) != null) {
count++;
if (line.contains("rows copied")) {
rows =line.split("rows copied")[0].trim();
System.out.println(">>>>>>>>>>>>>>rows is :" + rows  + " unloaded <<<<<<<<<<<");
}
if (log.isBasic()&&count%10==0) {
logBasic(type + line);
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
}

}

}

/**
 * Stores the typed meta and data objects on the step, then delegates to {@code super.init}.
 *
 * @return whatever {@code super.init(smi, sdi)} returns
 */
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
    this.meta = (SybaseExpMeta) smi;
    this.data = (SybaseExpData) sdi;
    return super.init(smi, sdi);
}

/**
 * Stores the typed meta and data objects on the step, then delegates to {@code super.dispose}.
 */
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
    this.meta = (SybaseExpMeta) smi;
    this.data = (SybaseExpData) sdi;
    super.dispose(smi, sdi);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: