您的位置:首页 > 其它

使用Hive实现时间拉链功能

2017-05-17 11:13 531 查看
背景:

在数据仓库的数据模型设计过程中,经常会遇到如下的业务需求:

1. 表的数据量很大,大几千万或上亿;

2. 表中的部分字段会被update更新操作,如用户的上级领导,产品的描述信息,订单的状态等等;

3. 需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态;

4. 变化的比例和频率不是很大,比如,总共有8000万的用户,每天新增和发生变化的有30万左右;

5. 如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费;

6. 时间拉链历史表,既能满足反应数据的历史状态,又可以最大程度的节省存储空间。

一. 演示的数据

2015-08-21 以及之前的订单表数据:

1|2015-08-18|2015-08-18|创建

2|2015-08-18|2015-08-18|创建

3|2015-08-19|2015-08-21|支付

4|2015-08-19|2015-08-21|完成

5|2015-08-19|2015-08-20|支付

6|2015-08-20|2015-08-20|创建

7|2015-08-20|2015-08-21|支付

8|2015-08-21|2015-08-21|创建

2015-08-22 订单表数据:

1|2015-08-18|2015-08-22|支付

2|2015-08-18|2015-08-22|完成

6|2015-08-20|2015-08-22|支付

8|2015-08-21|2015-08-22|支付

9|2015-08-22|2015-08-22|创建

10|2015-08-22|2015-08-22|支付

2015-08-23 订单表数据:

1|2015-08-18|2015-08-23|完成

3|2015-08-19|2015-08-23|完成

5|2015-08-19|2015-08-23|完成

8|2015-08-21|2015-08-23|完成

11|2015-08-23|2015-08-23|创建

12|2015-08-23|2015-08-23|创建

13|2015-08-23|2015-08-23|支付

将上面所有的数据全部保存到如下文件中:

/home/Hadoop/hivetestdata/Time_zipper/orders.txt

二. 表结构

源系统中订单表结构:

[sql] view
plain copy

 





use timezipper;  

create table orders (  

    orderid int,  

    createtime string,  

    modifiedtime string,  

    status string  

) row format delimited fields terminated by '|';  

  

load data local inpath "file:///home/hadoop/hivetestdata/Time_zipper/orders.txt" into table orders;  

在数据仓库的ODS层,有一张订单的增量数据表,按天分区,存放每天的增量数据:

[sql] view
plain copy

 





create table t_ods_orders_inc (  

    orderid int,  

    createtime string,  

    modifiedtime string,  

    status string  

) partitioned by (day string) row format delimited fields terminated by '|';  

在数据仓库的DW层,有一张订单的历史数据拉链表,存放订单的历史状态数据:

[sql] view
plain copy

 





create table t_dw_orders_his (  

    orderid int,  

    createtime string,  

    modifiedtime string,  

    status string,  

    dw_start_date string,  

    dw_end_date string  

);  

三. 全量数据初始化

在数据从源业务系统每天正常抽取和刷新到DW订单历史表之前,

需要做一次全量的初始化,就是从源订单表中指定某一天以前的数据全部抽取到ODW,并刷新到DW。

以上面的数据为例,比如在2015-08-22这天做全量初始化,那么我需要将2015-08-21以及之前的所有的数据都抽取并刷新到DW,步骤如下:

第一步:抽取全量数据到ODS:

[sql] view
plain copy

 





use timezipper;  

INSERT overwrite TABLE t_ods_orders_inc PARTITION (day='2015-08-21')  

SELECT orderid,createtime,modifiedtime,status  

FROM orders  

WHERE modifiedtime <= '2015-08-21';  

第二步:从ODS刷新到DW:

[sql] view
plain copy

 





use timezipper;  

INSERT overwrite TABLE t_dw_orders_his  

SELECT orderid,createtime,modifiedtime,status,  

createtime AS dw_start_date,  

'9999-12-31' AS dw_end_date  

FROM t_ods_orders_inc  

WHERE day = '2015-08-21';  

四. 增量抽取历史数据并计算生成时间拉链结果表

从2015-08-23开始,需要每天正常刷新前一天(2015-08-22)的增量数据到历史表,步骤如下:

第一步:通过增量抽取,将2015-08-22的数据抽取到ODS:

[sql] view
plain copy

 





INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = '2015-08-22')  

SELECT orderid,createtime,modifiedtime,status  

FROM orders  

WHERE modifiedtime = '2015-08-22';  

第二步:通过DW历史数据(数据日期为2015-08-21(包含2015-08-21以及之前的数据)),和ODS增量数据(2015-08-22),刷新历史表:

先把数据放到一张临时表中:

[sql] view
plain copy

 





drop table if exists t_dw_orders_his_tmp;  

create table t_dw_orders_his_tmp as   

select  oo.orderid,  

       oo.createtime,  

       oo.modifiedtime,  

       oo.status,  

       oo.dw_start_date,  

       oo.dw_end_date  

