基于storm的实时GPS数据客流特征分析系统 源码分析之(一):GPSReceiverSpout
2015-01-17 22:33
627 查看
先介绍背景再贴源码:
项目背景:实时GPS数据客流特征分析系统,项目来源于深圳交委,数据来源是深圳大约5万两出租车和公交车的车载GPS仪,其目的是要研究出行者的出行特征、实时路况、客流特征等。
开发环境:请参考:storm的开发环境部署配置教程
函数详解:
最重要的两个函数是nextTuple()和declareOutputFields(OutputFieldsDeclarer
declarer)。
nextTuple()告诉storm下一个tuple是什么内容,其处理过程是先用一个socket函数接受来自网络的实时GPS数据,用lineSplit()将GPS以逗号分隔成字符串数组,发送给下一个处理单元
DistrictMatchingBolt。
例如一条原始GPS记录:粤BXXXXX,114.121765,22.569218,2013-02-0817:29:58,1065382,28,101,0,蓝色
_collector.emit(new
Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],GPSRecord[6]
,
GPSRecord[2],GPSRecord[1]))
这一条语句则提取了GPS记录中的第0、3、7、5、6、2、1列字符串发送给下一个处理单元。
declareOutputFields()告诉下一个处理单元DistrictMatchingBolt:
spout的输出数据即DistrictMatchingBolt的输入数据格式的列数和内容,即:"vehicle_number","date_time","occupied","speed","bearing","lantitude","longitude"
共7列。
public
class
GPSReceiverSpout
implements
IRichSpout
{
private
static
final
long
serialVersionUID
=
1L;
private
SpoutOutputCollector
_collector;
private
BufferedReader
fileReader;
//privateTopologyContextcontext;
//privateStringfile="/home/ghchen/2013-01-05.1/2013-01-05--11_05_48.txt";
private
TupleInfotupleInfo=new
TupleInfo();
static
Socketsock=null;
@Override
public
void
close()
{
}
@Override
public
void
open(Map
conf,
TopologyContext
context,
SpoutOutputCollector
collector)
{
_collector
=
collector;
System.out.println("This
isopenfunctioninFieldSpout!");
}
@SuppressWarnings("unused")
@Override
public
void
nextTuple()
{
int
count=0;
int
ch=0;
int
err=0;
try
{
if(sock==null){
sock=new
Socket("172.20.14.XXX",portXXXX);}
while(true){
byte[]
b3=
new
byte[3];
if(sock!=null
){
try{
sock.getInputStream().read(b3,0,3);
ch=b3[0];
}catch
(
Exceptione){
System.out.println("connection
reset,reconnecting...");
sock.close();
Thread.sleep(100);
sock=new
Socket("172.20.14.XXX",portXXXX);;
}
}else{
sock=new
Socket("172.20.14.XXX",portXXXX);;
break
;
}
int
len=SocketJava.bytesToShort(b3,
1);
if(len<0)
break;
byte[]
bytelen=
new
byte[len];
sock.getInputStream().read(bytelen);
if(bytelen==null){
System.out.println("read
thesecondpartfrombytefromsocketfailed!");
break;
}
sock.getInputStream().markSupported();
sock.getInputStream().mark(3);
String
gpsString=SocketJava.DissectOneMessage(ch,bytelen);
String[]
GPSRecord=null;
if(gpsString!=null){
GPSRecord
=gpsString.split(TupleInfo.getDelimiter());
_collector.emit(new
Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],
GPSRecord[6]
,
GPSRecord[2],GPSRecord[1]));
//}
}else{
break;
}
}
}
catch
(IOException
e)
{
e.printStackTrace();
}
catch
(Exception
e)
{
e.printStackTrace();
}
}
@Override
public
void
ack(Object
id)
{
System.out.println("OK:"+id);
}
@Override
public
void
fail(Object
id)
{
System.out.println("Fail:"+id);
}
@Override
public
void
declareOutputFields(OutputFieldsDeclarer
declarer)
{
TupleInfo
tuple=
new
TupleInfo();
Fields
fieldsArr;
try
{
fieldsArr=
tuple.getFieldList();
declarer.declare(fieldsArr);
}
catch
(Exception
e)
{
throw
new
RuntimeException("error:fail
tonewTupleobjectindeclareOutputFields,tupleisnull",e);
}
}
@Override
public
void
activate()
{
}
@Override
public
void
deactivate()
{
}
@Override
public
Map<String,
Object>
getComponentConfiguration()
{
return
null;
}
static
int
count=0;
public
static
void
writeToFile(String
fileName,
Objectobj){
try
{
FileWriter
fwriter;
fwriter=
new
FileWriter(fileName,true);
BufferedWriter
writer=
new
BufferedWriter(fwriter);
writer.write(obj.toString());
writer.close();
}
catch
(IOException
e1)
{
e1.printStackTrace();
}
}
}
相关博文:
storm单机版和集群版安装配置过程
使用Storm实现实时大数据分析实例
storm的开发环境部署配置教程
基于storm的实时GPS数据客流特征分析系统
源码分析之(一)
基于storm的实时GPS数据客流特征分析系统
源码分析之(二)
项目背景:实时GPS数据客流特征分析系统,项目来源于深圳交委,数据来源是深圳大约5万两出租车和公交车的车载GPS仪,其目的是要研究出行者的出行特征、实时路况、客流特征等。
开发环境:请参考:
函数详解:
最重要的两个函数是nextTuple()和declareOutputFields(OutputFieldsDeclarer
declarer)。
nextTuple()告诉storm下一个tuple是什么内容,其处理过程是先用一个socket函数接受来自网络的实时GPS数据,用lineSplit()将GPS以逗号分隔成字符串数组,发送给下一个处理单元
DistrictMatchingBolt。
例如一条原始GPS记录:粤BXXXXX,114.121765,22.569218,2013-02-0817:29:58,1065382,28,101,0,蓝色
_collector.emit(new
Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],GPSRecord[6]
,
GPSRecord[2],GPSRecord[1]))
这一条语句则提取了GPS记录中的第0、3、7、5、6、2、1列字符串发送给下一个处理单元。
declareOutputFields()告诉下一个处理单元DistrictMatchingBolt:
spout的输出数据即DistrictMatchingBolt的输入数据格式的列数和内容,即:"vehicle_number","date_time","occupied","speed","bearing","lantitude","longitude"
共7列。
public
class
GPSReceiverSpout
implements
IRichSpout
{
private
static
final
long
serialVersionUID
=
1L;
private
SpoutOutputCollector
_collector;
private
BufferedReader
fileReader;
//privateTopologyContextcontext;
//privateStringfile="/home/ghchen/2013-01-05.1/2013-01-05--11_05_48.txt";
private
TupleInfotupleInfo=new
TupleInfo();
static
Socketsock=null;
@Override
public
void
close()
{
}
@Override
public
void
open(Map
conf,
TopologyContext
context,
SpoutOutputCollector
collector)
{
_collector
=
collector;
System.out.println("This
isopenfunctioninFieldSpout!");
}
@SuppressWarnings("unused")
@Override
public
void
nextTuple()
{
int
count=0;
int
ch=0;
int
err=0;
try
{
if(sock==null){
sock=new
Socket("172.20.14.XXX",portXXXX);}
while(true){
byte[]
b3=
new
byte[3];
if(sock!=null
){
try{
sock.getInputStream().read(b3,0,3);
ch=b3[0];
}catch
(
Exceptione){
System.out.println("connection
reset,reconnecting...");
sock.close();
Thread.sleep(100);
sock=new
Socket("172.20.14.XXX",portXXXX);;
}
}else{
sock=new
Socket("172.20.14.XXX",portXXXX);;
break
;
}
int
len=SocketJava.bytesToShort(b3,
1);
if(len<0)
break;
byte[]
bytelen=
new
byte[len];
sock.getInputStream().read(bytelen);
if(bytelen==null){
System.out.println("read
thesecondpartfrombytefromsocketfailed!");
break;
}
sock.getInputStream().markSupported();
sock.getInputStream().mark(3);
String
gpsString=SocketJava.DissectOneMessage(ch,bytelen);
String[]
GPSRecord=null;
if(gpsString!=null){
GPSRecord
=gpsString.split(TupleInfo.getDelimiter());
_collector.emit(new
Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],
GPSRecord[6]
,
GPSRecord[2],GPSRecord[1]));
//}
}else{
break;
}
}
}
catch
(IOException
e)
{
e.printStackTrace();
}
catch
(Exception
e)
{
e.printStackTrace();
}
}
@Override
public
void
ack(Object
id)
{
System.out.println("OK:"+id);
}
@Override
public
void
fail(Object
id)
{
System.out.println("Fail:"+id);
}
@Override
public
void
declareOutputFields(OutputFieldsDeclarer
declarer)
{
TupleInfo
tuple=
new
TupleInfo();
Fields
fieldsArr;
try
{
fieldsArr=
tuple.getFieldList();
declarer.declare(fieldsArr);
}
catch
(Exception
e)
{
throw
new
RuntimeException("error:fail
tonewTupleobjectindeclareOutputFields,tupleisnull",e);
}
}
@Override
public
void
activate()
{
}
@Override
public
void
deactivate()
{
}
@Override
public
Map<String,
Object>
getComponentConfiguration()
{
return
null;
}
static
int
count=0;
public
static
void
writeToFile(String
fileName,
Objectobj){
try
{
FileWriter
fwriter;
fwriter=
new
FileWriter(fileName,true);
BufferedWriter
writer=
new
BufferedWriter(fwriter);
writer.write(obj.toString());
writer.close();
}
catch
(IOException
e1)
{
e1.printStackTrace();
}
}
}
相关博文:
基于storm的实时GPS数据客流特征分析系统
源码分析之(一)
基于storm的实时GPS数据客流特征分析系统
源码分析之(二
相关文章推荐
- 基于storm的实时GPS数据客流特征分析系统 源码分析之(一):GPSReceiverSpout
- 基于storm的实时GPS数据客流特征分析系统 源码分析之(二):DistrictMatchingBolt
- 使用storm实现实时GPS数据客流特征分析系统:源码分析
- 实时Hadoop实战篇:基于Storm实时路况分析和实时路径推荐系统
- Twitter Storm 实时数据处理框架分析总结
- 基于ArcGIS10.0和Oracle10g的空间数据管理平台(C#开发)-系统需求分析
- 【Android】GPS启动流程及数据流向分析(基于2.3.5)
- 基于GIS的实时雨水情分析评价系统设计
- 基于GPSG语法的汉语句法分析系统的设计
- Twitter Storm 实时数据处理框架分析总结
- 一种实时多维数据的分析及同步系统
- 基于用户行为的数据分析与挖掘+分布式日志管理系统
- 基于数据字典的通用查询系统(一)SQL语句的构成分析
- Twitter 数据实时分析处理攻击 Storm
- 人人网数据服务平台:基于日志分析的数据系统架构-严岩
- Twitter利用Storm系统处理实时大数据
- 基于WindowsCE的GPS数据导航系统
- 基于数据仓库和维度转换技术的广东电信公话IC话机话务动态分析系统
- 基于GPS gps_qemu.c的数据上报过程分析
- Twitter Storm 实时数据处理框架分析总结