您的位置:首页 > 其它

InAction-根据LBS数据手机用户移动轨迹

2014-11-19 13:05 239 查看
看了以后学了不少通信运营商关于用户数据记录的知识啊。

本来想从网上找真实数据集的,但是网上的数据不合这个DEMO的场景要求,于是用作者提供的python脚本生成一定数据量的数据来实践(当然,这些数据结构是简化了的)。

生成数据:



查看数据:



第一个是位置数据,是手机定期想基站通信报告情况产生的;第二个是上网数据,是手机上网产生的记录;

数据格式:



两种数据都共同包含了分析计算用到的IMSI、LOC、TIME。形成时间互补。

目标是根据数据分析计算指定日期指定时间分割里用户在不同基站的停留时间(我觉得这种计算结果没什么用,要说有用的话估计是一种分析用户下一个地点的预测应用的基础数据)

分析过程:

不同文件提取需要字段并转换时间格式的工具对象类:

package org.admln.LBS;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.Text;

/**
* @author admln
* 定义异常类
*
*/
class LineException extends Exception {

private static final long serialVersionUID = 1L;
int flag;
public LineException(String msg,int flag) {
super(msg);
this.flag = flag;
}
public int getFlag() {
return flag;
}
}

public class DataLine {
private String IMSI,LOC,TIME,TIMEFLAG;
private Date day;
private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static DataLine parser(String line,boolean source,String date,String[] timeFlag) throws LineException {
DataLine dataLine = new DataLine();
String[] lineSplit = line.split("\t");
if(source) {
dataLine.setIMSI(lineSplit[0]);
dataLine.setLOC(lineSplit[3]);
dataLine.setTIME(lineSplit[4]);
}else {
dataLine.setIMSI(lineSplit[0]);
dataLine.setLOC(lineSplit[2]);
dataLine.setTIME(lineSplit[3]);
}

//检查日期合法性
if(!dataLine.getTIME().startsWith(date)) {
throw new LineException("",-1);
}
try {
dataLine.setDay(dataLine.getFormatter().parse(dataLine.getTIME()));
}catch(ParseException e) {
throw new LineException("",0);
}

//计算所属时段
//        int hour = Integer.valueOf(dataLine.getTIME().split(" ")[1].split(":")[1]);
//        System.out.println(hour + " " + Integer.valueOf(timeFlag[0]));
//        for(int i=0;i<timeFlag.length;i++) {
//            if(i==0 && hour<Integer.valueOf(timeFlag[0])) {
//                dataLine.setTIMEFLAG("00-" + timeFlag[i]);
//                break;
//            }else if(hour>Integer.valueOf(timeFlag[timeFlag.length-1])){
//                throw new LineException("time is error",-1);
//            }else if(hour < Integer.valueOf(timeFlag[i])){
//                dataLine.setTIMEFLAG(timeFlag[i-1] + "-" + timeFlag[i]);
//                break;
//            }
//        }
int i = 0, n = timeFlag.length;
int hour = Integer.valueOf( dataLine.getTIME().split(" ")[1].split(":")[0] );
while ( i < n && Integer.valueOf( timeFlag[i] ) <= hour )
i++;
if ( i < n )
{
if ( i == 0 )
dataLine.setTIMEFLAG( "00-" + timeFlag[i] );
else
dataLine.setTIMEFLAG( timeFlag[i-1] + "-" + timeFlag[i] );
}
else                                     //Hour大于最大的时间点
throw new LineException("", -1);

return dataLine;
}

//输出KEY
public Text outKey() {
return new Text(this.IMSI + "|" + this.TIMEFLAG);
}

//输出VALUE
public Text outValue() {
long time = this.day.getTime()/1000L;
return new Text(this.LOC + "|" + String.valueOf(time));
}

//测试主函数
public static void main(String[] args) throws LineException {
//位置数据。                      IMSI          IMEI   UPDATETYPE LOC        TIME
String pos = "0000000000    0047483647    3    00000044    2013-09-12 00:09:17";
//网络数据.       IMSI          IMEI       LOC         TIME                URL
String net = "0000000000    0047483647    00000063    2013-09-12 00:20:14    www.baidu.com";
String[] str = {"09","17","24"};
DataLine temp = DataLine.parser("0000000000    0047483647    3    00000044    2013-09-12 00:15:17", true, "2013-09-12", str);
System.out.println(temp.getIMSI() + " " + temp.getLOC() + " " + temp.getTIME() + " " + temp.getTIMEFLAG());
System.out.println(temp.outKey());

}

public String getIMSI() {
return IMSI;
}

public void setIMSI(String iMSI) {
IMSI = iMSI;
}

public String getLOC() {
return LOC;
}

public void setLOC(String lOC) {
LOC = lOC;
}

public String getTIME() {
return TIME;
}

public void setTIME(String tIME) {
TIME = tIME;
}

public String getTIMEFLAG() {
return TIMEFLAG;
}

public void setTIMEFLAG(String tIMEFLAG) {
TIMEFLAG = tIMEFLAG;
}

public Date getDay() {
return day;
}

public void setDay(Date day) {
this.day = day;
}

public SimpleDateFormat getFormatter() {
return formatter;
}

public void setFormatter(SimpleDateFormat formatter) {
this.formatter = formatter;
}

}


主角MR:

