SparkStreaming基于Kafka Direct
2016-05-27 16:15
337 查看
1.优点:
Kafka中的数据相当于streaming的底层文件系统,可以保证kafka中的数据能够处理且只能处理一次。此时不需要开启WAL机制。因为本身就可以保证数据0丢失
数据处理不过来
1.限定数据流动速度
2.增强机器处理能力
3.放到缓存池
2.代码:
String brokers = "192.168.10.150:9092";
String topics = "ws2,";
// 创建conf
SparkConf sparkConf = new SparkConf().setMaster("spark://rizhicaiji:7077").setAppName("StreamingMain");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
// 创建direct kafka
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jsc,
String.class,String.class,
StringDecoder.class,StringDecoder.class,
kafkaParams,
topicsSet
);
Kafka中的数据相当于streaming的底层文件系统,可以保证kafka中的数据能够处理且只能处理一次。此时不需要开启WAL机制。因为本身就可以保证数据0丢失
数据处理不过来
1.限定数据流动速度
2.增强机器处理能力
3.放到缓存池
2.代码:
String brokers = "192.168.10.150:9092";
String topics = "ws2,";
// 创建conf
SparkConf sparkConf = new SparkConf().setMaster("spark://rizhicaiji:7077").setAppName("StreamingMain");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
// 创建direct kafka
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jsc,
String.class,String.class,
StringDecoder.class,StringDecoder.class,
kafkaParams,
topicsSet
);
相关文章推荐
- 可变参数
- mongodb查看当前操作db.currentOp()
- linux中常用的一些命令
- C# SMTP 邮件发送之QQ邮箱篇
- static 全局变量 和 普通全局变量区别
- Eclipse SVN文件冲突及不能直接提交情况
- Oracle数据库用户、表、表空间之间关系
- IE6不支持fixed
- 读jQuery源码释疑笔记2
- 餐饮管理系统之员工管理
- 隐藏/修饰页面的滚动条
- 集成shareSDK3.3.0所遇到的问题
- JuheNews系列之二 · ToolBar+AppBarLayout+CoordinatorLayout+CollapsingToolbarLayout
- Android 学习路线总结
- 自行编写strcpy()函数
- 解决Grails端口号冲突问题
- Java中vector的使用详解
- 安卓Transition学习(一)
- 二叉树
- 解决AndroidStudio从网络引入三方jar包,项目中无法找到类问题