使用bcp进行sybase 的批量导入导出
2018-03-19 15:27
363 查看
1,使用Runtime runtime = Runtime.getRuntime()
Process process = runtime.exec();
执行export LANG=C;export SYBASE=/tmp/sybase/;export PATH=$PATH:$SYBASE;bcp
要按照以下方式执行process = runTime.exec(new String[]{"/bin/bash", "-c", command});以上的还不知道原理
2 使用 shell 生成 fifo 文件 mkfifo + fileName
(1) 为了防止java中获得输入流阻塞 到时程序不执行,
先要生成输出流或者执行输出流的命令,即生成fifo文件后 先用一个线程执行bcp in
(2) 然后使用另一线程 执行bcp out
具体实现如下package org.pentaho.di.trans.steps.sybaseimp;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.vfs.KettleVFS;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.apache.commons.vfs2.FileObject;
public class SybaseImp extends BaseStep implements StepInterface {
private static Class<?> PKG = SybaseImp.class;
private SybaseImpMeta meta;
protected SybaseImpData data;
// -------------------------- construction
// ------------------------------------------------------
public SybaseImp(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}
// -------------------------- unload one table data to file
// ----------------------------------------------------------
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (SybaseImpMeta) smi;
data = (SybaseImpData) sdi;
//是否等待 此参数控制导入插件是否可直接使用
if ("Y".equalsIgnoreCase( meta.getToWait()) ) {
//if 动态加载 此参数控制 导出插件生成数据文件则直接执行导入
if("Y".equalsIgnoreCase( meta.getIsFIFO()) ){
File dataFile = new File(meta.getFileName());
while (true) {
//若停止则 删除数据文件 返回
if(isStopped()){
if(dataFile.exists()){
dataFile.delete();
}
return false;
}
//判断导出插件是否生成数据文件 生成数据文件跳出循环 执行bcp
if(dataFile.exists()){
break;
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//else 非动态加载 要等待 导出插件生成 标志文件证明导出结束 再执行 bcp
}else{
String flagFile = meta.getFileName().substring(0, meta.getFileName().lastIndexOf(".")) + ".flag";
File f = new File(flagFile);
while (true) {
if(isStopped()){
File dataFile = new File(meta.getFileName());
if(dataFile.exists()){
dataFile.delete();
}
if (f.exists()) {
f.delete();
}
return false;
}
if (f.exists()) {
f.delete();
break;
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 此eles 就证明导入插件单独使用 不做任何动作
}else{
// do nothing
}
// all commands
String[] commands = generateCommandFromMeta(meta);
// execute all commands
// any exception is failed
for (int i = 0; i < commands.length; i++) {
int exitValue = executeCommand(commands[i]);
if (exitValue != 0) {
stopAll();
setErrors(1);
setOutputDone();
throw new KettleException("Error while run SybaseImp : " + commands[i]);
}
}
setOutputDone();
return false;
}
private int executeCommand(String command) throws KettleException {
logBasic("execute command :" +command);
Runtime runTime = Runtime.getRuntime();
Process process = null;
int exitValue = -1;
try {
process = runTime.exec(new String[]{"/bin/bash", "-c", command});
StreamLogger streamLogger = new StreamLogger(process.getInputStream(), "OUTPUT");
StreamLogger errorStreamLogger = new StreamLogger(process.getErrorStream(), "ERROR");
errorStreamLogger.start();
streamLogger.start();
exitValue = process.waitFor();
long cur = System.currentTimeMillis();
streamLogger.join();
logBasic(">>>>>wait for :"+(System.currentTimeMillis()-cur));
String rows = streamLogger.rows;
if(null != rows ){
//super.setLinesInput(Long.parseLong(rows));
super.setLinesOutput(Long.parseLong(rows));
//super.setLinesRead(Long.parseLong(rows));
super.setLinesWritten(Long.parseLong(rows));
deleteInterfacesFile(meta);
}
} catch (Exception e) {
e.printStackTrace();
throw new KettleException(
BaseMessages.getString(PKG, "SybaseImp.Execption.execute", command));
}
logBasic("execute command :" +(exitValue==0?"succeed":"failed"));
return exitValue;
}
private void deleteInterfacesFile(SybaseImpMeta meta) {
String filename = meta.getSchemaName()+"_"+meta.getTableName()+"_imp";
String interfaces = meta.getSybasePath()+filename;
File f = new File(interfaces);
if(f.exists()){
f.delete();
}
}
private String[] generateCommandFromMeta(SybaseImpMeta meta) {
String[] commandLines = new String[1];
String exportLang = SybaseImpMeta.EXPORT_LANG;
String exportSybase = "export SYBASE=" + meta.getSybasePath().trim();
String exportPath = "export PATH=$PATH:" + meta.getBcpPath().trim();
String bcpCommand = generateBcpFromMeta(meta);
String exportAndBcpCommand = exportLang + ";" + exportSybase + ";" + exportPath + ";" + bcpCommand;
commandLines[0] = exportAndBcpCommand;
return commandLines;
}
private String generateBcpFromMeta(SybaseImpMeta meta) {
//bcp64 zxs.dbo.zxs in "zxs.txt" -c -t "|" -b 10000 -Y -U sa -P 123456 -S KFMOXIN01 -J cp936
StringBuilder bcpCommand = new StringBuilder("bcp64 ");
bcpCommand.append(meta.getDatabaseName());
bcpCommand.append(".");
bcpCommand.append(meta.getSchemaName());
bcpCommand.append(".");
bcpCommand.append(meta.getTableName());
bcpCommand.append(" in \"");
bcpCommand.append(meta.getFileName());
bcpCommand.append("\" -c -t \"");
bcpCommand.append(meta.getDelimiter());
bcpCommand.append("\" -b ");
bcpCommand.append(meta.getBatchSize());
bcpCommand.
b8a2
append(SybaseImpMeta.KEEP_SAME);
bcpCommand.append(" -U ");
bcpCommand.append(meta.getUserName());
bcpCommand.append(" -P ");
bcpCommand.append(Encr.decryptPassword(meta.getPassword()));
bcpCommand.append(" -S ");
bcpCommand.append(meta.getServerName());
bcpCommand.append(" -J ");
bcpCommand.append(meta.getCharacterSet());
bcpCommand.append(" -m ");
bcpCommand.append(meta.getMaxerrors());
bcpCommand.append(" -I ");
bcpCommand.append(mkInterfaceFile(meta));
return bcpCommand.toString();
}
private String mkInterfaceFile(SybaseImpMeta meta){
/*
* KFMOXIN01
master tcp ether 192.168.12.94 5000
query tcp ether 192.168.12.94 5000
*/
StringBuilder context = new StringBuilder();
context.append(meta.getServerName());
context.append("\n\t");
context.append(meta.getDatabaseName());
context.append(" tcp ether ");
context.append(meta.getIp());
context.append(" ");
context.append(meta.getPort());
context.append("\n\tquery tcp ether ");
context.append(meta.getIp());
context.append(" ");
context.append(meta.getPort());
context.append("\n");
String filename = meta.getSchemaName()+"_"+meta.getTableName()+"_imp";
String interfaces = meta.getSybasePath()+filename;
File f = new File(interfaces);
if(f.exists()){
f.delete();
}
try {
f.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
try {
OutputStream os = new FileOutputStream(f);
byte [] b = context.toString().getBytes();
os.write(b);
os.flush();
os.close();
} catch (Exception e) {
e.printStackTrace();
}
return interfaces;
}
private final class StreamLogger extends Thread {
private InputStream input;
private String type;
String rows = null;
StreamLogger( InputStream is, String type ) {
this.input = is;
this.type = type + ">";
}
public void run() {
try {
final BufferedReader br = new BufferedReader(new InputStreamReader(input));
String line;
while ((line = br.readLine()) != null) {
//48 rows copied.
if (line.contains("rows copied")) {
rows =line.split("rows copied")[0].trim();
System.out.println(">>>>>>>>>>>>>>rows is :" + rows + " loaded <<<<<<<<<<<");
}
if (log.isBasic()) {
logBasic(type + line);
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (SybaseImpMeta) smi;
data = (SybaseImpData) sdi;
if ( super.init( smi, sdi ) ) {
return true;
}
return false;
}
public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (SybaseImpMeta) smi;
data = (SybaseImpData) sdi;
super.dispose( smi, sdi );
if ( "Y".equalsIgnoreCase(meta.getEraseFiles()) ) {
FileObject fileObject = null;
if ( meta.getFileName() != null ) {
try {
fileObject = KettleVFS.getFileObject( environmentSubstitute( meta.getFileName() ), getTransMeta() );
fileObject.delete();
fileObject.close();
} catch ( Exception ex ) {
logError( "Error deleting data file \'" + KettleVFS.getFilename( fileObject ) + "\': " + ex.getMessage() );
}
}
}
}
}
Process process = runtime.exec();
执行export LANG=C;export SYBASE=/tmp/sybase/;export PATH=$PATH:$SYBASE;bcp
要按照以下方式执行process = runTime.exec(new String[]{"/bin/bash", "-c", command});以上的还不知道原理
2 使用 shell 生成 fifo 文件 mkfifo + fileName
(1) 为了防止java中获得输入流阻塞 到时程序不执行,
先要生成输出流或者执行输出流的命令,即生成fifo文件后 先用一个线程执行bcp in
(2) 然后使用另一线程 执行bcp out
具体实现如下package org.pentaho.di.trans.steps.sybaseimp;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.vfs.KettleVFS;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.apache.commons.vfs2.FileObject;
public class SybaseImp extends BaseStep implements StepInterface {
private static Class<?> PKG = SybaseImp.class;
private SybaseImpMeta meta;
protected SybaseImpData data;
// -------------------------- construction
// ------------------------------------------------------
public SybaseImp(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}
// -------------------------- unload one table data to file
// ----------------------------------------------------------
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (SybaseImpMeta) smi;
data = (SybaseImpData) sdi;
//是否等待 此参数控制导入插件是否可直接使用
if ("Y".equalsIgnoreCase( meta.getToWait()) ) {
//if 动态加载 此参数控制 导出插件生成数据文件则直接执行导入
if("Y".equalsIgnoreCase( meta.getIsFIFO()) ){
File dataFile = new File(meta.getFileName());
while (true) {
//若停止则 删除数据文件 返回
if(isStopped()){
if(dataFile.exists()){
dataFile.delete();
}
return false;
}
//判断导出插件是否生成数据文件 生成数据文件跳出循环 执行bcp
if(dataFile.exists()){
break;
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//else 非动态加载 要等待 导出插件生成 标志文件证明导出结束 再执行 bcp
}else{
String flagFile = meta.getFileName().substring(0, meta.getFileName().lastIndexOf(".")) + ".flag";
File f = new File(flagFile);
while (true) {
if(isStopped()){
File dataFile = new File(meta.getFileName());
if(dataFile.exists()){
dataFile.delete();
}
if (f.exists()) {
f.delete();
}
return false;
}
if (f.exists()) {
f.delete();
break;
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 此eles 就证明导入插件单独使用 不做任何动作
}else{
// do nothing
}
// all commands
String[] commands = generateCommandFromMeta(meta);
// execute all commands
// any exception is failed
for (int i = 0; i < commands.length; i++) {
int exitValue = executeCommand(commands[i]);
if (exitValue != 0) {
stopAll();
setErrors(1);
setOutputDone();
throw new KettleException("Error while run SybaseImp : " + commands[i]);
}
}
setOutputDone();
return false;
}
private int executeCommand(String command) throws KettleException {
logBasic("execute command :" +command);
Runtime runTime = Runtime.getRuntime();
Process process = null;
int exitValue = -1;
try {
process = runTime.exec(new String[]{"/bin/bash", "-c", command});
StreamLogger streamLogger = new StreamLogger(process.getInputStream(), "OUTPUT");
StreamLogger errorStreamLogger = new StreamLogger(process.getErrorStream(), "ERROR");
errorStreamLogger.start();
streamLogger.start();
exitValue = process.waitFor();
long cur = System.currentTimeMillis();
streamLogger.join();
logBasic(">>>>>wait for :"+(System.currentTimeMillis()-cur));
String rows = streamLogger.rows;
if(null != rows ){
//super.setLinesInput(Long.parseLong(rows));
super.setLinesOutput(Long.parseLong(rows));
//super.setLinesRead(Long.parseLong(rows));
super.setLinesWritten(Long.parseLong(rows));
deleteInterfacesFile(meta);
}
} catch (Exception e) {
e.printStackTrace();
throw new KettleException(
BaseMessages.getString(PKG, "SybaseImp.Execption.execute", command));
}
logBasic("execute command :" +(exitValue==0?"succeed":"failed"));
return exitValue;
}
private void deleteInterfacesFile(SybaseImpMeta meta) {
String filename = meta.getSchemaName()+"_"+meta.getTableName()+"_imp";
String interfaces = meta.getSybasePath()+filename;
File f = new File(interfaces);
if(f.exists()){
f.delete();
}
}
private String[] generateCommandFromMeta(SybaseImpMeta meta) {
String[] commandLines = new String[1];
String exportLang = SybaseImpMeta.EXPORT_LANG;
String exportSybase = "export SYBASE=" + meta.getSybasePath().trim();
String exportPath = "export PATH=$PATH:" + meta.getBcpPath().trim();
String bcpCommand = generateBcpFromMeta(meta);
String exportAndBcpCommand = exportLang + ";" + exportSybase + ";" + exportPath + ";" + bcpCommand;
commandLines[0] = exportAndBcpCommand;
return commandLines;
}
private String generateBcpFromMeta(SybaseImpMeta meta) {
//bcp64 zxs.dbo.zxs in "zxs.txt" -c -t "|" -b 10000 -Y -U sa -P 123456 -S KFMOXIN01 -J cp936
StringBuilder bcpCommand = new StringBuilder("bcp64 ");
bcpCommand.append(meta.getDatabaseName());
bcpCommand.append(".");
bcpCommand.append(meta.getSchemaName());
bcpCommand.append(".");
bcpCommand.append(meta.getTableName());
bcpCommand.append(" in \"");
bcpCommand.append(meta.getFileName());
bcpCommand.append("\" -c -t \"");
bcpCommand.append(meta.getDelimiter());
bcpCommand.append("\" -b ");
bcpCommand.append(meta.getBatchSize());
bcpCommand.
b8a2
append(SybaseImpMeta.KEEP_SAME);
bcpCommand.append(" -U ");
bcpCommand.append(meta.getUserName());
bcpCommand.append(" -P ");
bcpCommand.append(Encr.decryptPassword(meta.getPassword()));
bcpCommand.append(" -S ");
bcpCommand.append(meta.getServerName());
bcpCommand.append(" -J ");
bcpCommand.append(meta.getCharacterSet());
bcpCommand.append(" -m ");
bcpCommand.append(meta.getMaxerrors());
bcpCommand.append(" -I ");
bcpCommand.append(mkInterfaceFile(meta));
return bcpCommand.toString();
}
private String mkInterfaceFile(SybaseImpMeta meta){
/*
* KFMOXIN01
master tcp ether 192.168.12.94 5000
query tcp ether 192.168.12.94 5000
*/
StringBuilder context = new StringBuilder();
context.append(meta.getServerName());
context.append("\n\t");
context.append(meta.getDatabaseName());
context.append(" tcp ether ");
context.append(meta.getIp());
context.append(" ");
context.append(meta.getPort());
context.append("\n\tquery tcp ether ");
context.append(meta.getIp());
context.append(" ");
context.append(meta.getPort());
context.append("\n");
String filename = meta.getSchemaName()+"_"+meta.getTableName()+"_imp";
String interfaces = meta.getSybasePath()+filename;
File f = new File(interfaces);
if(f.exists()){
f.delete();
}
try {
f.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
try {
OutputStream os = new FileOutputStream(f);
byte [] b = context.toString().getBytes();
os.write(b);
os.flush();
os.close();
} catch (Exception e) {
e.printStackTrace();
}
return interfaces;
}
private final class StreamLogger extends Thread {
private InputStream input;
private String type;
String rows = null;
StreamLogger( InputStream is, String type ) {
this.input = is;
this.type = type + ">";
}
public void run() {
try {
final BufferedReader br = new BufferedReader(new InputStreamReader(input));
String line;
while ((line = br.readLine()) != null) {
//48 rows copied.
if (line.contains("rows copied")) {
rows =line.split("rows copied")[0].trim();
System.out.println(">>>>>>>>>>>>>>rows is :" + rows + " loaded <<<<<<<<<<<");
}
if (log.isBasic()) {
logBasic(type + line);
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (SybaseImpMeta) smi;
data = (SybaseImpData) sdi;
if ( super.init( smi, sdi ) ) {
return true;
}
return false;
}
public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (SybaseImpMeta) smi;
data = (SybaseImpData) sdi;
super.dispose( smi, sdi );
if ( "Y".equalsIgnoreCase(meta.getEraseFiles()) ) {
FileObject fileObject = null;
if ( meta.getFileName() != null ) {
try {
fileObject = KettleVFS.getFileObject( environmentSubstitute( meta.getFileName() ), getTransMeta() );
fileObject.delete();
fileObject.close();
} catch ( Exception ex ) {
logError( "Error deleting data file \'" + KettleVFS.getFilename( fileObject ) + "\': " + ex.getMessage() );
}
}
}
}
}
package org.pentaho.di.trans.steps.sybaseexp;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

public class SybaseExp extends BaseStep implements StepInterface {
    private static Class<?> PKG = SybaseExp.class;
    private SybaseExpMeta meta;
    protected SybaseExpData data;

    // -------------------------- construction --------------------------
    public SybaseExp(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
            Trans trans) {
        super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
    }

    // -------------------------- unload one table's data to file via bcp --------------------------
    /**
     * Unloads one table to a file with "bcp ... out". In FIFO mode a named pipe
     * is created first so a downstream import step can consume data as it is
     * produced; in non-FIFO mode a ".flag" file is written afterwards so the
     * importer knows the export has finished.
     *
     * @return always false: this step consumes no Kettle rows.
     * @throws KettleException when a command exits non-zero or cannot be run.
     */
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (SybaseExpMeta) smi;
        data = (SybaseExpData) sdi;
        // Delete any data file left over from a previous run.
        deleteHistoryDataFile(meta);
        String[] commands = generateCommandFromMeta(meta);
        if (isStopped()) {
            return false;
        }
        if ("Y".equalsIgnoreCase(meta.getIsFIFO())) {
            createFifoFile(meta);
        }
        // Any non-zero exit aborts the transformation.
        for (int i = 0; i < commands.length; i++) {
            int exitValue = executeCommand(commands[i]);
            if (exitValue != 0) {
                stopAll();
                setErrors(1);
                setOutputDone();
                throw new KettleException("Error while run SybaseExp : " + commands[i]);
            }
        }
        // Signal completion to the importer in non-FIFO mode.
        if ("N".equalsIgnoreCase(meta.getIsFIFO())) {
            createFlagFile(meta);
        }
        setOutputDone();
        return false;
    }

    /** Deletes a previous run's data file, if any, before exporting again. */
    private void deleteHistoryDataFile(SybaseExpMeta meta) {
        String dataFile = meta.getFileName();
        logBasic("if file :" + dataFile + " exist delete ");
        File f = new File(dataFile);
        if (f.exists()) {
            f.delete();
        }
    }

    /** Creates the ".flag" file that tells the import step the export is done. */
    private void createFlagFile(SybaseExpMeta meta) throws KettleException {
        String flagFile = meta.getFileName().substring(0, meta.getFileName().lastIndexOf(".") + 1) + "flag";
        logBasic("create file :" + flagFile);
        File f = new File(flagFile);
        if (f.exists()) {
            f.delete();
        }
        try {
            f.createNewFile();
        } catch (IOException e) {
            // Keep the cause and name the file (the old code dropped both).
            throw new KettleException("Error while create flag file : " + flagFile, e);
        }
    }

    /**
     * Runs one shell command through "/bin/bash -c" (required so the exported
     * SYBASE/PATH variables take effect) and returns its exit value. The
     * "N rows copied" summary line emitted by bcp updates the step's
     * input/read line counters.
     */
    private int executeCommand(String command) throws KettleException {
        logBasic("execute command :" + command);
        Runtime runTime = Runtime.getRuntime();
        Process process = null;
        int exitValue = -1;
        try {
            process = runTime.exec(new String[] { "/bin/bash", "-c", command });
            StreamLogger streamLogger = new StreamLogger(process.getInputStream(), "OUTPUT");
            StreamLogger errorStreamLogger = new StreamLogger(process.getErrorStream(), "ERROR");
            errorStreamLogger.start();
            streamLogger.start();
            exitValue = process.waitFor();
            long cur = System.currentTimeMillis();
            // Join BOTH loggers so no output (and no row count) is lost; the
            // old code never joined the error-stream logger.
            streamLogger.join();
            errorStreamLogger.join();
            logBasic(">>>>>wait for :" + (System.currentTimeMillis() - cur));
            String rows = streamLogger.rows;
            if (null != rows) {
                super.setLinesInput(Long.parseLong(rows));
                super.setLinesRead(Long.parseLong(rows));
                deleteInterfacesFile(meta);
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so the real failure is visible.
            throw new KettleException(
                    BaseMessages.getString(PKG, "SybaseExp.Execption.execute", command), e);
        }
        logBasic("execute command :" + (exitValue == 0 ? "succeed" : "failed"));
        return exitValue;
    }

    /** Removes the per-table interfaces file created by mkInterfaceFile. */
    private void deleteInterfacesFile(SybaseExpMeta meta) {
        String filename = meta.getSchemaName() + "_" + meta.getTableName() + "_exp";
        String interfaces = meta.getSybasePath() + filename;
        File f = new File(interfaces);
        if (f.exists()) {
            f.delete();
        }
    }

    /**
     * Builds the single shell command line: environment exports followed by the
     * bcp invocation, joined with ";".
     */
    private String[] generateCommandFromMeta(SybaseExpMeta meta) {
        String exportLang = SybaseExpMeta.EXPORT_LANG;
        String exportSybase = "export SYBASE=" + meta.getSybasePath().trim();
        String exportPath = "export PATH=$PATH:" + meta.getBcpPath().trim();
        String bcpCommand = generateBcpFromMeta(meta);
        return new String[] { exportLang + ";" + exportSybase + ";" + exportPath + ";" + bcpCommand };
    }

    /**
     * Creates the named pipe bcp will write into (FIFO mode).
     * The old code swallowed the KettleException; it is now propagated so a
     * failed mkfifo does not silently leave the pipeline broken.
     */
    private void createFifoFile(SybaseExpMeta meta) throws KettleException {
        executeCommand("mkfifo " + meta.getFileName());
    }

    /**
     * Builds the "bcp64 db.schema.table out ..." command from the step metadata.
     * Example: bcp64 zxs.dbo.zxs out "zxs.txt" -c -t"|" -Usa -P123456 -S KFMOXIN01 -Jcp936
     * NOTE(review): the decrypted password is placed on the command line and is
     * also written to the step log by executeCommand — consider masking it.
     */
    private String generateBcpFromMeta(SybaseExpMeta meta) {
        StringBuilder bcpCommand = new StringBuilder("bcp64 ");
        bcpCommand.append(meta.getDatabaseName());
        bcpCommand.append(".");
        bcpCommand.append(meta.getSchemaName());
        bcpCommand.append(".");
        bcpCommand.append(meta.getTableName());
        bcpCommand.append(" out \"");
        bcpCommand.append(meta.getFileName());
        bcpCommand.append("\" -c -t \"");
        bcpCommand.append(meta.getDelimiter());
        bcpCommand.append("\" -U ");
        bcpCommand.append(meta.getUserName());
        bcpCommand.append(" -P ");
        bcpCommand.append(Encr.decryptPassword(meta.getPassword()));
        bcpCommand.append(" -S ");
        bcpCommand.append(meta.getServerName());
        bcpCommand.append(" -J ");
        bcpCommand.append(meta.getCharacterSet());
        bcpCommand.append(" -I ");
        bcpCommand.append(mkInterfaceFile(meta));
        return bcpCommand.toString();
    }

    /**
     * Writes a minimal Sybase interfaces file for this server, e.g.
     *   KFMOXIN01
     *       master tcp ether 192.168.12.94 5000
     *       query tcp ether 192.168.12.94 5000
     * and returns its path (passed to bcp via -I).
     */
    private String mkInterfaceFile(SybaseExpMeta meta) {
        StringBuilder context = new StringBuilder();
        context.append(meta.getServerName());
        context.append("\n\t");
        context.append(meta.getDatabaseName());
        context.append(" tcp ether ");
        context.append(meta.getIp());
        context.append(" ");
        context.append(meta.getPort());
        context.append("\n\tquery tcp ether ");
        context.append(meta.getIp());
        context.append(" ");
        context.append(meta.getPort());
        context.append("\n");
        String filename = meta.getSchemaName() + "_" + meta.getTableName() + "_exp";
        String interfaces = meta.getSybasePath() + filename;
        File f = new File(interfaces);
        if (f.exists()) {
            f.delete();
        }
        // try-with-resources guarantees the stream is closed even when the
        // write fails (the old code leaked the FileOutputStream on error).
        // FileOutputStream creates the file itself, so createNewFile() is not needed.
        try (OutputStream os = new FileOutputStream(f)) {
            os.write(context.toString().getBytes());
            os.flush();
        } catch (IOException e) {
            logError("Error writing interfaces file '" + interfaces + "': " + e.getMessage());
        }
        return interfaces;
    }

    /**
     * Drains one of the child process' streams, capturing the row count from
     * bcp's "N rows copied" summary line. To limit log volume, only every 10th
     * line is forwarded to the step log.
     */
    private final class StreamLogger extends Thread {
        private InputStream input;
        private String type;
        String rows = null;

        StreamLogger(InputStream is, String type) {
            this.input = is;
            this.type = type + ">";
        }

        public void run() {
            try {
                final BufferedReader br = new BufferedReader(new InputStreamReader(input));
                String line;
                int count = 0;
                while ((line = br.readLine()) != null) {
                    count++;
                    // e.g. "48 rows copied."
                    if (line.contains("rows copied")) {
                        rows = line.split("rows copied")[0].trim();
                        logBasic(">>>>>>>>>>>>>>rows is :" + rows + " unloaded <<<<<<<<<<<");
                    }
                    if (log.isBasic() && count % 10 == 0) {
                        logBasic(type + line);
                    }
                }
            } catch (IOException ioe) {
                // Use the step logger instead of printStackTrace().
                logError("Error reading process output: " + ioe.getMessage());
            }
        }
    }

    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SybaseExpMeta) smi;
        data = (SybaseExpData) sdi;
        return super.init(smi, sdi);
    }

    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SybaseExpMeta) smi;
        data = (SybaseExpData) sdi;
        super.dispose(smi, sdi);
    }
}
相关文章推荐
- 使用bcp工具导入和导出批量数据
- 学习总结:sybase中bcp命令批量导出和导入
- 使用bcp进行数据导出导入
- SQL Server 使用bcp进行大数据量导出导入
- Sql Server 使用bcp进行数据导出导入 .
- 使用bcp进行大数据量导出导入
- sql 使用bcp进行导入导出数据
- 使用Transact-SQL进行数据导入导出方法详解
- PuTTY使用笔记:登录设置的批量备份导出/导入
- 用poi框架进行批量导入导出实例
- Oracle使用par文件进行全库导入导出
- [ZT]PuTTY使用笔记:登录设置的批量备份导出/导入
- SQL Server中bcp命令的用法以及数据批量导入导出
- SQL Server数据导入导出工具BCP使用详解
- 使用Transact-SQL进行数据导入导出方法详解
- 使用BCP导出导入数据
- SQL2000中BCP轻松使用导入导出数据
- SQL server 数据导入导出BCP工具使用详解
- SQL Server BI Step by Step 2--- 使用SSIS进行简单的数据导入导出
- 使用BCP从Sybase远程数据库中导出数据