ODPS使用之上传下载数据
2017-07-25 15:08
344 查看
这篇文章里面我们介绍一下如果我们要想从ODPS里面读到数据并加工处理,有一种通道叫批量数据通道。这个需要在程序里面做处理。写一个非常简单的示例程序:
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
public class DownloadSample {
private static String accessId = "<your access id>";
private static String accessKey = "<your access Key>";
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
PartitionSpec partitionSpec = new PartitionSpec(partition);
try {
DownloadSession downloadSession = tunnel.createDownloadSession(project, table,
partitionSpec);
System.out.println("Session Status is : "
+ downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
RecordReader recordReader = downloadSession.openRecordReader(0,
count);
Record record;
while ((record = recordReader.read()) != null) {
consumeRecord(record, downloadSession.getSchema());
}
recordReader.close();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
private static void consumeRecord(Record record, TableSchema schema) {
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
String colValue = null;
switch (column.getType()) {
case BIGINT: {
Long v = record.getBigint(i);
colValue = v == null ? null : v.toString();
break;
}
case BOOLEAN: {
Boolean v = record.getBoolean(i);
colValue = v == null ? null : v.toString();
break;
}
case DATETIME: {
Date v = record.getDatetime(i);
colValue = v == null ? null : v.toString();
break;
}
case DOUBLE: {
Double v = record.getDouble(i);
colValue = v == null ? null : v.toString();
break;
}
case STRING: {
String v = record.getString(i);
colValue = v == null ? null : v.toString();
break;
}
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
System.out.print(colValue == null ? "null" : colValue);
if (i != schema.getColumns().size())
System.out.print("\t");
}
System.out.println();
}
}
使用这种方法最大的一个好处就是避免通过SDK读的时候会有行限制。所以使用SDK的方法会有问题。建议采用这种方式去读数据。
进一步优化的,如果读到的数据量非常大比如百万条,那处理的时候不可能等它全部处理完。可能需要一边读数据,一边处理,采用流式计算来做。
参考如下代码:
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
public class DownloadSample {
private static String accessId = "<your access id>";
private static String accessKey = "<your access Key>";
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
PartitionSpec partitionSpec = new PartitionSpec(partition);
try {
DownloadSession downloadSession = tunnel.createDownloadSession(project, table,
partitionSpec);
System.out.println("Session Status is : "
+ downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
RecordReader recordReader = downloadSession.openRecordReader(0,
count);
Record record;
while ((record = recordReader.read()) != null) {
consumeRecord(record, downloadSession.getSchema());
}
recordReader.close();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
private static void consumeRecord(Record record, TableSchema schema) {
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
String colValue = null;
switch (column.getType()) {
case BIGINT: {
Long v = record.getBigint(i);
colValue = v == null ? null : v.toString();
break;
}
case BOOLEAN: {
Boolean v = record.getBoolean(i);
colValue = v == null ? null : v.toString();
break;
}
case DATETIME: {
Date v = record.getDatetime(i);
colValue = v == null ? null : v.toString();
break;
}
case DOUBLE: {
Double v = record.getDouble(i);
colValue = v == null ? null : v.toString();
break;
}
case STRING: {
String v = record.getString(i);
colValue = v == null ? null : v.toString();
break;
}
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
System.out.print(colValue == null ? "null" : colValue);
if (i != schema.getColumns().size())
System.out.print("\t");
}
System.out.println();
}
}
使用这种方法最大的一个好处就是避免通过SDK读的时候会有行限制。所以使用SDK的方法会有问题。建议采用这种方式去读数据。
进一步优化的,如果读到的数据量非常大比如百万条,那处理的时候不可能等它全部处理完。可能需要一边读数据,一边处理,采用流式计算来做。
参考如下代码:
相关文章推荐
- ie8下使用jquery-file-upload上传文件后返回json格式数据提示下载
- 使用httpclient实现上传下载(javaWeb系统数据传输http实现)
- 通过使用libcurl POST数据和上传与下载文件
- 一个使用WebClient和WebApi上传下载数据的方法
- 使用ajaxFileUpload与SpringMVC实现异步上传下载文件并返回json数据
- 使用cmd命令方式登录ftp实现上传下载文件数据
- IOS使用MKNetworkKit框架实现下载数据和上传数据
- Swift - 使用NSURLSession加载数据、下载、上传文件
- 使用FileUpload工具实现文件上传与使用ThreadLocal容器传递数据实现分页查询下载
- SilverLight学习笔记--使用WebClient实现通讯(一)(上传和下载字符串数据)
- MaxCompute Studio使用心得系列1——本地数据上传下载
- 使用Opengl PBO上传下载数据
- 怎样使用服务器上传下载数据
- 使用httpclient实现上传下载(javaWeb系统数据传输http实现)
- 使用QT界面进行onetnet数据上传与下载
- Swift - 使用NSURLSession加载数据、下载、上传文件
- Swift - 使用URLSession加载数据、下载、上传文件
- MaxCompute Studio使用心得系列1——本地数据上传下载
- 使用httpclient实现上传下载(javaWeb系统数据传输http实现)
- SilverLight学习笔记--使用WebClient实现通讯(一)(上传和下载字符串数据)