您的位置:首页 > 数据库 > MySQL

R语言与mysql结合处理交通数据及其算法优化

2015-06-11 23:47 423 查看
一、序言

交通数据处理是智能交通的一个很关键的要素,更好的分析交通数据,可以为市政管理、交通信号管制、道路规划、交通设施建设提供更好的咨询和建议。全国各地政府都在寄期望于智能交通,以缓解城市拥堵,甚至一定程度上解决大城市病或者说是市政建设滞后的问题。同时,诸如百度地图、谷歌地图、高德地图、微软地图都推出了相应的交通应用,以期找到更大的商机。

用好的存储方法和好的算法进行分析,在批处理方面可以更多的分析历史数据,分析和发现问题,为未来进行预测以及公共查询服务;在实时计算方面可以更多的进行交通监控、突发事件处理、甚至是罪犯跟踪。

因此,寻求好的存储策略,好的计算算法,成为非常必要解决的问题。

二、数据概述及存储

目前交通数据有人流量数据、汽车数据。前者对于有大规模地铁,公共交通的城市十分有用,如北京,上海,其必要前提是能通过设备采集到人流信息,对于城市管理者而言,一卡通是最好的工具。因此,在北京,交通一卡通成为了监测地铁,公交车客流信息的主要采集工具,客流信息的采集准确性也相当高,信息数据的格式也容易控制。而后者,目前主要是通过摄像头拍照采集,存在图像识别度不高,设备故障影响,数据格式半结构化,数据缺失(部分信息没有采集到),脏数据存在(拍到人,自行车)。

对于贵阳这样的三线城市,一卡通还没有普及使用,故而监测公交客流人流信息存在诸多困难。但车流信息由于平安城市的建设,变得各个交通路口存在大量的摄像头以采集数据,从而为车流信息分析提供了可能。

对于贵阳车流信息数据,存储的格式是以日志形式传输到文件服务器,并压缩存储,每天大约有1千万条数据。想要处理这些文本,要分析其格式,并转存于数据库中。

在大数据流行的今天,最好的存储和处理方式当然是放在hdfs上,并用相应的nosql分布式数据库进行管理。但作为初步研究使用,分析的内容没有那么多,财力也没有那么大,所以找了一台小型机,16G的内存,CPU8*2.3MHZ。应用的数据信息也只有时间,拍摄地点,车牌号三个字段,故经过解析文本,直接将者三个字段存储到了mysql中,以便处理。

三、数据处理的目标

初步处理数据的目标有三个:

计算各个路口各个时段的车流量,以五分钟为间隔;
计算出城市交通通行有向图;
计算出各个路段各个时段的车流量,以五分钟为间隔;
根据结果1,画出城市车流信息可视化动态图(全部车流,公交出租车流、本地车流、外地车流,外城车流,柱状图);
根据结果1,结合gis画出,城市车流可视化图(热力图,柱状图);
根据结果2,3,画出交通流可视化图;
根据结果1,2,3,对第二天车流拥堵信息进行预测,给出绕行建议。

四、计算过程

目标1,可以通过sql语句很容易达到;目标2,在前一篇《R语言空间换时间算法、Hash键值对在字符串处理中的应用》已经完成;本篇主要解决目标3。

解决目标3的主要思想是根据同一辆车的运行轨迹,以经过路口为节点,按照到达某一路段的时间,并归结到划分的时间段内,并计数到该路段的某一时刻中。

直接上代码,以下是原始版本,其思想借鉴的前篇博文的算法思想,以空间换时间,遍历900多万条数据,只需要几个小时。

rm(list=ls(all=TRUE))
gc()
library(RODBC)
library(hash)

# 读取文件中排序好的路口地址数据
address_file <-file("/home/wanglinlin/transport/address.txt","r")
sorted_address <-readLines(address_file)
sorted_address_hash_pairs<-hash(sorted_address,1:269)
close(address_file)

#矩阵的列表示形式
transection_code_length <-length(sorted_address)*length(sorted_address)
transection_code_matrix <- matrix(0,transection_code_length,1)
k=1;
for(i in 1:269){
for(j in 1:269){
transection_code_matrix[k]<- i*1000+j
k=k+1
}
}
transection_code_hash<-hash(transection_code_matrix,1:transection_code_length)
#transection_code_hash

time<-file("/home/wanglinlin/transport/time.txt","r")
traffic_time<-readLines(time)
close(time)
length(traffic_time)
traffic_time_hash<-hash(traffic_time,1:length(traffic_time))
traffic_time_hash[["1409501100"]]
trafic_flow <- matrix(0,nrow=length(traffic_time),ncol=transection_code_length)

channel=odbcConnect("transport-connector-R", uid="transport", pwd="transport") #连接mysql test 数据库
sqlTables(channel) # 显示test数据库中的表格
trafic_flow_data<-sqlQuery(channel,"select unix_timestamp(time),plate,address from transport20140901 order by plate,time")
trafic_flow_count<-sqlQuery(channel,"select count(*) from transport20140901")
trafic_flow_data<-as.matrix(trafic_flow_data)

#找出所有车牌号,并散列化,形成键值对表
plates<-sqlQuery(channel,"select distinct plate from transport20140901")
odbcClose(channel)
plate_list=(as.matrix(plates))[,1]
plate_count=length(plate_list)
plate_hash_pairs=hash(plate_list,1:plate_count)

#各路段车流计数
transection_code_hash[[toString(sorted_address_hash_pairs[[trafic_flow_data[1000,3]]] * 1000 +sorted_address_hash_pairs[[trafic_flow_data[1001,3]]])]]
trafic_flow_count_number<-as.numeric(trafic_flow_count)-1
for(i in 1:trafic_flow_count_number){
if(plate_hash_pairs[[trafic_flow_data[i,2]]]==plate_hash_pairs[[trafic_flow_data[i+1,2]]]){
start_time_stamp <- as.numeric(trafic_flow_data[i,1])-as.numeric(trafic_flow_data[i,1])%%300
count_times <- ceiling((as.numeric(trafic_flow_data[i+1,1])-start_time_stamp)/300)
col_index_code <- sorted_address_hash_pairs[[trafic_flow_data[i,3]]] * 1000 +sorted_address_hash_pairs[[trafic_flow_data[i+1,3]]]
col_index <- transection_code_hash[[toString(col_index_code)]]
for(j in 1:count_times){
timestamp <- start_time_stamp + 300 * j
row_index <- traffic_time_hash[[toString(timestamp)]]
trafic_flow[row_index,col_index] <- trafic_flow[row_index,col_index] + 1
}
}
}
write.table(trafic_flow,"/home/wanglinlin/transport/trafic_flow.txt",row.names = FALSE,col.names = FALSE)

#把trafic_flow中的数据读取第一个五分钟车流量到矩阵中
five_minues_trafic_flow <- matrix(0,269,269)
for(i in 1:269){
for (j in 1:269){
five_minues_trafic_flow[i,j]=trafic_flow[1,transection_code_hash[[toString(i*1000+j)]]]
}
}
write.table(five_minues_trafic_flow,"/home/wanglinlin/transport/five_minues_trafic_flow.txt",row.names = FALSE,col.names = FALSE)上述代码中,最终的各路段车流信息存储于一个矩阵中,每行代表所有路段的一个时间段的车流信息。借助了对路口地址的编码和hash散列。建立编码和散列的过程一方面消耗了时间,另一方面也增加了内存开销。除此之外,将某一时段的车流信息以正规的方式显示,任然依赖于编码和散列,对后续不利。故可进一步优化这一部分,以三维数组存储车流信息,x,y平面刻画各路口的信息,z轴刻画时间变化。代码如下:
rm(list=ls(all=TRUE))
gc()
library(RODBC)
library(hash)

# 读取文件中按照拼音排序好的路口地址数据
address_file <-file("/home/wanglinlin/transport/address.txt","r")
sorted_address <-readLines(address_file)

sorted_address_hash_pairs<-hash(sorted_address,1:269) #生成路口地址hash键值对,便于通过路口地址名称找序号
close(address_file)
#sorted_address
#class(sorted_address_hash_pairs[["黔灵西路(合群路口-威清路口)"]])

time<-file("/home/wanglinlin/transport/time.txt","r")
traffic_time<-readLines(time)
close(time)
length(traffic_time)
traffic_time_hash<-hash(traffic_time,1:length(traffic_time))
traffic_time_hash[["1409501100"]]

channel=odbcConnect("transport-connector-R", uid="transport", pwd="transport") #连接mysql test 数据库
sqlTables(channel) # 显示test数据库中的表格
trafic_flow_data<-sqlQuery(channel,"select unix_timestamp(time),plate,address from transport20140901 order by plate,time")
trafic_flow_count<-sqlQuery(channel,"select count(*) from transport20140901")
trafic_flow_data<-as.matrix(trafic_flow_data)

#找出所有车牌号,并散列化,形成键值对表
plates<-sqlQuery(channel,"select distinct plate fro
aabf
m transport20140901")
odbcClose(channel)
plate_list=(as.matrix(plates))[,1]
plate_count=length(plate_list)
plate_hash_pairs=hash(plate_list,1:plate_count)

trafic_flow <- array(0, dim=c(269,269,length(traffic_time)))
trafic_flow_count_number<-as.numeric(trafic_flow_count)-1
for(i in 1:trafic_flow_count_number){
if(plate_hash_pairs[[trafic_flow_data[i,2]]]==plate_hash_pairs[[trafic_flow_data[i+1,2]]]){
start_time_stamp <- as.numeric(trafic_flow_data[i,1])-as.numeric(trafic_flow_data[i,1])%%300
count_times <- ceiling((as.numeric(trafic_flow_data[i+1,1])-start_time_stamp)/300)
x_index <- sorted_address_hash_pairs[[trafic_flow_data[i,3]]]
y_index <- sorted_address_hash_pairs[[trafic_flow_data[i+1,3]]]
for(j in 1:count_times){
timestamp <- start_time_stamp + 300 * j
z_index <- traffic_time_hash[[toString(timestamp)]]
trafic_flow[x_index,y_index,z_index] <- trafic_flow[x_index,y_index,z_index] + 1
}
}
}
write.table(trafic_flow,"/home/wanglinlin/transport/trafic_flow_roads.txt",row.names = FALSE,col.names = FALSE)

#把trafic_flow中的数据读取第一个五分钟车流量到矩阵中
#write.table(trafic_flow[,,1],"/home/wanglinlin/transport/five_minues_trafic_flow.txt",row.names = FALSE,col.names = FALSE)

上述代码,解决了前述的问题,但时间戳还用到了hash,这个hash也可以去掉,并将其统一化,用函数的方式将功能改的更加通用化,代码还需进一步优化。
可以看到的是,算法通过优化,具备了更好的额性能,只有通过不断优化,才会更好。

五、总结

解决了一个计算目标,下次也算迈出了新的一步,如果有对算法的最终优化版,到时候再贴上来。

==================================关于原创的声明==================================

本博主的所有原创文章,非本人书面授权,严禁转载。

可以应用于一切非商用的内部学习和讨论研究。

欢迎通过留言或email进行广泛交流。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息