从Kafka topic中获取数据并在Spark中进行分析
2015-12-15 16:40
676 查看
从kafka topic中获取数据
依赖的包spark-assembly-1.4.1-hadoop2.4.0.jar
spark-streaming-kafka-assembly_2.10-1.5.1.jar
import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils import com.pcitc.bigData.demo.storm._ object testprint { def main(args: Array[String]): Unit = { val Array(zkQuorum, group, topics) = Array("x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181", "spark-streaming-test", "test") /** * 创建StreamingContext对象 */ val sparkConf = new SparkConf().setAppName("getKafkaData") val ssc = new StreamingContext(sparkConf, Seconds(2)) val topicMap = topics.split(",").map((_, 1)).toMap var values = new Array[String](7) /** * 创建Streaming 从kafka topic中获取数据 并进行处理 */ val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) lines.foreachRDD( rdd => { rdd.foreach(record =>{ println("=>=>=>=>[INFO: BEGIN TO FORECAST]") values = record.toString().split(" ") ... println("=>=>=>=>[INFO: FORECAST DONE!]") }) }) ssc.start() ssc.awaitTermination() } }
相关文章推荐
- nginx配置多端口访问多项目
- How to install Node.js on Linux
- nginx yum 常用操作命令记录
- tomcat部署报错及解决办法
- opencv提取截获图像,任意区域
- nginx源代码分析--高性能server开发 常见的流程模型
- 总结记录——(opencv取任意位置矩形矫正算法)
- Linux进程间通信——使用消息队列
- 图像处理算法基础(五)---拉普拉斯变换自实现与opencv对比
- 服务器远程监控管理(一)-硬件篇
- java后台怎么判断是手机还是电脑访问网站
- OpenGL深度测试与深度缓冲以及Z-fighting的消除
- OpenCV笔记(六)
- 在OpenStack里如何确保虚机正常访问MetaData IP 169.254.169.254
- 项目:手机网民监测网站的收获与总结
- Linux下Apache+Mysql+PHP的环境搭建
- opencv处理函数记录_转自opencv中文网站
- 解决popupwindow中会出现的IllegalStateException问题
- 路径正确openLayer也无法获得发布地图
- nginx图片服务器配置