大数据生态系统基础:Apache Kafka基础(四):最新kafka编程入门:Stream API
2017-08-06 12:52
405 查看
数据传输的事务定义通常有以下三种级别:
最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。
consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
“精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。
一、流 Stream API 的作用
流 API 提供一个主题到另一个主题的转换。 也就是从一个输入主题到一个输出主题的流数据转换。
它的主要类就是 : Class KafkaStreams
可以通过使用TopologyBuilder来定义一个处理器的DAG拓扑结构,或者使用提供高级DSL来定义转换的KStreamBuilder来指定计算逻辑。
一个KafkaStreams可以包含一个或多个在configs中指定的线程,用于处理工作。可以通过相同的 APP ID 来协调其它的实例,不管是在同一进程中,还是在这台机器上的其他进程上,还是在远程机器上,都可以作为一个流处理 APP。这些实例将根据输入主题分区的分配来划分工作,从而使所有分区都被消耗。如果添加或失败实例,所有(剩余)实例将重新平衡分区分配,以平衡处理负载,并确保所有输入主题分区都被处理。
在内部,一个流实例包含了一个正常的 kafkaProducer 和 kafkaConsumer 实例,他们用做读输入和写输出。如下简单的例子:
二、WordCountDemo 例子
1、说明
*/
/**
* 用 high-level KStream DSL演示怎么完成 WordCount程序
* 从一个输入的 txt 文件中,计算一个简单单词出现的频率
*
* 在本例中,输入流从一个名为“streams-file-input”的主题中读取,
* 其中消息的值表示文本行;并且,直方图输出被写入到主题“streams-wordcount-output”,
* 其中每个记录都是单个单词的更新计数。
*
* 在运行本例子钱,你一定要创建一个输入主题和输出主题 (e.g. kafka-topics.sh--create ...),
* 并写一些数据到输入主题中,(e.g. kafka-console-producer.sh).
* 否则的话,你在输出主题中什么都看不到.
*/
2、准备数据
创建数据文件:file-input.txt
或者在 Windows:
建立主题: stream-file-input
将数据文件 file-input.txt 输入到主题 streams-file-input 3、开始处理数据
三、源程序 注意:在 pom.xml 增添以下一行代码:<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.11.0.0</version></dependency>下面是源程序
最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。
consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
“精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。
一、流 Stream API 的作用
流 API 提供一个主题到另一个主题的转换。 也就是从一个输入主题到一个输出主题的流数据转换。
它的主要类就是 : Class KafkaStreams
可以通过使用TopologyBuilder来定义一个处理器的DAG拓扑结构,或者使用提供高级DSL来定义转换的KStreamBuilder来指定计算逻辑。
一个KafkaStreams可以包含一个或多个在configs中指定的线程,用于处理工作。可以通过相同的 APP ID 来协调其它的实例,不管是在同一进程中,还是在这台机器上的其他进程上,还是在远程机器上,都可以作为一个流处理 APP。这些实例将根据输入主题分区的分配来划分工作,从而使所有分区都被消耗。如果添加或失败实例,所有(剩余)实例将重新平衡分区分配,以平衡处理负载,并确保所有输入主题分区都被处理。
在内部,一个流实例包含了一个正常的 kafkaProducer 和 kafkaConsumer 实例,他们用做读输入和写输出。如下简单的例子:
Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");//定义 App ID props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //连接 kafka 集群 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start();
二、WordCountDemo 例子
1、说明
*/
/**
* 用 high-level KStream DSL演示怎么完成 WordCount程序
* 从一个输入的 txt 文件中,计算一个简单单词出现的频率
*
* 在本例中,输入流从一个名为“streams-file-input”的主题中读取,
* 其中消息的值表示文本行;并且,直方图输出被写入到主题“streams-wordcount-output”,
* 其中每个记录都是单个单词的更新计数。
*
* 在运行本例子钱,你一定要创建一个输入主题和输出主题 (e.g. kafka-topics.sh--create ...),
* 并写一些数据到输入主题中,(e.g. kafka-console-producer.sh).
* 否则的话,你在输出主题中什么都看不到.
*/
2、准备数据
创建数据文件:file-input.txt
4、检查结果 |
package wangxn.testkafka; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueMapper; /** * @author wangxinnian * */ /** * 用 high-level KStream DSL演示怎么完成 WordCount程序 * 从一个输入的 txt 文件中,计算一个简单单词出现的频率 * * 在本例中,输入流从一个名为“streams-file-input”的主题中读取, * 其中消息的值表示文本行;并且,直方图输出被写入到主题“streams-wordcount-output”, * 其中每个记录都是单个单词的更新计数。 * * 在运行本例子钱,你一定要创建一个输入主题和输出主题 (e.g. kafka-topics.sh --create ...), * 并写一些数据到输入主题中,(e.g. kafka-console-producer.sh). * 否则的话,你在输出主题中什么都看不到. */ public class WordCountDemo { /** * @param args */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "mymac:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // 设置偏移复位到最早,这样我们就可以用相同的预加载数据重新运行演示代码 // 注意,重新运行 demo, 你需要用 偏移量复位工具(offset reset tool): // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> source = builder.stream("streams-wordcount-input"); KTable<String, Long> counts=source.flatMapValues(new ValueMapper<String, Iterable<String>>() { public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")); } }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { public KeyValue<String, String> apply(String key, String value) { return new KeyValue<String,String>(value, value); } }).groupByKey().count("Counts"); // need to override value serde to Long type counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); final KafkaStreams streams = new KafkaStreams(builder, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { Exit.exit(1); } Exit.exit(0); } }
相关文章推荐
- 大数据生态系统基础:Apache Kafka基础(三):最新kafka编程入门:Consumer
- 大数据生态系统基础:Apache Kafka基础(二):最新kafka编程入门:Producer API
- Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和排序多种输出格式(十一)
- Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(十)
- 大数据生态系统基础:Apache Kafka基础(一):介绍和安装
- JavaScript入门基础--数据类型编程练习
- Apache Kafka 入门 - Kafka API 简单用法
- Spark修炼之道(基础篇)——Linux大数据开发基础:第十一节:Shell编程入门(三)
- 服务器编程入门(4)Linux网络编程基础API
- Spark修炼之道(基础篇)——Linux大数据开发基础:第十四节:Shell编程入门(六)
- 大数据生态系统基础:Apache Spark(三):Java 版本编程实例(WordCount)
- Spark修炼之道(基础篇)——Linux大数据开发基础:第十四节:Shell编程入门(六)
- Java Servelet API入门基础教程-Java基础-Java-编程开发
- 大数据生态系统基础:Apache Kafka基础(一):介绍和安装
- Spark RDD/Core 编程 API入门系列之简单移动互联网数据(五)
- Linux大数据开发基础:第十一节:Shell编程入门(三)
- Spark RDD/Core 编程 API入门系列之简单移动互联网数据(五)
- kafka从安装到编程实现零基础入门
- Java基础(极客)——01、Java编程基础知识入门:变量与数据类型
- Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)