from (select   

       tt.orderid,  

       tt.createtime,  

       tt.modifiedtime,  

       tt.status,  

       tt.dw_start_date,  

       tt.dw_end_date,  

       row_number() over(distribute by tt.orderid,tt.createtime,tt.modifiedtime,status 

sort by tt.dw_end_date desc) rn
 

    from (  

         select a.orderid,  

                a.createtime,  

                a.modifiedtime,  

                a.status,  

                a.dw_start_date,  

                -- 2015-08-21保存的都是历史数据  

                case when b.orderid is not null and a.dw_end_date > '2015-08-22' then '2015-08-21'   

                     else a.dw_end_date end as dw_end_date   

                from t_dw_orders_his a   

                left outer join (select * from t_ods_orders_inc where day = '2015-08-22') b   

                on (a.orderid = b.orderid)   

         

        union all   

       

        select orderid,  

               createtime,  

               modifiedtime,  

               status,  

               modifiedtime as dw_start_date,  

               '9999-12-31' as dw_end_date   

          from t_ods_orders_inc   

        where day = '2015-08-22') tt   

) oo where oo.rn =1;  

说明:

UNION ALL的两个结果集中,

第一个是用历史表left outer join 日期为 ${yyy-MM-dd} 的增量表,能关联上的,并且dw_end_date > ${yyy-MM-dd},说明状态有变化,则把原来的dw_end_date置为(${yyy-MM-dd} – 1), 

关联不上的,说明状态无变化,dw_end_date无变化。

第二个结果集是直接将增量数据插入历史表,并将dw_end_date设为9999-12-31​。

最后把临时表中数据插入历史表:

[sql] view
plain copy

 





INSERT overwrite TABLE t_dw_orders_his  

SELECT * FROM t_dw_orders_his_tmp;  

五. 同上面的步骤一样,增量抽取历史数据并计算生成时间拉链结果表

再看将2015-08-23的增量数据刷新到历史表:

[sql] view
plain copy

 





insert overwrite table t_ods_orders_inc partition (day = '2015-08-23')   

select orderid,createtime,modifiedtime,status   

from orders   

where modifiedtime = '2015-08-23';  

   

drop table if exists t_dw_orders_his_tmp;  

create table t_dw_orders_his_tmp as   

select  oo.orderid,  

       oo.createtime,  

       oo.modifiedtime,  

       oo.status,  

       oo.dw_start_date,  

       oo.dw_end_date  

from (select   

       tt.orderid,  

       tt.createtime,  

       tt.modifiedtime,  

       tt.status,  

       tt.dw_start_date,  

       tt.dw_end_date,  

       row_number() over(distribute by tt.orderid,tt.createtime,tt.modifiedtime,status 

sort by tt.dw_end_date desc) rn  

from (  

     select a.orderid,  

            a.createtime,  

            a.modifiedtime,  

            a.status,  

            a.dw_start_date,  

            -- 2015-08-20保存的都是历史数据  

            case when b.orderid is not null and a.dw_end_date > '2015-08-23' then '2015-08-22'   

                 else a.dw_end_date end as dw_end_date   

            from t_dw_orders_his a   

            left outer join (select * from t_ods_orders_inc where day = '2015-08-23') b   

            on (a.orderid = b.orderid)   

     

    union all   

   

    select orderid,  

           createtime,  

           modifiedtime,  

           status,  

           modifiedtime as dw_start_date,  

           '9999-12-31' as dw_end_date   

      from t_ods_orders_inc   

    where day = '2015-08-23') tt   

) oo where oo.rn =1;  

   

   

insert overwrite table t_dw_orders_his   

select * from t_dw_orders_his_tmp;  

六. 查看上面步骤生成的时间拉链结果表

按照上面的方法刷新完后,生成的时间拉链的历史表数据如下:

1 2015-08-18 2015-08-18 创建 2015-08-182015-08-21

1 2015-08-18 2015-08-22 支付 2015-08-222015-08-22

1 2015-08-18 2015-08-23 完成 2015-08-239999-12-31

2 2015-08-18 2015-08-18 创建 2015-08-182015-08-21

2 2015-08-18 2015-08-22 完成 2015-08-229999-12-31

3 2015-08-19 2015-08-21 支付 2015-08-192015-08-22

3 2015-08-19 2015-08-23 完成 2015-08-239999-12-31

4 2015-08-19 2015-08-21 完成 2015-08-199999-12-31

5 2015-08-19 2015-08-20 支付 2015-08-192015-08-22

5 2015-08-19 2015-08-23 完成 2015-08-239999-12-31

6 2015-08-20 2015-08-20 创建 2015-08-202015-08-21

6 2015-08-20 2015-08-22 支付 2015-08-229999-12-31

7 2015-08-20 2015-08-21 支付 2015-08-209999-12-31
8 2015-08-21 2015-08-21 创建 2015-08-21 2015-08-21

8 2015-08-21 2015-08-22 支付 2015-08-22 2015-08-22

8 2015-08-21 2015-08-23 完成 2015-08-23 9999-12-31

9 2015-08-22 2015-08-22 创建 2015-08-229999-12-31

10 2015-08-22 2015-08-22 支付 2015-08-229999-12-31

11 2015-08-23 2015-08-23 创建 2015-08-239999-12-31

12 2015-08-23 2015-08-23 创建 2015-08-239999-12-31

13 2015-08-23 2015-08-23 支付 2015-08-239999-12-31

比如我们查看订单8, 可以发现订单8从2015-08-21-2015-08-23号,状态变化了三次(创建->支付->完成),因此历史表中有三条记录。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hive