spark streaming 同时处理两个不同kafka集群的数据
2016-07-13 15:16
369 查看
如题,总是不那么完美,要处理的数据在两个不同的kafka集群里面,日子得过,问题也得解决,我们创建两个DStream,连接两个不同的kafka集群的不同topic,然后再把这两个DStream union在一起处理,代码如下:
package com.kingnet
import java.util
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.JavaConversions._
/** *
*
*/
object IOSChannelNewActiveDids {
  /**
   * Builds a StreamingContext that consumes from every Kafka source listed in
   * `params`, unions the resulting streams and prints each batch.
   *
   * Expected config shape:
   * {"batchTime":5,"sources":[{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test1","numThreads":"1"},{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test2","numThreads":"1"}]}
   */
  def createContext(params: KafkaStreamingParams) = {
    val conf = new SparkConf().setAppName("IOSChannelNewActiveDids")
    val context = new StreamingContext(conf, Seconds(params.getBatchTime.toInt))
    // ssc.checkpoint(checkpointDirectory)
    // One receiver-based stream per configured Kafka cluster/topic set;
    // we keep only the message payload (the tuple's second element).
    val perSourceStreams = for (source <- params.getSources) yield {
      val threadsPerTopic = source.getNumThreads.toInt
      val topicThreadMap = source.getTopics.split(",").map(topic => (topic, threadsPerTopic)).toMap
      KafkaUtils.createStream(context, source.getZookeeper, source.getGroup, topicThreadMap).map(_._2)
    }
    // Merge all per-cluster streams into one DStream so downstream logic is uniform.
    val merged = context.union(perSourceStreams.toSeq)
    merged.print()
    context
  }

  /** Entry point: args(0) is a JSON string describing batch time and Kafka sources. */
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: com.kingnet.IOSChannelNewActiveDids {\"batchTime\":5,\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":1},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":1}]}")
      System.exit(1)
    }
    val parsed = GsonObject.getInstance().fromJson(args(0), classOf[KafkaStreamingParams])
    // Echo the configured topics so a misparsed config is visible immediately.
    parsed.getSources.foreach(source => println(source.getTopics))
    val streamingContext = createContext(parsed)
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
我们向args里面传递了一个json字符串作为参数,json字符串中配置了一个sources列表,里面指定了两个连接信息(我这里是测试,所以两个配置的zookeeper列表是相同的),然后我把这个json解析成了一个java对象:
package com.kingnet;
import java.util.List;
/**
* Created by xiaoj on 2016/7/13.
*/
public class KafkaStreamingParams {
private String batchTime;
private List<KafkaParams> sources;
public String getBatchTime() {
return batchTime;
}
public void setBatchTime(String batchTime) {
this.batchTime = batchTime;
}
public List<KafkaParams> getSources() {
return sources;
}
public void setSources(List<KafkaParams> sources) {
this.sources = sources;
}
@Override
public String toString() {
return "KafkaStreamingParams{" +
"batchTime='" + batchTime + '\'' +
", sources=" + sources +
'}';
}
class KafkaParams{
private String zookeeper;
private String group;
private String topics;
private String numThreads;
public String getZookeeper() {
return zookeeper;
}
public void setZookeeper(String zookeeper) {
this.zookeeper = zookeeper;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getNumThreads() {
return numThreads;
}
public void setNumThreads(String numThreads) {
this.numThreads = numThreads;
}
@Override
public String toString() {
return "KafkaParams{" +
"zookeeper='" + zookeeper + '\'' +
", group='" + group + '\'' +
", topics='" + topics + '\'' +
", numThreads='" + numThreads + '\'' +
'}';
}
}
}
好吧,我经常这么干,在scala项目中创建java类,得益于强大的IDEA开发工具。
package com.kingnet
import java.util
import com.google.gson.{Gson, GsonBuilder}
/**
* Created by xiaoj on 2016/5/5.
*/
/** Process-wide Gson holder plus small JSON helpers. */
object GsonObject {
  // Gson is thread-safe and stateless after construction; a lazy val gives
  // safe once-only initialization without hand-rolled double-checked locking.
  private lazy val instance: Gson = new GsonBuilder().create()

  /** Returns the shared Gson instance. */
  def getInstance(): Gson = instance

  /**
   * Parses a JSON object into a java.util.HashMap.
   * Returns None (after printing the stack trace) on any non-fatal parse error.
   */
  def fromJson(s: String): Option[util.HashMap[String, Any]] = {
    import scala.util.control.NonFatal
    try {
      Some(getInstance().fromJson(s, classOf[util.HashMap[String, Any]]))
    } catch {
      // NonFatal: let OutOfMemoryError / InterruptedException etc. propagate.
      case NonFatal(e) =>
        e.printStackTrace()
        None
    }
  }

