您的位置:首页 > 其它

ODPS使用之上传下载数据

2017-07-25 15:08 344 查看
 这篇文章里面我们介绍一下如果我们要想从ODPS里面读到数据并加工处理,有一种通道叫批量数据通道。这个需要在程序里面做处理。写一个非常简单的示例程序:

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
public class DownloadSample {
private static String accessId = "<your access id>";
private static String accessKey = "<your access Key>";
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
PartitionSpec partitionSpec = new PartitionSpec(partition);
try {
DownloadSession downloadSession = tunnel.createDownloadSession(project, table,
partitionSpec);
System.out.println("Session Status is : "
+ downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
RecordReader recordReader = downloadSession.openRecordReader(0,
count);
Record record;
while ((record = recordReader.read()) != null) {
consumeRecord(record, downloadSession.getSchema());
}
recordReader.close();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
private static void consumeRecord(Record record, TableSchema schema) {
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
String colValue = null;
switch (column.getType()) {
case BIGINT: {
Long v = record.getBigint(i);
colValue = v == null ? null : v.toString();
break;
}
case BOOLEAN: {
Boolean v = record.getBoolean(i);
colValue = v == null ? null : v.toString();
break;
}
case DATETIME: {
Date v = record.getDatetime(i);
colValue = v == null ? null : v.toString();
break;
}
case DOUBLE: {
Double v = record.getDouble(i);
colValue = v == null ? null : v.toString();
break;
}
case STRING: {
String v = record.getString(i);
colValue = v == null ? null : v.toString();
break;
}
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
System.out.print(colValue == null ? "null" : colValue);
if (i != schema.getColumns().size())
System.out.print("\t");
}
System.out.println();
}
}
使用这种方法最大的一个好处就是避免通过SDK读的时候会有行限制。所以使用SDK的方法会有问题。建议采用这种方式去读数据。

进一步优化的,如果读到的数据量非常大比如百万条,那处理的时候不可能等它全部处理完。可能需要一边读数据,一边处理,采用流式计算来做。

参考如下代码:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: