
GIS Development Example on Hadoop (gis-tools-for-hadoop)

2015-04-13 17:32

1. Development Environment

1) Software

Hadoop 2.4.0 + Eclipse 3.7.0 + gis-tools-for-hadoop (master)

2) Data

The sample data shipped with gis-tools-for-hadoop: California county polygons (california-counties.json) and an earthquake point file (earthquakes.csv), uploaded to HDFS.


2. Add the Esri Geometry API JARs
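The Mapper below imports classes from two places in the gis-tools-for-hadoop download, both of which need to be on the project's build path (the exact JAR names vary between releases, so treat these as assumptions and check your copy of the repository):

- esri-geometry-api.jar, which provides com.esri.core.geometry (Point, Envelope2D, QuadTree, GeometryEngine, ...)
- the sample's JSON helper library, which provides com.esri.json.EsriFeatureClass for reading Esri JSON feature classes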



3. Create the Mapper Class
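In setup(), the Mapper reads the county polygons from HDFS and indexes their envelopes in a quadtree; in map(), it parses each CSV record into a point, finds the containing county with a quadtree query followed by an exact contains test, and emits (county name, 1).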

package esri.hadoop.text;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.esri.core.geometry.Envelope;
import com.esri.core.geometry.Envelope2D;
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;
import com.esri.core.geometry.QuadTree;
import com.esri.core.geometry.QuadTree.QuadTreeIterator;
import com.esri.core.geometry.SpatialReference;
import com.esri.json.EsriFeatureClass;

public class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> {

    int longitudeIndex;
    int latitudeIndex;
    String labelAttribute;
    EsriFeatureClass featureClass;
    SpatialReference spatialReference;
    QuadTree quadTree;
    QuadTreeIterator quadTreeIter;

    // Build a quadtree over the envelopes of all polygon features, so that
    // point-in-polygon tests only have to check nearby candidate features.
    private void buildQuadTree() {
        quadTree = new QuadTree(new Envelope2D(-180, -90, 180, 90), 8);
        Envelope envelope = new Envelope();
        for (int i = 0; i < featureClass.features.length; i++) {
            featureClass.features[i].geometry.queryEnvelope(envelope);
            quadTree.insert(i, new Envelope2D(envelope.getXMin(), envelope.getYMin(),
                    envelope.getXMax(), envelope.getYMax()));
        }
        quadTreeIter = quadTree.getIterator();
    }

    // Return the index of the feature whose polygon contains the point,
    // or -1 if the point falls outside every feature.
    private int queryQuadTree(Point pt) {
        quadTreeIter.resetIterator(pt, 0);
        int elmHandle = quadTreeIter.next();
        while (elmHandle >= 0) {
            int featureIndex = quadTree.getElement(elmHandle);
            if (GeometryEngine.contains(featureClass.features[featureIndex].geometry, pt, spatialReference)) {
                return featureIndex;
            }
            elmHandle = quadTreeIter.next();
        }
        return -1;
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        spatialReference = SpatialReference.create(4326); // WGS 84
        String featuresPath = config.get("sample.features.input");
        labelAttribute = config.get("sample.features.keyattribute", "NAME");
        latitudeIndex = config.getInt("samples.csvdata.columns.lat", 1);
        longitudeIndex = config.getInt("samples.csvdata.columns.long", 2);

        // Read the Esri JSON feature class (the county polygons) from HDFS.
        FSDataInputStream iStream = null;
        try {
            Path p = new Path(featuresPath);
            FileSystem hdfs = p.getFileSystem(config);
            iStream = hdfs.open(p);
            featureClass = EsriFeatureClass.fromJson(iStream);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (iStream != null) {
                try {
                    iStream.close();
                } catch (IOException e) { }
            }
        }
        if (featureClass != null) {
            buildQuadTree();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the CSV header line (the record at byte offset 0).
        if (key.get() == 0) return;

        String line = value.toString();
        String[] values = line.split(",");
        float latitude = Float.parseFloat(values[latitudeIndex]);
        float longitude = Float.parseFloat(values[longitudeIndex]);
        Point point = new Point(longitude, latitude);

        IntWritable one = new IntWritable(1);
        int featureIndex = queryQuadTree(point);
        if (featureIndex >= 0) {
            String name = (String) featureClass.features[featureIndex].attributes.get(labelAttribute);
            if (name == null)
                name = "???";
            context.write(new Text(name), one);
        } else {
            context.write(new Text("*Outside Feature Set"), one);
        }
    }
}
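For reference, the Mapper expects comma-separated input with latitude in column 1 and longitude in column 2 (zero-based), matching the earthquakes.csv layout configured in the driver below. A hypothetical record (values invented for illustration) would look like:

2015/01/01 00:30:00,34.12,-117.32,6.0,3.2

The line at byte offset 0, i.e. the header, is skipped in map().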

 

4. Create the Reducer Class

package esri.hadoop.text;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceClass extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the per-record counts emitted by the Mapper for this key.
        int sumCount = 0;
        for (IntWritable sum : values) {
            sumCount += sum.get();
        }
        context.write(key, new IntWritable(sumCount));
    }
}
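Because this reduce is just an associative, commutative sum, the driver can safely register ReduceClass as the combiner as well (see setCombinerClass below), letting Hadoop pre-aggregate counts on the map side before the shuffle.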

5. Create the Driver Class (Sample)
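The driver wires the Mapper and Reducer together, passes the polygon file path and CSV column indexes to the tasks through the job configuration, and points the job at the hard-coded HDFS input and output paths.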

package esri.hadoop.text;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sample {

    public static void main(String[] args1) throws Exception {

        Configuration config = new Configuration();
        // This line is crucial: it points the job at the remote cluster
        // (adjust the address to match your own environment).
        config.set("mapred.job.tracker", "10.2.173.15:9000");
        String[] ioArgs = new String[] {
                "/user/guest/esri/counties-data/california-counties.json",
                "/user/guest/esri/earthquake-data/earthquakes.csv",
                "/user/guest/esri/DataCount" };

        String[] args = new GenericOptionsParser(config, ioArgs).getRemainingArgs();

        if (args.length != 3) {
            System.out.println("Invalid Arguments");
            throw new IllegalArgumentException();
        }

        // Hand the polygon file and the CSV column layout to the Mapper
        // via the job configuration.
        config.set("sample.features.input", args[0]);
        config.set("sample.features.keyattribute", "NAME");
        config.setInt("samples.csvdata.columns.lat", 1);
        config.setInt("samples.csvdata.columns.long", 2);

        Job job = Job.getInstance(config);
        job.setJobName("Earthquake Data Aggregation Sample");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(MapperClass.class);
        job.setReducerClass(ReduceClass.class);
        job.setCombinerClass(ReduceClass.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.setInputPaths(job, new Path(args[1]));
        TextOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setJarByClass(Sample.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

6. Run
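Run Sample as a Java application from Eclipse, or package the three classes into a JAR and submit it to the cluster. A minimal sketch, assuming the job has been packaged as sample.jar (a hypothetical name) and the input data already sits at the HDFS paths hard-coded in the driver:

hadoop jar sample.jar esri.hadoop.text.Sample
hadoop fs -cat /user/guest/esri/DataCount/part-r-00000

Each output line is a county name and its earthquake count, for example (counts invented for illustration):

Kern	36
Los Angeles	18
*Outside Feature Set	204

Note that the output directory /user/guest/esri/DataCount must not already exist, or the job will fail at startup.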

Tags: hadoop, gis