您的位置:首页 > 其它

基于storm的实时GPS数据客流特征分析系统 源码分析之(一):GPSReceiverSpout

2015-01-17 22:33 627 查看
先介绍背景再贴源码:
项目背景:实时GPS数据客流特征分析系统,项目来源于深圳交委,数据来源是深圳大约5万两出租车和公交车的车载GPS仪,其目的是要研究出行者的出行特征、实时路况、客流特征等。

开发环境:请参考:storm的开发环境部署配置教程


函数详解:

最重要的两个函数是nextTuple()和declareOutputFields(OutputFieldsDeclarer
declarer)。

nextTuple()告诉storm下一个tuple是什么内容,其处理过程是先用一个socket函数接受来自网络的实时GPS数据,用lineSplit()将GPS以逗号分隔成字符串数组,发送给下一个处理单元
DistrictMatchingBolt。
例如一条原始GPS记录:粤BXXXXX,114.121765,22.569218,2013-02-0817:29:58,1065382,28,101,0,蓝色

_collector.emit(new
Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],GPSRecord[6]
,
GPSRecord[2],GPSRecord[1]))
这一条语句则提取了GPS记录中的第0、3、7、5、6、2、1列字符串发送给下一个处理单元。

declareOutputFields()告诉下一个处理单元DistrictMatchingBolt:
spout的输出数据即DistrictMatchingBolt的输入数据格式的列数和内容,即:"vehicle_number","date_time","occupied","speed","bearing","lantitude","longitude"
共7列。


public
class
GPSReceiverSpout
implements
IRichSpout
{

private
static
final
long
serialVersionUID
=
1L;

private
SpoutOutputCollector
_collector;

private
BufferedReader
fileReader;

//privateTopologyContextcontext;

//privateStringfile="/home/ghchen/2013-01-05.1/2013-01-05--11_05_48.txt";

private
TupleInfotupleInfo=new
TupleInfo();

 static
Socketsock=null;

@Override

public
void
close()
{

}

@Override

public
void
open(Map
conf,
TopologyContext
context,
SpoutOutputCollector
collector)

{

_collector
=
collector;

System.out.println("This
isopenfunctioninFieldSpout!");

}

@SuppressWarnings("unused")

@Override

public
void
nextTuple()
{

  int
count=0;

int
ch=0;

int
err=0;

try
{

if(sock==null){

sock=new
Socket("172.20.14.XXX",portXXXX);}

while(true){

byte[]
b3=
new
byte[3];

if(sock!=null
){

try{

sock.getInputStream().read(b3,0,3);

ch=b3[0];

}catch
(
Exceptione){

System.out.println("connection
reset,reconnecting...");

sock.close();

Thread.sleep(100);

sock=new
Socket("172.20.14.XXX",portXXXX);;

}

}else{

sock=new
Socket("172.20.14.XXX",portXXXX);;

break
;

}

int
len=SocketJava.bytesToShort(b3,
1);

if(len<0)
break;

byte[]
bytelen=
new
byte[len];

sock.getInputStream().read(bytelen);

if(bytelen==null){

System.out.println("read
thesecondpartfrombytefromsocketfailed!");

break;

}

sock.getInputStream().markSupported();

sock.getInputStream().mark(3);

String
gpsString=SocketJava.DissectOneMessage(ch,bytelen);

   String[]
GPSRecord=null;

if(gpsString!=null){

GPSRecord
=gpsString.split(TupleInfo.getDelimiter());

_collector.emit(new
Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],

GPSRecord[6]
,
GPSRecord[2],GPSRecord[1]));

//}

}else{

break;

}

}

}
catch
(IOException
e)
{

  e.printStackTrace();

}
catch
(Exception
e)
{

  e.printStackTrace();

}

}

@Override

public
void
ack(Object
id)
{

System.out.println("OK:"+id);

}

@Override

public
void
fail(Object
id)
{

System.out.println("Fail:"+id);

}

@Override

public
void
declareOutputFields(OutputFieldsDeclarer
declarer)
{

TupleInfo
tuple=
new
TupleInfo();

Fields
fieldsArr;

try
{

fieldsArr=
tuple.getFieldList();

declarer.declare(fieldsArr);

}
catch
(Exception
e)
{

throw
new
RuntimeException("error:fail
tonewTupleobjectindeclareOutputFields,tupleisnull",e);

}

}

@Override

public
void
activate()
{

}

@Override

public
void
deactivate()
{

}

@Override

public
Map<String,
Object>
getComponentConfiguration()
{

return
null;

}

static
int
count=0;

public
static
void
writeToFile(String
fileName,
Objectobj){

try
{

FileWriter
fwriter;

fwriter=
new
FileWriter(fileName,true);

BufferedWriter
writer=
new
BufferedWriter(fwriter);

writer.write(obj.toString());

writer.close();

}
catch
(IOException
e1)
{

e1.printStackTrace();

}

}
 
}


相关博文:

storm单机版和集群版安装配置过程


使用Storm实现实时大数据分析实例

storm的开发环境部署配置教程


基于storm的实时GPS数据客流特征分析系统
源码分析之(一)


基于storm的实时GPS数据客流特征分析系统
源码分析之(二
)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: