您的位置:首页 > 其它

spark streaming 同时处理两个不同kafka集群的数据

2016-07-13 15:16 369 查看
如题,总是不那么完美,要处理的数据在两个不同的kafka集群里面,日子得过,问题也得解决,我们创建两个DStream,连接两个不同的kafka集群的不同topic,然后再把这两个DStream union在一起处理,代码如下:

package com.kingnet

import java.util

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

import scala.collection.JavaConversions._

/** *
*
*/
object IOSChannelNewActiveDids {

  /** Builds a StreamingContext that opens one receiver-based Kafka input
   *  stream per configured source (each source may point at a different
   *  Kafka cluster) and unions them into a single DStream.
   *
   *  Example params JSON:
   *  {"batchTime":5,"sources":[{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test1","numThreads":"1"},{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test2","numThreads":"1"}]}
   */
  def createContext(params: KafkaStreamingParams) = {
    val conf = new SparkConf().setAppName("IOSChannelNewActiveDids")
    val context = new StreamingContext(conf, Seconds(params.getBatchTime.toInt))

    // ssc.checkpoint(checkpointDirectory)
    // One stream per source; keep only the message payload (value half of
    // each (key, value) pair delivered by the Kafka receiver).
    val streams = params.getSources.map { source =>
      val topicToThreads = source.getTopics.split(",").map(topic => (topic, source.getNumThreads.toInt)).toMap
      KafkaUtils.createStream(context, source.getZookeeper, source.getGroup, topicToThreads).map(_._2)
    }.toSeq

    // Merge the per-cluster streams so downstream logic sees one flow.
    val merged = context.union(streams)
    merged.print()
    context
  }

  /** Entry point: arg(0) is the JSON configuration shown in the usage text. */
  def main(args: Array[String]) {

    if (args.length < 1) {
      System.err.println("Usage: com.kingnet.IOSChannelNewActiveDids {\"batchTime\":5,\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":1},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":1}]}")
      System.exit(1)
    }

    val params = GsonObject.getInstance().fromJson(args(0), classOf[KafkaStreamingParams])
    // Echo the configured topics so a bad config is visible immediately.
    params.getSources.foreach(source => println(source.getTopics))

    val context = createContext(params)
    context.start()
    context.awaitTermination()
  }
}

我们向args里面传递了一个json字符串作为参数,json字符串中配置了一个sources列表,里面指定了两个连接信息(我这里是测试,所以两个配置的zookeeper列表是相同的),然后我把这个json解析成了一个java对象:

package com.kingnet;

import java.util.List;

/**
* Created by xiaoj on 2016/7/13.
*/
/**
 * Configuration bean deserialized (via Gson) from the JSON passed on the
 * command line: a batch interval plus a list of Kafka connection settings,
 * one entry per Kafka cluster/topic set to consume.
 */
public class KafkaStreamingParams {
    // Streaming batch interval in seconds (kept as String; parsed by the caller).
    private String batchTime;
    // One entry per Kafka cluster/topic group to consume from.
    private List<KafkaParams> sources;

    public String getBatchTime() {
        return batchTime;
    }

    public void setBatchTime(String batchTime) {
        this.batchTime = batchTime;
    }

    public List<KafkaParams> getSources() {
        return sources;
    }

    public void setSources(List<KafkaParams> sources) {
        this.sources = sources;
    }

    @Override
    public String toString() {
        return "KafkaStreamingParams{" +
                "batchTime='" + batchTime + '\'' +
                ", sources=" + sources +
                '}';
    }

    /**
     * Connection settings for a single Kafka source.
     *
     * Declared {@code static}: Gson cannot deserialize a non-static inner
     * class because its implicit constructor requires an enclosing instance,
     * so a non-static nested type here would break {@code fromJson}.
     */
    static class KafkaParams {
        // Comma-separated zookeeper host:port list for this cluster.
        private String zookeeper;
        // Kafka consumer group id.
        private String group;
        // Comma-separated topic names.
        private String topics;
        // Receiver threads per topic (kept as String; parsed by the caller).
        private String numThreads;

        public String getZookeeper() {
            return zookeeper;
        }

        public void setZookeeper(String zookeeper) {
            this.zookeeper = zookeeper;
        }

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public String getTopics() {
            return topics;
        }

        public void setTopics(String topics) {
            this.topics = topics;
        }

        public String getNumThreads() {
            return numThreads;
        }

        public void setNumThreads(String numThreads) {
            this.numThreads = numThreads;
        }

        @Override
        public String toString() {
            return "KafkaParams{" +
                    "zookeeper='" + zookeeper + '\'' +
                    ", group='" + group + '\'' +
                    ", topics='" + topics + '\'' +
                    ", numThreads='" + numThreads + '\'' +
                    '}';
        }
    }
}


好吧,我经常这么干,在scala项目中创建java类,得益于强大的IDEA开发工具。
package com.kingnet

import java.util

import com.google.gson.{Gson, GsonBuilder}

import scala.util.control.NonFatal

/**
* Created by xiaoj on 2016/5/5.
*/
/**
 * Process-wide Gson singleton plus small JSON helpers.
 *
 * Gson instances are thread-safe and reusable, so one shared instance is
 * enough. A `lazy val` gives thread-safe lazy initialization for free,
 * replacing the hand-rolled double-checked-locking of a @volatile var.
 */
object GsonObject {
  private lazy val gson: Gson = new GsonBuilder().create()

  /** Returns the shared Gson instance (kept for source compatibility). */
  def getInstance(): Gson = gson

  /**
   * Parses a JSON object into a java.util.HashMap.
   *
   * @return Some(map) on success, None on any parse failure (the error is
   *         printed; callers treat malformed input as best-effort).
   */
  def fromJson(s: String): Option[util.HashMap[String, Any]] = {
    try {
      Some(getInstance().fromJson(s, classOf[util.HashMap[String, Any]]))
    } catch {
      // NonFatal: let OutOfMemoryError, InterruptedException, etc. propagate.
      case NonFatal(e) =>
        e.printStackTrace()
        None
    }
  }

  /** Serializes any value to its JSON representation. */
  def toJson(src: Any): String = {
    getInstance().toJson(src)
  }
}

运行程序,传递一个json参数:{\"batchTime\":\"10\",\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":\"1\"},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":\"1\"}]}
打开两个kafka 的console producer分别往test1和test2两个topic里面写数据,然后在streaming程序控制台就会打印出接收到的消息了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: