您的位置:首页 > 其它

SparkStreaming基于Kafka Direct

2016-05-27 16:15 337 查看
1.优点:

Kafka中的数据相当于streaming的底层文件系统,可以保证kafka中的数据能够处理且只能处理一次。此时不需要开启WAL机制。因为本身就可以保证数据0丢失

 

数据处理不过来

1.限定数据流动速度

2.增强机器处理能力

3.放到缓存池

 
2.代码:
 
 
   String brokers = "192.168.10.150:9092";
    String topics = "ws2,";
    // 创建conf
    SparkConf sparkConf = new SparkConf().setMaster("spark://rizhicaiji:7077").setAppName("StreamingMain");
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", brokers);
    // 创建direct kafka
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jsc,
        String.class,String.class,
        StringDecoder.class,StringDecoder.class,
        kafkaParams,
       topicsSet
    );
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: