使用函数计算对表格存储中数据做简单清洗
2018-05-16 23:03
489 查看
摘要: 表格存储的增量数据流功能能够使用户使用API获取Table Store表中增量数据,并可以进行增量数据流的实时增量分析、数据增量同步等。通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改,充分的利用了函数计算全托管、弹性伸缩的特点。函数计算(Function Compute) 是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。Table Store Stream是用于获取Table Store表中增量数据的一个数据通道,通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改。表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储,我们可以将数据写入到表格存储中,同时在函数计算中对新增的数据做简单的清洗、转换、聚合计算等操作,并将清洗之后的数据写回到表格存储的结果表中,并对原始明细数据及结果数据提供实时访问。下面,我们使用函数计算对表格存储中的数据做简单的清洗,并写入到结果表中。
我们需要将 level>1 的日志写入到另外一张数据表中,用作专门的查询。
Stream记录过期时长 为通过 StreamAPI 能够读取到的增量数据的最长时间。由于触发器只能绑定现有的函数,故先到函数计算的控制台上在同region创建服务及函数。
2.创建函数依次选择:空白函数——不创建触发器。
函数名称为:etl_test,选择 python2.7 环境,在线编辑代码
函数入口为:etl_test.handler
代码稍后编辑,点击下一步。
3.进行服务授权由于函数计算需要将运行中的日志写入到日志服务中,同时,需要对表格存储的表进行读写,故需要对函数计算进行授权,为方便起见,我们先添加 AliyunOTSFullAccess 与 __AliyunLogFullAccess __权限,实际生产中,建议根据权限最小原则来添加权限。
4.点击授权完成,并创建函数。5.修改函数代码。创建好函数之后,点击对应的
使用示例代码如下:
2.绑定成功之后,能够看到如下的信息:
2.在 result 表中查询清洗后的数据
原文链接
数据定义
我们假设写入的为日志数据,包括三个基础字段:字段名称 | 类型 | 含义 |
---|---|---|
id | 整型 | 日志id |
level | 整型 | 日志的等级,越大表明等级越高 |
message | 字符串 | 日志的内容 |
实现过程:
创建实例及数据表
在表格存储的控制台创建表格存储实例(__本次以 华东2 distribute-test 为例__),并创建源表(__source_data__)及结果表(__result__),主键为均 __id (整型)__,由于表格存储是 schemafree 结构,无需预先定义其他属性列字段。开启数据源表的Stream功能
触发器功能需要先开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。Stream记录过期时长 为通过 StreamAPI 能够读取到的增量数据的最长时间。由于触发器只能绑定现有的函数,故先到函数计算的控制台上在同region创建服务及函数。
创建函数计算服务
在函数计算的控制台上创建服务及处理函数,我们继续使用华东2节点。1.在华东2节点创建服务。2.创建函数依次选择:空白函数——不创建触发器。
函数名称为:etl_test,选择 python2.7 环境,在线编辑代码
函数入口为:etl_test.handler
代码稍后编辑,点击下一步。
3.进行服务授权由于函数计算需要将运行中的日志写入到日志服务中,同时,需要对表格存储的表进行读写,故需要对函数计算进行授权,为方便起见,我们先添加 AliyunOTSFullAccess 与 __AliyunLogFullAccess __权限,实际生产中,建议根据权限最小原则来添加权限。
4.点击授权完成,并创建函数。5.修改函数代码。创建好函数之后,点击对应的
函数—
代码执行,编辑代码并保存,其中,INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)需要根据情况进行修改:
使用示例代码如下:
#!/usr/bin/env python# -*- coding: utf-8 -*-import cborimport jsonimport tablestore as ots INSTANCE_NAME = 'distribute-test'REGION = 'cn-shanghai'ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION) RESULT_TABLENAME = 'result'def _utf8(input): return str(bytearray(input, "utf-8"))def get_attrbute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value']def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value']#由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限def get_ots_client(context): creds = context.credentials client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken) return clientdef save_to_ots(client, record): id = int(get_pk_value(record, 'id')) level = int(get_attrbute_value(record, 'level')) msg = get_attrbute_value(record, 'message') pk = [(_utf8('id'), id),] attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),] row = ots.Row(pk, attr) client.put_row(RESULT_TABLENAME, row)def handler(event, context): records = cbor.loads(event) #records = json.loads(event) client = get_ots_client(context) for record in records['Records']: level = int(get_attrbute_value(record, 'level')) if level > 1: save_to_ots(client, record) else: print "Level <= 1, ignore."对表格存储 Stream 数据的格式详情请参考Stream 数据处理
绑定触发器
1.回到表格存储的实例管理页面,点击表 source_data 后的 使用触发器 按钮,进入触发器绑定界面,点击使用已有函数计算, 选择刚创建的服务及函数,勾选
表格存储发送事件通知的权限, 进行确定。
2.绑定成功之后,能够看到如下的信息:
运行验证
1.向 source_data 表中写入数据。2.在 result 表中查询清洗后的数据
点击 result 表的数据管理页面,会查询到刚写入到 source_data 中的数据。 当然,向 soure_data 写入level <=1的数据将不会同步到 result 表中
原文链接
相关文章推荐
- 阿里云API网关、函数计算、表格存储简单结合使用小结
- c#中使用NetCDF存储二维数据的读写操作简单应用
- Python使用shelve模块实现简单数据存储的方法
- Hadoop2.4.1 使用MapReduce简单的数据清洗
- 一个简单的使用Quartz和Oozie调度作业给大数据计算平台执行
- MDX Cookbook 12 - 计算 SMA 简单移动平均 LastPeriods() 函数的使用
- android数据存储_SharedPreferences的简单使用
- android学习记录3(数据存储的方式:sqlite、sp、存文件。listview简单使用)
- Adnroid中的数据存储大全,以及ActiveAndroid的简单使用
- Emacs中使用Forms-mode以表格形式展示/编辑简单的文本数据
- 一共81个,开源大数据处理工具汇总:查询引擎、流式计算、迭代计算、离线计算、键值存储、表格存储、文件存储、资源管理、日志收集系统、消息系统、分布式服务、集群管理、基础设施、搜索引擎、数据挖掘=监控
- 灵活使用struct实现简单结构化数据的存储
- MPI编程--使用最简单的基本函数计算PI值
- iOS 6 编程--Core Data持久化数据存储(2)-使用Core Data实现简单ShoppingCart应用程序
- JSP基础语法之一:Scriptlet使用、简单的JSP获取表单数据再输出表格
- . 有一个一维数组,里面存储整形数据,请写一个函数,将他们按从大到小的顺序排列,要求执行效率高,并说明如何改善执行效率(该函数必须自己实现,不能使用php函数)。
- 简单的数据存储--Preferences的使用
- 编写一个程序,要求用户输入最多10个高尔夫成绩,并将其存储在一个数组中。 程序允许用户提早结束输入,并在一行上显示所有成绩,然后报告平均成绩。 请使用3个数组处理函数来分别进行输入、显示和计算
- 数据表格显示标签 - DisplayTag - 使用简单实例