package org.admln.LBS;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* @author admln
*
*/
public class BaseStationDataPreprocess {

/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter {
TIMESKIP,     //时间格式有误
OUTOFTIMESKIP,//时间不在参数指定的时间段内
LINESKIP,     //源文件行有误
USERSKIP      //某个用户某个时间段被整个放弃
}
public static class baseMapper extends Mapper<Object,Text,Text,Text>{
private String date;
private String[] timeFlag;
private boolean dataSource;
@Override
public void setup(Context context) throws IOException {
//取出指定日期和时间间隔
this.date = context.getConfiguration().get("date");
this.timeFlag = context.getConfiguration().get("timeFlag").split("-");

//提取文件名
FileSplit fs = (FileSplit) context.getInputSplit();
String fileName = fs.getPath().getName();
if(fileName.startsWith("POS")) {
this.dataSource = true;
}else if(fileName.startsWith("NET")) {
this.dataSource = false;
}else {
throw new IOException("file error");
}
}
/**
* MAP任务
* 读取基站数据
* 找出数据所对应时间段
* 以IMSI和时间段作为 KEY
* CGI和时间作为 VALUE
* @throws InterruptedException
* @throws IOException
*/
@Override
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
DataLine line;
try {
line = DataLine.parser(value.toString(), this.dataSource, this.date, this.timeFlag);
}catch(LineException e) {
if(e.getFlag()==-1) {
context.getCounter(Counter.OUTOFTIMESKIP).increment(1);
}else {
context.getCounter(Counter.TIMESKIP).increment(1);
}
return;
}catch(Exception e) {
context.getCounter(Counter.LINESKIP).increment(1);
return;
}

context.write(line.outKey(), line.outValue());
}
}
/**
* 统计同一个IMSI在同一时间段
* 在不同CGI停留的时长
*/
public static class baseReducer extends Reducer<Text,Text,NullWritable,Text> {
private String date;
private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void setup(Context context) {
this.date = context.getConfiguration().get("date");
}
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
String imsi = key.toString().split("\\|")[0];
String timeFlag = key.toString().split("\\|")[1];

TreeMap<Long,String> tree = new TreeMap<Long,String>();
String treeKey,treeValue;
for(Text val : values) {
treeKey = val.toString().split("\\|")[1];
treeValue = val.toString().split("\\|")[0];
try {
tree.put(Long.valueOf(treeKey),treeValue);
}catch (NumberFormatException e) {
context.getCounter(Counter.TIMESKIP).increment(1);
continue;
}
}
try {
tree.put((formatter.parse(this.date + " " + timeFlag.split("-")[1] + ":00:00").getTime()/1000L), "OFF");
HashMap<String, Float> locs = getStayTime(tree);

//输出
for( Entry<String, Float> entry : locs.entrySet() )
{
StringBuilder sb = new StringBuilder();
sb.append(imsi).append("|");
sb.append(entry.getKey()).append("|");
sb.append(timeFlag).append("|");
sb.append(entry.getValue());

context.write( NullWritable.get(), new Text(sb.toString()) );
}
} catch (ParseException e) {
e.printStackTrace();
}

}
//获取位置停留时间方法
private HashMap<String,Float> getStayTime(TreeMap<Long,String> tree) {
HashMap<String,Float> hashMap = new HashMap<String,Float>();
Entry<Long,String> branch,nextBranch;
Iterator<Entry<Long,String>> it = tree.entrySet().iterator();
branch = it.next();
while(it.hasNext()) {
nextBranch = it.next();
float stay = (nextBranch.getKey()-branch.getKey())/60.0f;
if(stay<60.0) {
if(hashMap.containsKey(branch.getValue())) {
hashMap.put(branch.getValue(),(float)(Math.round((hashMap.get(branch.getValue())+stay)*100))/100);
}else {
hashMap.put(branch.getValue(), (float)(Math.round(stay*100))/100);
}
}
branch = nextBranch;
}
return hashMap;
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
Path input = new Path("hdfs://hadoop:8020/input/baseStation/");
Path output = new Path("hdfs://hadoop:8020/output/baseStation/");

Configuration conf = new Configuration();

//把输入参数中的日期和一天内的时间间隔标志传入配置对象
conf.set("date", args[1]);
conf.set("timeFlag", args[2]);

Job job = new Job(conf,"BaseStationDataPreprocess");

job.setJarByClass(BaseStationDataPreprocess.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(baseMapper.class);
job.setReducerClass(baseReducer.class);

FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);

System.exit(job.waitForCompletion(true)?0:1);

}

}


计算结果:



(环境是centos6.4;hadoop2.2.0;JDK1.7)

我觉得作者的做法也不是最好的,作者肯定没有透露所有细节,肯定有更好的方法去处理误差。比如记录中第一时间和时间段的起始时间的时间差。

这个视频留了作业,关于作业1就是上面的过程。至于作业二是要求计算出每个时间段中停留时间最长的三个基站并输出。

这其实就是把每个基站的输出再根据停留时间算一个topK。

我觉得自己需要温故和补习的知识点:

1.自定义异常

好处是自己可以定义异常中的各种动作以迎合自己程序的需要。(类java.lang.Throwable是所有异常类的基类,它包括两个子类:Exception和Error,Exception类用于描述程序能够捕获的异常,如ClassNotFoundException;自定义异常类可以继承Throwable类或者Exception,而不要继承Error类。自定义异常类之间也可以有继承关系。)需要为自定义异常类设计构造方法,以方便构造自定义异常对象。

2.setup方法

3.关于竖线的转义,不转义士没有语法错误的,但是实际错误。以及转义的方式,以及其他需要转移的字符也要注意,如括号、单引号等。

特别佩服作者,不愧参加了那么多的实践项目,对数据的异常处理的很细致很全面也特别有利于事后分析修正。这是我最受用的一点。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: