Sqoop2 API使用详解
2018-03-14 00:40
176 查看
Sqoop2服务端版本 1.99.5
下载地址:http://archive.apache.org/dist/sqoop/
ps:通过sqoop的shell命令可以查看到当前版本支持什么类型的连接器以及连接器id对应name(命令:show connector)
下载地址:http://archive.apache.org/dist/sqoop/
maven 依赖
<dependency> <groupId>org.apache.sqoop</groupId> <artifactId>sqoop-client</artifactId> <version>1.99.4</version> </dependency>
初始化
String url = "http://ip:12000/sqoop/"; SqoopClient client = new SqoopClient(url); //可以使用客户端对象的setServerUrl(String)方法来修改服务器的URL client.setServerUrl(newUrl);
创建JDBC Link
创建Link之前首先要了解你需要创建一个什么类型的Link,你的Sqoop2的版本支持什么样的连接器;ps:通过sqoop的shell命令可以查看到当前版本支持什么类型的连接器以及连接器id对应name(命令:show connector)
public static void saveJdbcLink(SqoopClient client) { //创建一个源链接 JDBC //1表示jdbc连接的id,各个版本不同操作方式不同 //如有的版本直接使用连接器名称创建Link, //如:MLink link = client.createLink("generic-jdbc-connector"); long connectorId = 1; MLink fromLink = client.createLink(connectorId); fromLink.setName("jdbcLink"); fromLink.setCreationUser("Buffy"); MLinkConfig linkConfig = fromLink.getConnectorLinkConfig(); // 填入连接配置的值 linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://ip:3306/dataswitch"); linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver"); linkConfig.getStringInput("linkConfig.username").setValue("username"); linkConfig.getStringInput("linkConfig.password").setValue("pw"); // 保存填充过的连接对象 Status status = client.saveLink(fromLink); if (status.canProceed()) { System.out.println("Created Link with Link Name : " + fromLink.getName() + "-id : " + fromLink.getPersistenceId()); } else { System.out.println("Something went wrong creating the link"); } }
创建HDFS Link
public static void saveHdfsLink(SqoopClient client) { //创建一个目的地链接HDFS //3表示hdfs连接的id,各个版本不同操作方式不同 //如有的版本直接使用连接器名称创建Link, //如:MLink link = client.createLink("hdfs-connector"); long toCconnectorId_h = 3; MLink toLink = client.createLink(toCconnectorId_h); toLink.setName("hdfsLink"); toLink.setCreationUser("Buffy"); MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig(); toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://http://ip:8020"); // 保存填充过的连接对象 Status status = client.saveLink(toLink); if (status.canProceed()) { System.out.println("Created Link with Link Name : " + toLink.getName() + "-id : " + toLink.getPersistenceId()); } else { System.out.println("Something went wrong creating the link"); } }
创建作业
public static void saveCondiJob(SqoopClient client) { //为创建Link时返回的id -- toLink.getPersistenceId() long fromLinkId = 29;// for jdbc connector long toLinkId = 30; // for jdbc connector MJob job = client.createJob(fromLinkId, toLinkId); job.setName("jobCondii"); job.setCreationUser("jobCondia"); // set the "FROM" link job config values MFromConfig fromJobConfig = job.getFromJobConfig(); //fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("dataswitch"); //以下按指定条件查询${CONDITIONS}为必须,代码会解析sql是否包含子字符串,如果指定sql上一行代码不需指定会报错 fromJobConfig.getStringInput("fromJobConfig.sql").setValue("SELECT id,name,dept FROM dataswitch.zcstest where ${CONDITIONS} and name like \'%狗%\' "); fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");//分区字段 // set the "TO" link job config values MToConfig toJobConfig = job.getToJobConfig(); toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");//指定hdfs路径 // set the driver config values MDriverConfig driverConfig = job.getDriverConfig(); driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(2); //指定运行MR程序map个数 driverConfig.getIntegerInput("throttlingConfig.numLoaders").setValue(1); //指定运行MR程序reduce个数 Status status = client.saveJob(job); List<MConfig> configs = job.getFromJobConfig().getConfigs(); printMessage(configs); if (status.canProceed()) { System.out.println("Created Job with Job Id: " + job.getPersistenceId()); } else { System.out.println("Something went wrong creating the job"); } }
启动作业
public static void startJob(SqoopClient client) { //为创建作业时返回的id -- job.getPersistenceId() long jobId = 30; MSubmission submission = client.startJob(jobId); System.out.println("Job Submission Status : " + submission.getStatus()); if(submission.getStatus().isRunning() && submission.getProgress() != -1) { System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100)); } System.out.println("Hadoop job id :" + submission.getExternalId()); System.out.println("Job link : " + submission.getExternalLink()); Counters counters = submission.getCounters(); if(counters != null) { System.out.println("Counters:"); for(CounterGroup group : counters) { System.out.print("\t"); System.out.println(group.getName()); for(Counter counter : group) { System.out.print("\t\t"); System.out.print(counter.getName()); System.out.print(": "); System.out.println(counter.getValue()); } } } if(submission.getExceptionInfo() != null) { System.out.println("Exception info : " +submission.getExceptionInfo()); } }
相关文章推荐
- FCKeditor API使用详解
- c#中使用api(shfileoperation)进行文件操作,特别详解了回收站相关参数
- Google Map API使用详解(如何在自己的网页中嵌入地图)
- FastDFS的配置、部署与API使用解读(8)FastDFS多种文件上传接口详解
- FastDFS的配置、部署与API使用解读(5)FastDFS配置详解之Tracker配置
- FastDFS的配置、部署与API使用解读(6)FastDFS配置详解之Storage配置
- 详解Java的JDBC API的存储过程与SQL转义语法的使用
- Google Map API使用详解(三)——Google Map基本常识(上)
- Google Map API使用详解(七)——加载Google Map API URL的详细解读
- Google Map API使用详解(五)——Google Map基本常识(下)
- Google Map API使用详解(十)——使用JavaScript创建地图详解(上)
- Google Map API使用详解(十二)——如何在自己的网页中嵌入地图
- Google Map API使用详解(十三)——使用Google Map API实现自定义控件
- FCKeditor API使用详解
- FastDFS的配置、部署与API使用解读(5)FastDFS配置详解之Tracker配置
- Google Map API使用详解(八)——Google Map坐标系统总结(上)
- Windows消息大全使用详解API
- Google Map API使用详解(四)——Google Map基本常识(中)
- Win32 API 函数大全使用详解十 鼠标输入函数
- FastDFS的配置、部署与API使用解读(8)FastDFS多种文件上传接口详解