
Fetching data from a Kafka topic and analyzing it in Spark

2015-12-15 16:40 · 676 views

Fetching data from a Kafka topic

Required jars:

spark-assembly-1.4.1-hadoop2.4.0.jar

spark-streaming-kafka-assembly_2.10-1.5.1.jar
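
With those jars on the classpath, the job can be submitted roughly like this. This is an illustrative sketch, not the post's actual command: the application jar name is made up, and `local[2]` is just a convenient local-test master (a receiver-based stream occupies one thread, so at least two are needed locally). Note that the two assembly jars above come from different Spark releases (1.4.1 and 1.5.1); using matching versions is safer.

```
spark-submit \
  --class testprint \
  --master local[2] \
  --jars spark-streaming-kafka-assembly_2.10-1.5.1.jar \
  my-streaming-demo.jar
```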

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.pcitc.bigData.demo.storm._ // presumably used by the elided forecasting code

object testprint {

  def main(args: Array[String]): Unit = {

    val Array(zkQuorum, group, topics) = Array(
      "x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181",
      "spark-streaming-test",
      "test")

    /**
     * Create the StreamingContext with a 2-second batch interval.
     */
    val sparkConf = new SparkConf().setAppName("getKafkaData")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Map each topic to one receiver thread.
    val topicMap = topics.split(",").map((_, 1)).toMap

    /**
     * Create the stream, pull data from the Kafka topic, and process it.
     * createStream yields (key, message) pairs; keep only the message.
     */
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.foreachRDD { rdd =>
      rdd.foreach { record =>
        println("=>=>=>=>[INFO: BEGIN TO FORECAST]")

        // Split the record into its fields. Declare the array locally:
        // this closure runs on the executors, so assigning into a
        // driver-side var would not work on a cluster.
        val values = record.toString.split(" ")

        // ...

        println("=>=>=>=>[INFO: FORECAST DONE!]")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
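
The per-record processing above boils down to splitting each space-delimited Kafka message into an array of fields before the (elided) forecasting step. A minimal standalone sketch of that parsing step; the sample record and its field layout are assumptions for illustration, not from the original post:

```scala
object ParseDemo {
  // Split one space-delimited message into its fields, mirroring
  // record.toString.split(" ") inside the foreachRDD closure above.
  def parse(record: String): Array[String] = record.split(" ")

  def main(args: Array[String]): Unit = {
    // Hypothetical sample record with seven space-separated fields.
    val record = "2015-12-15 16:40:00 sensor-01 23.5 1.013 45.2 OK"
    val values = parse(record)
    println(values.length) // 7
    println(values(2))     // sensor-01
  }
}
```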