PySpark and Kafka: the difference between createDirectStream and createStream
2017-08-28 11:32
from pyspark.streaming.kafka import KafkaUtils
kafkaStream = KafkaUtils.createStream(streamingContext, \
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
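The two calls differ in their arguments. createStream takes the ZK quorum, which is the ZooKeeper address (port 2181 by default), whereas createDirectStream takes the address of the Kafka broker process (port 9092 by default).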
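To make the argument difference concrete, here is a minimal sketch with the placeholders filled in. The helper names (`receiver_stream_args`, `direct_stream_args`, `make_streams`), the group id `demo-group`, and the topic `test` are assumptions for illustration only. The `pyspark.streaming.kafka` module is imported lazily because it is only available in Spark 1.x/2.x with the 0-8 Kafka assembly jar on the classpath.

```python
def receiver_stream_args(zk_quorum, group_id, topic, partitions=1):
    """Arguments for KafkaUtils.createStream, after the StreamingContext.

    zk_quorum is a ZooKeeper address such as "localhost:2181";
    topics are passed as a dict of {topic: number of partitions to consume}.
    """
    return (zk_quorum, group_id, {topic: partitions})


def direct_stream_args(brokers, topic):
    """Arguments for KafkaUtils.createDirectStream, after the StreamingContext.

    brokers is a Kafka broker list such as "localhost:9092";
    topics are passed as a plain list.
    """
    return ([topic], {"metadata.broker.list": brokers})


def make_streams(ssc, topic="test"):
    """Build both kinds of stream on an existing StreamingContext (hypothetical helper)."""
    # Lazy import: requires Spark 1.x/2.x plus the spark-streaming-kafka-0-8 jar.
    from pyspark.streaming.kafka import KafkaUtils

    receiver = KafkaUtils.createStream(
        ssc, *receiver_stream_args("localhost:2181", "demo-group", topic))
    direct = KafkaUtils.createDirectStream(
        ssc, *direct_stream_args("localhost:9092", topic))
    return receiver, direct
```

Note that the topic is given as a dict for createStream and as a list for createDirectStream, in addition to the ZooKeeper versus broker address.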
In this setup, the Kafka process has PID 9300 and occupies port 9092.
QuorumPeerMain is the corresponding ZooKeeper instance; it has PID 6379 and listens on port 2181.
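If it is unclear which service is up on which port, a quick check from Python can help before submitting the job. This is only a sketch using the standard library; the function name `is_listening` is made up for this example, and `localhost` with ports 2181 and 9092 is assumed from the setup described above.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    # Assumed local setup: ZooKeeper on 2181, Kafka broker on 9092.
    for name, port in (("ZooKeeper", 2181), ("Kafka broker", 9092)):
        state = "listening" if is_listening("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {state}")
```

A successful TCP connection only shows that something is listening on the port, not that it is actually ZooKeeper or Kafka.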
So when running the official examples, the createDirectStream example is given the Kafka broker address:
./bin/spark-submit --jars ~/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar examples/src/main/python/streaming/direct_kafka_wordcount.py localhost:9092 test
and the createStream example (kafka_wordcount.py) is given the ZooKeeper address:
./bin/spark-submit --jars ~/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
References:
https://spark.apache.org/docs/1.6.1/streaming-kafka-integration.html
http://zhangfengzhe.blog.51cto.com/8855103/1556650