
Connection error when fetching a Kafka topic with Spark + Kafka in an Eclipse development environment

2016-06-28 17:20
I've been playing around with Spark for almost a month now, and decided to try Kafka's messaging system together with Spark Streaming to process data pushed in real time.

I wrote a simple class as the Kafka producer, with a Spark Streaming class as the consumer.

The producer's run method generates the data:


public void run() {
    KafkaProducer<Integer, String> producer = getProducer();
    Random rd = new Random();

    while (true) {
        // Build a tab-separated record: page, clicks, stay time, like flag
        String page = "Page_" + rd.nextInt(15) + ".html";
        Integer click = rd.nextInt(10);
        float stayTime = rd.nextFloat();
        Integer likeOrNot = rd.nextInt(3);
        String messageStr = page + "\t" + click + "\t" + stayTime + "\t" + likeOrNot;

        System.out.println("sending message: " + messageStr);
        producer.send(new ProducerRecord<>(topic, 999, messageStr));

        try {
            Thread.sleep(200);  // throttle to ~5 messages per second
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
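The getProducer() helper isn't shown in the post; a minimal sketch of the configuration it would presumably build (the broker address is taken from the consumer code's VM IP, and the serializer classes are standard Kafka client choices matching the <Integer, String> record type; both are assumptions, not from the original):

```java
import java.util.Properties;

// Hypothetical reconstruction of getProducer()'s configuration.
// The broker host:port and serializer classes are assumptions.
public class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.32.190.165:9092"); // assumed broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```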


The Spark Streaming consumer:


val ssc = new StreamingContext("local[2]", "appName", Seconds(5), System.getenv("SPARK_HOME"))
ssc.checkpoint("./")

// Receiver-based stream: (context, ZooKeeper quorum, consumer group, topic -> partition count)
val kafkaStream = KafkaUtils.createStream(ssc, "10.32.190.165:2181", "test", Map("test" -> 1))
// Alternative overload that takes a kafkaParams Map instead:
// val kafkaStream = KafkaUtils.createStream(ssc,
//   Map("group.id" -> "test",
//       "zookeeper.connect" -> "10.32.190.165:2181",
//       "zookeeper.connection.timeout.ms" -> "10000"),
//   Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)

val msgRDD = kafkaStream.map(_._2)  // drop the key, keep the message body
val newRdd = msgRDD
  .map { x => (x.split("\t")(0), getValueOfPage(x.split("\t"))) }
  .reduceByKey(_ + _)

val resultRdd = newRdd.transform(x => x.sortByKey(false))

// Accumulate each page's score across batches: this batch's sum + previous state
val updateFunc = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
  iterator.flatMap { case (page, newValues, oldValue) =>
    Some(newValues.sum + oldValue.getOrElse(0.0)).map(v => (page, v))
  }
}

val initRDD = ssc.sparkContext.parallelize(List(("page_0.html", 0.0)))

val stateRDD = newRdd.updateStateByKey[Double](updateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initRDD)

stateRDD.foreachRDD { r =>
  // Swap to (score, page), sort descending, and print the top 3
  val sortedRDD = r.map(x => (x._2, x._1)).sortByKey(false)
  sortedRDD.take(3).foreach(println)
}

// resultRdd.print()
ssc.start()
ssc.awaitTermination()
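The state-update function passed to updateStateByKey simply adds the current batch's values for a key to whatever state that key had before. The merge rule in isolation, as a standalone plain-Java sketch (not the Spark API, just the arithmetic):

```java
import java.util.List;
import java.util.Optional;

// Standalone illustration of the per-key state-update rule:
// newState = sum(values seen this batch) + previous state (or 0.0 if none).
public class StateUpdateSketch {
    static double update(List<Double> newValues, Optional<Double> oldState) {
        double batchSum = newValues.stream().mapToDouble(Double::doubleValue).sum();
        return batchSum + oldState.orElse(0.0);
    }

    public static void main(String[] args) {
        // Page scored 1.5 and 2.0 this batch, previous running total 3.0
        System.out.println(update(List.of(1.5, 2.0), Optional.of(3.0))); // prints 6.5
    }
}
```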



ZooKeeper and Kafka both run with their default configs. Since resources are tight, everything is on a single machine for now; the only changes were ZooKeeper's server port, and the ZooKeeper host and port on the Kafka side.
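For reference, the two edits described above would look roughly like this (the exact values are assumptions; the post only says the defaults were changed, and the address comes from the consumer code):

```properties
# zookeeper.properties -- client port (assumed value)
clientPort=2181

# server.properties -- point Kafka at the VM's ZooKeeper
zookeeper.connect=10.32.190.165:2181
```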

Then, at runtime, this error appeared:
16/06/28 16:55:33 WARN ClientUtils$: Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/06/28 16:55:33 INFO SyncProducer: Disconnecting from localhost:9092
16/06/28 16:55:33 WARN ConsumerFetcherManager$LeaderFinderThread: [test_CNCSHUM4L3C-1467104130749-71111338-leader-finder-thread], Failed to find leader for Set([test,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more

As you can see, the consumer is looking for the Kafka broker at localhost:9092. Since Eclipse runs on my local machine while Kafka and ZooKeeper run in a VM, this was almost certainly the cause.

The question is: where does this localhost come from, and where do you change it?

I tried tracing the source code and found that KafkaUtils.createStream has an overload that accepts a Map of configuration. I tried passing "metadata.broker.list" in that Map, but Spark's log showed that this property cannot be set there; it was simply ignored.

After two days of fiddling, I had changed just about everything changeable on the local machine, with no effect.

Then it occurred to me that the server.properties file Kafka starts with might contain the setting. I opened it, and sure enough, there it was:

#advertised.listeners=PLAINTEXT://your.host.name:9092
It is commented out by default, and according to the comments, when unset it effectively falls back to localhost. I changed it to Kafka's IP:port, restarted Kafka, started the producer, ran the consumer, and the error was gone; the analysis results came through. :D :D
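The fix, then, is a one-line change in server.properties (using the VM address from the code above as the assumed value):

```properties
# Uncomment and set to an address the Eclipse machine can reach:
advertised.listeners=PLAINTEXT://10.32.190.165:9092
```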