  /** Serializes any value to its JSON representation. */
  def toJson(src: Any): String = getInstance().toJson(src)
}
运行程序,传递一个json参数:{\"batchTime\":\"10\",\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":\"1\"},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":\"1\"}]}
打开两个kafka 的console producer分别往test1和test2两个topic里面写数据,然后在streaming程序控制台就会打印出接收到的消息了。
package com.kingnet
import java.util
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.JavaConversions._
/** *
*
*/
object IOSChannelNewActiveDids {
  /**
   * Builds a StreamingContext that consumes from every Kafka source listed in
   * `params`, unions the resulting streams and prints each batch.
   *
   * Expected config shape:
   * {"batchTime":5,"sources":[{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test1","numThreads":"1"},{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test2","numThreads":"1"}]}
   */
  def createContext(params: KafkaStreamingParams) = {
    val conf = new SparkConf().setAppName("IOSChannelNewActiveDids")
    val context = new StreamingContext(conf, Seconds(params.getBatchTime.toInt))
    // ssc.checkpoint(checkpointDirectory)
    // One receiver-based stream per configured Kafka cluster/topic set;
    // we keep only the message payload (the tuple's second element).
    val perSourceStreams = for (source <- params.getSources) yield {
      val threadsPerTopic = source.getNumThreads.toInt
      val topicThreadMap = source.getTopics.split(",").map(topic => (topic, threadsPerTopic)).toMap
      KafkaUtils.createStream(context, source.getZookeeper, source.getGroup, topicThreadMap).map(_._2)
    }
    // Merge all per-cluster streams into one DStream so downstream logic is uniform.
    val merged = context.union(perSourceStreams.toSeq)
    merged.print()
    context
  }

  /** Entry point: args(0) is a JSON string describing batch time and Kafka sources. */
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: com.kingnet.IOSChannelNewActiveDids {\"batchTime\":5,\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":1},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":1}]}")
      System.exit(1)
    }
    val parsed = GsonObject.getInstance().fromJson(args(0), classOf[KafkaStreamingParams])
    // Echo the configured topics so a misparsed config is visible immediately.
    parsed.getSources.foreach(source => println(source.getTopics))
    val streamingContext = createContext(parsed)
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
我们向args里面传递了一个json字符串作为参数,json字符串中配置了一个sources列表,里面指定了两个连接信息(我这里是测试,所以两个配置的zookeeper列表是相同的),然后我把这个json解析成了一个java对象:
package com.kingnet;
import java.util.List;
/**
* Created by xiaoj on 2016/7/13.
*/
public class KafkaStreamingParams {
private String batchTime;
private List<KafkaParams> sources;
public String getBatchTime() {
return batchTime;
}
public void setBatchTime(String batchTime) {
this.batchTime = batchTime;
}
public List<KafkaParams> getSources() {
return sources;
}
public void setSources(List<KafkaParams> sources) {
this.sources = sources;
}
@Override
public String toString() {
return "KafkaStreamingParams{" +
"batchTime='" + batchTime + '\'' +
", sources=" + sources +
'}';
}
class KafkaParams{
private String zookeeper;
private String group;
private String topics;
private String numThreads;
public String getZookeeper() {
return zookeeper;
}
public void setZookeeper(String zookeeper) {
this.zookeeper = zookeeper;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getNumThreads() {
return numThreads;
}
public void setNumThreads(String numThreads) {
this.numThreads = numThreads;
}
@Override
public String toString() {
return "KafkaParams{" +
"zookeeper='" + zookeeper + '\'' +
", group='" + group + '\'' +
", topics='" + topics + '\'' +
", numThreads='" + numThreads + '\'' +
'}';
}
}
}
好吧,我经常这么干,在scala项目中创建java类,得益于强大的IDEA开发工具。
package com.kingnet
import java.util
import com.google.gson.{Gson, GsonBuilder}
/**
* Created by xiaoj on 2016/5/5.
*/
/** Process-wide Gson holder plus small JSON helpers. */
object GsonObject {
  // Gson is thread-safe and stateless after construction; a lazy val gives
  // safe once-only initialization without hand-rolled double-checked locking.
  private lazy val instance: Gson = new GsonBuilder().create()

  /** Returns the shared Gson instance. */
  def getInstance(): Gson = instance

  /**
   * Parses a JSON object into a java.util.HashMap.
   * Returns None (after printing the stack trace) on any non-fatal parse error.
   */
  def fromJson(s: String): Option[util.HashMap[String, Any]] = {
    import scala.util.control.NonFatal
    try {
      Some(getInstance().fromJson(s, classOf[util.HashMap[String, Any]]))
    } catch {
      // NonFatal: let OutOfMemoryError / InterruptedException etc. propagate.
      case NonFatal(e) =>
        e.printStackTrace()
        None
    }
  }

  /** Serializes any value to its JSON representation. */
  def toJson(src: Any): String = getInstance().toJson(src)
}
运行程序,传递一个json参数:{\"batchTime\":\"10\",\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":\"1\"},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":\"1\"}]}
打开两个kafka 的console producer分别往test1和test2两个topic里面写数据,然后在streaming程序控制台就会打印出接收到的消息了。
相关文章推荐
- Gradle实现的两种简单的多渠道打包方法
- 会做饭,对女性来说很重要吗?
- kafka_2.11-0.10.0.0 编程Failed to send messages after 3 tries. 错误解决
- 如何在struts2中使用、配置ajax,json?sx:datetimepicker无法显示日历
- Android M(6.x)使用OkHttp包解析和发送JSON请求的教程
- 关于Fragment与Fragment、Activity通信的三种方式
- Xargs的用法和理解
- Docker容器案例:应用 Mysql
- JavaScript详解
- 心灵可以停停,代码还得敲敲
- linux 中 timeval结构体
- 有一个会做饭的男朋友幸福么?
- 关于HTML5
- 不良资产处置(四):国内外不良资产处置公司的背景特点
- 判定覆盖和条件覆盖
- Block很简单,就像delegate的简化版
- 什么是CGI、FastCGI、PHP-CGI、PHP-FPM、Spawn-FCGI
- ios字典功能UIReferenceLibraryViewController
- CodeForces 567C Geometric Progression
- 滚动 CSS3 鼠标滚轮滚动插件jQuery Smoove.js