
Sqoop2 API Usage Guide

2018-03-14
Sqoop2 server version: 1.99.5

Download: http://archive.apache.org/dist/sqoop/

Maven dependency

<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.4</version>
</dependency>


Initialization

String url = "http://ip:12000/sqoop/";
SqoopClient client = new SqoopClient(url);

// The server URL can be changed later with the client's setServerUrl(String) method
client.setServerUrl(newUrl);


Creating a JDBC Link

Before creating a link, you first need to know what type of link you want and which connectors your Sqoop2 version supports.

Note: the Sqoop shell can show which connectors the current version supports and the name that corresponds to each connector id (command: show connector).
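You can also query the same information from Java. The following is a minimal sketch assuming the 1.99.4 client API, where SqoopClient exposes getConnectors(); method names may differ slightly between versions.

// Sketch: list the connectors the server knows about, so you can pick the
// right connector id (or name) to pass to createLink().
// Uses org.apache.sqoop.model.MConnector from the 1.99.x client API.
public static void listConnectors(SqoopClient client) {
    for (MConnector connector : client.getConnectors()) {
        System.out.println("connector id: " + connector.getPersistenceId()
                + ", name: " + connector.getUniqueName());
    }
}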

public static void saveJdbcLink(SqoopClient client) {
    // Create a source link (JDBC).
    // 1 is the id of the generic JDBC connector; the API differs between versions.
    // Some versions create the link directly from the connector name,
    // e.g.: MLink link = client.createLink("generic-jdbc-connector");
    long connectorId = 1;
    MLink fromLink = client.createLink(connectorId);
    fromLink.setName("jdbcLink");
    fromLink.setCreationUser("Buffy");
    MLinkConfig linkConfig = fromLink.getConnectorLinkConfig();
    // Fill in the link configuration values
    linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://ip:3306/dataswitch");
    linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
    linkConfig.getStringInput("linkConfig.username").setValue("username");
    linkConfig.getStringInput("linkConfig.password").setValue("pw");

    // Save the populated link object
    Status status = client.saveLink(fromLink);
    if (status.canProceed()) {
        System.out.println("Created Link with Link Name : " + fromLink.getName() + " - id : " + fromLink.getPersistenceId());
    } else {
        System.out.println("Something went wrong creating the link");
    }
}


Creating an HDFS Link

public static void saveHdfsLink(SqoopClient client) {
    // Create a destination link (HDFS).
    // 3 is the id of the HDFS connector; the API differs between versions.
    // Some versions create the link directly from the connector name,
    // e.g.: MLink link = client.createLink("hdfs-connector");
    long toConnectorId = 3;
    MLink toLink = client.createLink(toConnectorId);
    toLink.setName("hdfsLink");
    toLink.setCreationUser("Buffy");
    MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
    toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://ip:8020");
    // Save the populated link object
    Status status = client.saveLink(toLink);
    if (status.canProceed()) {
        System.out.println("Created Link with Link Name : " + toLink.getName() + " - id : " + toLink.getPersistenceId());
    } else {
        System.out.println("Something went wrong creating the link");
    }
}
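The job step below needs the persistence ids of the two links you just saved. If you did not record them, a sketch like the following can recover them, assuming your client version exposes getLinks() as in the 1.99.x API:

// Sketch: enumerate saved links to recover their ids, names and connector ids.
// Assumes SqoopClient#getLinks() and org.apache.sqoop.model.MLink from 1.99.x.
public static void listLinks(SqoopClient client) {
    for (MLink link : client.getLinks()) {
        System.out.println("link id: " + link.getPersistenceId()
                + ", name: " + link.getName()
                + ", connector id: " + link.getConnectorId());
    }
}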


Creating a Job

public static void saveCondiJob(SqoopClient client) {
    // The ids returned when the links were saved -- link.getPersistenceId()
    long fromLinkId = 29; // the JDBC link
    long toLinkId = 30;   // the HDFS link

    MJob job = client.createJob(fromLinkId, toLinkId);
    job.setName("jobCondii");
    job.setCreationUser("jobCondia");
    // Set the "FROM" link job config values
    MFromConfig fromJobConfig = job.getFromJobConfig();
    //fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("dataswitch");
    // When querying with a custom SQL statement, the ${CONDITIONS} placeholder is mandatory:
    // the code checks that the SQL contains this substring. If an SQL statement is given,
    // the schemaName line above must not be set, otherwise an error is thrown.
    fromJobConfig.getStringInput("fromJobConfig.sql").setValue("SELECT id,name,dept FROM dataswitch.zcstest where ${CONDITIONS} and name like '%狗%' ");
    fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id"); // partition column

    // Set the "TO" link job config values
    MToConfig toJobConfig = job.getToJobConfig();
    toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp"); // HDFS output path
    // Set the driver config values
    MDriverConfig driverConfig = job.getDriverConfig();
    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(2); // number of map tasks
    driverConfig.getIntegerInput("throttlingConfig.numLoaders").setValue(1);    // number of reduce tasks
    Status status = client.saveJob(job);
    List<MConfig> configs = job.getFromJobConfig().getConfigs();
    printMessage(configs);
    if (status.canProceed()) {
        System.out.println("Created Job with Job Id: " + job.getPersistenceId());
    } else {
        System.out.println("Something went wrong creating the job");
    }
}
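The method above calls a printMessage helper that is not shown in the original. A minimal sketch, assuming the standard MConfig/MInput model classes of the 1.99.x client, could look like this:

// Sketch of the printMessage helper: walk each MConfig of the "FROM" job
// config and print every input name together with its current value.
public static void printMessage(List<MConfig> configs) {
    for (MConfig config : configs) {
        System.out.println("Config: " + config.getName());
        for (MInput<?> input : config.getInputs()) {
            System.out.println("\t" + input.getName() + " = " + input.getValue());
        }
    }
}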


Starting a Job

public static void startJob(SqoopClient client) {
    // The id returned when the job was saved -- job.getPersistenceId()
    long jobId = 30;
    MSubmission submission = client.startJob(jobId);
    System.out.println("Job Submission Status : " + submission.getStatus());
    if (submission.getStatus().isRunning() && submission.getProgress() != -1) {
        System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
    }
    System.out.println("Hadoop job id : " + submission.getExternalId());
    System.out.println("Job link : " + submission.getExternalLink());
    Counters counters = submission.getCounters();
    if (counters != null) {
        System.out.println("Counters:");
        for (CounterGroup group : counters) {
            System.out.print("\t");
            System.out.println(group.getName());
            for (Counter counter : group) {
                System.out.print("\t\t");
                System.out.print(counter.getName());
                System.out.print(": ");
                System.out.println(counter.getValue());
            }
        }
    }
    if (submission.getExceptionInfo() != null) {
        System.out.println("Exception info : " + submission.getExceptionInfo());
    }
}
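Putting the pieces together, a simple driver can wire the steps up in order. This is only a sketch: the link and job ids are hard-coded inside the methods above, and the ip placeholder must be replaced with your server address; in practice you would pass the returned persistence ids from one step to the next.

// Sketch: run the full flow end to end using the methods defined above.
public static void main(String[] args) {
    SqoopClient client = new SqoopClient("http://ip:12000/sqoop/"); // replace ip with your server
    saveJdbcLink(client);  // create the "FROM" (JDBC) link
    saveHdfsLink(client);  // create the "TO" (HDFS) link
    saveCondiJob(client);  // create the job from the two link ids
    startJob(client);      // submit the job and print its status
}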