A small Spark Streaming + Flume application
This was a small assignment for this semester's distributed-systems course. Even though the teacher remarked at the final presentation that this little application doesn't have much to do with distributed computing, it still took time to build, so I'm sharing it here.
This application monitors the network activity of a subnet over a period of time, counting which websites were visited and how often.
1. Preparation
I did this on Ubuntu. The packages I installed were:
apache-flume-1.5.2-bin.tar.gz, jdk-7u79-linux-x64.tar.gz, spark-1.3.1-bin-hadoop2.6.tgz,
apache-maven-3.3.3-bin.tar.gz, hadoop-2.6.0.tar.gz, scala-2.11.6.tgz
As for configuring the environment variables, I followed this guide.
2. Configuring Flume and Spark
The pom.xml file is as follows (mainClass is left empty; the entry class is passed to spark-submit with --class in section 7 instead):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>flume</groupId>
    <artifactId>Flume-SparkStreaming</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20141113</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The flume-conf.properties file:
# netcat to avro

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = netcat
a2.sources.r2.bind = localhost
a2.sources.r2.port = 44444

# Describe the sink
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = localhost
a2.sinks.k2.port = 42424

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
The official documentation explains these parameters, so I won't restate them here.
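Before wiring in the real packet sniffer (section 4), the agent can be smoke-tested by pushing a few lines into the netcat source by hand. Here is a minimal sketch of such a test client in Java; the class name and the sample domains are invented for illustration, while the host and port match the config above. Each newline-terminated line becomes one Flume event:

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical smoke test for the Flume netcat source configured above.
public class NetcatSourceTest {
    public static void main(String[] args) throws Exception {
        // Made-up sample domains standing in for sniffed Host headers.
        String[] domains = {"www.example.com", "www.example.org", "www.example.com"};
        try (Socket sock = new Socket("localhost", 44444)) {
            OutputStream out = sock.getOutputStream();
            for (String d : domains) {
                // The netcat source turns each newline-terminated line into one event.
                out.write((d + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        }
    }
}

If the pipeline is up, these lines should show up as Flume events in the Spark program's debug print (section 5).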
In Spark's conf folder, remember to activate the log4j.properties and slaves files by removing the trailing .template from their names.
The contents of log4j.properties are as follows:
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
3. ARP spoofing
I used the Ettercap tool (see here for the details). The command is:
sudo ettercap -i eth0 -Tq -M arp:remote //172.18.217.254/
Here 172.18.217.254 is the gateway of my subnet.
4. HTTP packet capture
This script came from the web; I modified it slightly to fit this application. It sniffs HTTP requests on port 80 and forwards each Host header to the Flume netcat source (start the Flume agent first, since the script connects to localhost:44444 on startup):

#!/usr/bin/env python
import pcap
import dpkt
import socket

# Connect to the Flume netcat source; every line sent becomes one event.
mysock = socket.socket()
mysock.connect(('localhost', 44444))

def main_pcap(p_time, p_data):
    out_format = "%s\t%s\t%s\t%s\t%s\tHTTP/%s"
    p = dpkt.ethernet.Ethernet(p_data)
    ret = None
    if p.data.__class__.__name__ == 'IP':
        ip_data = p.data
        src_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.src)))
        dst_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.dst)))
        if p.data.data.__class__.__name__ == 'TCP':
            tcp_data = p.data.data
            if tcp_data.dport == 80:
                if tcp_data.data:
                    try:
                        h = dpkt.http.Request(tcp_data.data)
                    except dpkt.UnpackError:
                        return None
                    host = h.headers['host']
                    url = host
                    # datetime srcip dstip GET /index.htm HTTP/1.1
                    ret = out_format % (p_time, src_ip, dst_ip, h.method, url, h.version)
                    # Forward the visited host to Flume, one per line.
                    mysock.send(url)
                    mysock.send("\n")
    return ret

def main():
    pc = pcap.pcap()
    pc.setfilter('tcp port 80')
    for p_time, p_data in pc:
        ret = main_pcap(p_time, p_data)
        if ret:
            print ret

if __name__ == '__main__':
    main()

Run it with sudo, since libpcap needs raw access to the interface.
5. The Spark program
The main job is a word-frequency count over the incoming domains (2-second batches, reduced over a 4-second window sliding every 4 seconds), with each window's counts written out as JSON:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//package org.apache.spark.examples.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

import java.util.List;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;

import org.json.JSONException;
import org.json.JSONStringer;
import org.json.JSONObject;

public final class JavaFlumeEventCount {
    private JavaFlumeEventCount() {
    }

    // Serialize the (domain, count) pairs as a JSON array and write them to outputPath.
    public static void outputToJson(List<Tuple2<String, Integer>> listOfResults, String outputPath) {
        JSONStringer IpJson = new JSONStringer();
        IpJson.array();
        for (Tuple2<String, Integer> item : listOfResults) {
            IpJson.object();
            IpJson.key("Domain").value(item._1);
            IpJson.key("Count").value(item._2);
            IpJson.endObject();
        }
        IpJson.endArray();
        try {
            File outputFile = new File(outputPath);
            if (!outputFile.exists()) {
                outputFile.createNewFile();
            }
            FileWriter fw = new FileWriter(outputFile);
            BufferedWriter outBuffer = new BufferedWriter(fw);
            outBuffer.write(IpJson.toString());
            outBuffer.flush();
            outBuffer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.err.println("Usage: JavaFlumeEventCount <host> <port> <Output Path>");
            System.exit(1);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);
        final String outputPath = args[2];

        Duration batchInterval = new Duration(2000);
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

        // Debug: print the raw Flume events.
        flumeStream.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flumeEvent) {
                    return "Flume Event: " + flumeEvent.event().toString();
                }
            }).print();

        // Map: one (domain, 1) pair per event.
        JavaPairDStream<String, Integer> ipMap = flumeStream.mapToPair(
            new PairFunction<SparkFlumeEvent, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(SparkFlumeEvent flumeEvent) {
                    JSONObject jsonobj = new JSONObject(flumeEvent.event().toString());
                    JSONObject jsonobj_body = new JSONObject(jsonobj.getString("body"));
                    String domain = jsonobj_body.getString("bytes");
                    Tuple2<String, Integer> result = new Tuple2<String, Integer>(domain, 1);
                    return result;
                }
            });

        // Reduce: sum the counts per domain over a 4s window, sliding every 4s.
        JavaPairDStream<String, Integer> ipReduce = ipMap.reduceByKeyAndWindow(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer c1, Integer c2) {
                    return c1 + c2;
                }
            }, new Duration(4000), new Duration(4000));

        // Output: collect each window's counts on the driver and write them as JSON.
        ipReduce.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> rdd) {
                List<Tuple2<String, Integer>> listOfResults = rdd.collect();
                outputToJson(listOfResults, outputPath);
                return null;
            }
        });

        ssc.start();
        ssc.awaitTermination(); // block so the streaming job keeps running
    }
}
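One note on the mapToPair step above: it recovers the domain by parsing flumeEvent.event().toString() as JSON, which leans on the exact formatting of Avro's toString(). A more direct alternative, assuming each event body is just the UTF-8 hostname line sent by the capture script, is to decode the Avro body itself. A sketch, not the code used above:

// Hypothetical replacement for the mapToPair step: decode the Avro event
// body directly instead of parsing the event's toString() as JSON.
JavaPairDStream<String, Integer> ipMap = flumeStream.mapToPair(
    new PairFunction<SparkFlumeEvent, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(SparkFlumeEvent flumeEvent) {
            java.nio.ByteBuffer body = flumeEvent.event().getBody();
            byte[] bytes = new byte[body.remaining()];
            body.get(bytes); // copy out the buffer contents
            String domain = new String(bytes, java.nio.charset.StandardCharsets.UTF_8).trim();
            return new Tuple2<String, Integer>(domain, 1);
        }
    });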
Because I commented out the line

package org.apache.spark.examples.streaming;

the program does not have to be buried in the matching package directory tree; it can simply sit at /code/src/main/java/***.java. The packet-capture script and pom.xml above are both placed under code/. The Flume-SparkStreaming-1.0-jar-with-dependencies.jar used below is produced by the Maven assembly plugin (e.g. by running mvn assembly:assembly).
6. Web display
This page's code is also from the web, lightly modified. Clicking the button fetches IpCount.json, an array of {"Domain": ..., "Count": ...} objects as produced by the Spark program, and renders it:

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Fetching data with getJSON</title>
<script type="text/javascript" src="js/jquery-1.8.2.min.js"></script>
<style type="text/css">
#divframe{ border:1px solid #999; width:500px; margin:0 auto;}
.loadTitle{ background:#CCC; height:30px;}
</style>
<script type="text/javascript">
$(function(){
    $("#btn").click(function(){
        $.getJSON("IpCount.json", function(data){
            var $jsontip = $("#jsonTip");
            var strHtml = "";            // accumulates the rendered rows
            $jsontip.empty();            // clear the previous contents
            $.each(data, function(infoIndex, info){
                strHtml += "Domain:" + info["Domain"] + "<br>";
                strHtml += "Count:" + info["Count"] + "<br>";
                strHtml += "<hr>";
            });
            $jsontip.html(strHtml);      // display the processed data
        });
    });
});
</script>
</head>
<body>
<div id="divframe">
    <div class="loadTitle">
        <input type="button" value="Get data" id="btn"/>
    </div>
    <div id="jsonTip"></div>
</div>
</body>
</html>
7. Running
ARP spoofing:
sudo ettercap -i eth0 -Tq -M arp:remote //172.18.217.254/
Run Spark:
#!/usr/bin/env bash
spark-submit --class JavaFlumeEventCount target/Flume-SparkStreaming-1.0-jar-with-dependencies.jar localhost 42424 ../package/tomcat7/webapps/examples/IpCount.json
Run Flume:
#!/usr/bin/env bash
flume-ng agent --conf /usr/local/flume/conf/ --conf-file /usr/local/flume/conf/flume-conf.properties --name a2 -Dflume.root.logger=INFO,console
Run the HTTP capture:
sudo python http.py
Finally, open the web page to see the results. (The Spark job writes IpCount.json into the same Tomcat examples directory the page is served from, so the $.getJSON call can load it from the same origin.)