spark读取 kafka nginx网站日志消息 并写入HDFS中(转)
2015-11-24 11:23
781 查看
原文链接:spark读取 kafka nginx网站日志消息 并写入HDFS中
spark 版本为1.0kafka 版本为0.8
首先来看看kafka的架构图 详细了解请参考官方
我这边有三台机器用于kafka 日志收集的
A 192.168.1.1 为server
B 192.168.1.2 为producer
C 192.168.1.3 为consumer
首先在A上的kafka安装目录下执行如下命令
./kafka-server-start.sh ../config/server.properties
启动kafka 通过netstat -npl 可以查看出是否开启默认端口9092
B为我们的nginx日志产生服务器,在这里的日志是网站实时写入到access-nginx.log 中
因此我们可以通过 tail -f 的方式能看到当前网站正在请求的日志信息。如果你的网站访问量很大请勿执行tail -f
同样我们也要在B上部署kafka,如果你没有写kafka 的客户端的话(查看客户端API地址 )
执行如下命令来push 数据到集群中
tail -n 0 -f /www/nh-nginx02/access.log | bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic sb-nginx03
这样我们就将日志push到kafka消息中了
C中,现在我们来写 consumer pull数据,还是要部署一下kafka 然后执行命令
bin/kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --topic sb-nginx03 --from-beginning
参数
–zookeeper 指定了你集群中zookeeper 的地址和端口即可
–topic 要和我们在B中push的时候指定的名称一致
上述方式只为在shell 命令行下,如何通过spark来写consumer呢?
假设你已经下载好spark1.0 源码 假设你已经部署好sbt scala等环境
scala 代码如下:
package test import java.util.Properties import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf object KafkaTest { def main(args:Array[String]) { if (args.length < 5) { System.err.println("Usage: KafkaTest <zkQuorum> <group> <topics> <numThreads> <output>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads,output) = args val sparkConf = new SparkConf().setAppName("KafkaTest") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) lines.saveAsTextFiles(output) ssc.start() ssc.awaitTermination() //.saveAsTextFile(output) } }
然后编译
mvn -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.0.1 -DskipTests package
然后spark作业提交
./bin/spark-submit --master local[*] --class org.apache.spark.fccs.KafkaTest ./test/target/scala-2.10/spark-test-1.0.0-hadoop2.3.0-cdh5.0.1.jar zoo02 my-test sb-nginx03 1 hdfs://192.168.1.1:9100/tmp/spark-log.txt
结果如下:
相关文章推荐
- nginx配置入门
- nginx upstream 容错机制
- CentOS 6.5安装配置LNMP服务器(Nginx+PHP+MySQL)
- 使用Logrotate 切割nginx日志
- centos6.5直接yum安装nginx,并且支持php访问的配置
- nginx记录实时请求状态
- CentOS6下基于Nginx搭建mp4/flv流媒体服务器(可随意拖动)并支持RTMP/HLS协议(含转码工具)
- nginx + php-fpm故障一例
- nginx禁止ip直接访问
- Nginx发布Alias虚拟目录及PHP支持配置方法
- CentOS Linux 负载均衡高可用WEB集群之Nginx+Keepalived配置
- Nginx 命令
- gdb调试nginx示例
- nginx-push-stream-module安装
- nginx 安装&配置监听端口
- nginx和apache配置目录浏览功能
- nginx+apache 404错误页面
- Nginx搭建flv视频点播服务器
- Nginx SPDY Pagespeed模块编译——加速网站载入
- nginx js、css多个请求合并为一个请求(concat模块)