pig 分析 脚本
2015-09-09 22:38
253 查看
--读取数据
data = LOAD '/user/mapred/PigData.txt' USING PigStorage('|') AS ( imsi:chararray,time:chararray,loc:chararray);
--转换格式
REGISTER /home/mapred/software/hadoops/pig/pig-0.11.1/contrib/piggybank/java/piggybank.jar;
REGISTER /home/mapred/practise/joda-time-2.0.jar;
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
toISO = FOREACH data GENERATE imsi, CustomFormatToISO( SUBSTRING(time,0,13),'YYYY-MM-dd HH') AS time:chararray,loc;
--数据分组
grp = GROUP toISO BY imsi;
--连续获取数据
REGISTER /home/mapred/practise/datafu-1.2.0.jar
DEFINE MarkovPairs datafu.pig.stats.MarkovPairs();
pairs = FOREACH grp
{
sorted = ORDER toISO BY time;
pair = MarkovPairs(sorted);
GENERATE FLATTEN(pair) AS (data:tuple(imsi,time,loc),next:tuple(imsi,time,loc) );
}
--展开数据
prj = FOREACH pairs GENERATE data.imsi AS imsi,data.time AS time,next.time AS next_time,data.loc AS loc,next.loc AS next_loc;
DEFINE ISODaysBetween org.apache.pig.piggybank.evaluation.datetime.diff.ISODaysBetween();
flt = FILTER prj BY ISODaysBetween(next_time, time) == 0L;
--计算每一个位置的总数
total_count = FOREACH (GROUP flt BY loc) GENERATE group AS loc,COUNT(flt) AS total;
--计算每一对位置的数目
pairs_count = FOREACH (GROUP flt by (loc,next_loc) ) GENERATE FLATTEN(group) AS (loc,next_loc),COUNT(flt) AS cnt;
jnd = JOIN pairs_count BY loc,total_count BY loc USING 'replicated';
prob = FOREACH jnd GENERATE pairs_count::loc AS loc, pairs_count::next_loc AS next_loc,(double)cnt/(double)total AS probability;
top3 = FOREACH (GROUP prob BY loc)
{
sorted = ORDER prob BY probability DESC;
top = LIMIT sorted 3;
GENERATE FLATTEN(top);
};
STORE top3 INTO 'output';
cat output;
data = LOAD '/user/mapred/PigData.txt' USING PigStorage('|') AS ( imsi:chararray,time:chararray,loc:chararray);
--转换格式
REGISTER /home/mapred/software/hadoops/pig/pig-0.11.1/contrib/piggybank/java/piggybank.jar;
REGISTER /home/mapred/practise/joda-time-2.0.jar;
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
toISO = FOREACH data GENERATE imsi, CustomFormatToISO( SUBSTRING(time,0,13),'YYYY-MM-dd HH') AS time:chararray,loc;
--数据分组
grp = GROUP toISO BY imsi;
--连续获取数据
REGISTER /home/mapred/practise/datafu-1.2.0.jar
DEFINE MarkovPairs datafu.pig.stats.MarkovPairs();
pairs = FOREACH grp
{
sorted = ORDER toISO BY time;
pair = MarkovPairs(sorted);
GENERATE FLATTEN(pair) AS (data:tuple(imsi,time,loc),next:tuple(imsi,time,loc) );
}
--展开数据
prj = FOREACH pairs GENERATE data.imsi AS imsi,data.time AS time,next.time AS next_time,data.loc AS loc,next.loc AS next_loc;
DEFINE ISODaysBetween org.apache.pig.piggybank.evaluation.datetime.diff.ISODaysBetween();
flt = FILTER prj BY ISODaysBetween(next_time, time) == 0L;
--计算每一个位置的总数
total_count = FOREACH (GROUP flt BY loc) GENERATE group AS loc,COUNT(flt) AS total;
--计算每一对位置的数目
pairs_count = FOREACH (GROUP flt by (loc,next_loc) ) GENERATE FLATTEN(group) AS (loc,next_loc),COUNT(flt) AS cnt;
jnd = JOIN pairs_count BY loc,total_count BY loc USING 'replicated';
prob = FOREACH jnd GENERATE pairs_count::loc AS loc, pairs_count::next_loc AS next_loc,(double)cnt/(double)total AS probability;
top3 = FOREACH (GROUP prob BY loc)
{
sorted = ORDER prob BY probability DESC;
top = LIMIT sorted 3;
GENERATE FLATTEN(top);
};
STORE top3 INTO 'output';
cat output;
相关文章推荐
- 思维导图学习一
- 汉诺塔问题探讨
- IG&IC&Pra 闪退的主要原因及解决方法
- 对数据库语言的简单认识
- oh-my-zsh配置你的zsh提高shell逼格终极选择
- 应该有一个自己的作品
- Axure与MarkMa的学习
- Linux.NET学习手记(2)
- linux之网络管理命令
- 杂谈:360 、 酷派 一次联婚,一个爱情故事
- 三种方法:一个简易的员工管理系统(一)
- 0909学习操作系统
- JavaScript词法分析过程
- ccf 201412-3 集合竞价
- 各种编程语言下字符串分割及foreach遍历对比
- Linux.NET学习手记(1)
- 理解Activity Fragment生命周期
- Android中自建应用调用系统联系人并返回结果
- const
- JAVA随笔(二)