您的位置:首页 > 大数据

大数据生态系统基础:Apache Kafka基础(四):最新kafka编程入门:Stream API

2017-08-06 12:52 405 查看
    数据传输的事务定义通常有以下三种级别:
最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。 

consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
“精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。

一、流 Stream API 的作用
       流 API 提供一个主题到另一个主题的转换。 也就是从一个输入主题到一个输出主题的流数据转换。
       它的主要类就是 : Class KafkaStreams
       可以通过使用TopologyBuilder来定义一个处理器的DAG拓扑结构,或者使用提供高级DSL来定义转换的KStreamBuilder来指定计算逻辑。
      一个KafkaStreams可以包含一个或多个在configs中指定的线程,用于处理工作。可以通过相同的 APP ID 来协调其它的实例,不管是在同一进程中,还是在这台机器上的其他进程上,还是在远程机器上,都可以作为一个流处理 APP。这些实例将根据输入主题分区的分配来划分工作,从而使所有分区都被消耗。如果添加或失败实例,所有(剩余)实例将重新平衡分区分配,以平衡处理负载,并确保所有输入主题分区都被处理。

        在内部,一个流实例包含了一个正常的 kafkaProducer 和 kafkaConsumer 实例,他们用做读输入和写输出。如下简单的例子:
       
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");//定义 App ID
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //连接 kafka 集群
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();


二、WordCountDemo 例子
        1、说明
       

 */

/**

 * 用 high-level KStream DSL演示怎么完成 WordCount程序

 * 从一个输入的 txt 文件中,计算一个简单单词出现的频率

 *

 * 在本例中,输入流从一个名为“streams-file-input”的主题中读取,

 * 其中消息的值表示文本行;并且,直方图输出被写入到主题“streams-wordcount-output”,

 * 其中每个记录都是单个单词的更新计数。

 *

 * 在运行本例子钱,你一定要创建一个输入主题和输出主题 (e.g. kafka-topics.sh--create ...),

 * 并写一些数据到输入主题中,(e.g. kafka-console-producer.sh). 

 * 否则的话,你在输出主题中什么都看不到.

 */

       2、准备数据

创建数据文件:file-input.txt

或者在 Windows:

   建立主题: stream-file-input  
           将数据文件 file-input.txt 输入到主题 streams-file-input 3、开始处理数据
4、检查结果
输出结果为:
三、源程序      注意:在 pom.xml 增添以下一行代码:<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-streams</artifactId>    <version>0.11.0.0</version></dependency>下面是源程序
package wangxn.testkafka;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;

/**
* @author wangxinnian
*
*/
/**
* 用 high-level KStream DSL演示怎么完成 WordCount程序
* 从一个输入的 txt 文件中,计算一个简单单词出现的频率
*
* 在本例中,输入流从一个名为“streams-file-input”的主题中读取,
* 其中消息的值表示文本行;并且,直方图输出被写入到主题“streams-wordcount-output”,
* 其中每个记录都是单个单词的更新计数。
*
* 在运行本例子钱,你一定要创建一个输入主题和输出主题 (e.g. kafka-topics.sh --create ...),
* 并写一些数据到输入主题中,(e.g. kafka-console-producer.sh).
* 否则的话,你在输出主题中什么都看不到.
*/

public class WordCountDemo {

/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "mymac:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// 设置偏移复位到最早,这样我们就可以用相同的预加载数据重新运行演示代码
// 注意,重新运行 demo, 你需要用 偏移量复位工具(offset reset tool):
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("streams-wordcount-input");
KTable<String, Long> counts=source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<String,String>(value, value);
}

}).groupByKey().count("Counts");
// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try {
streams.start();
latch.await();
} catch (Throwable e) {
Exit.exit(1);
}
Exit.exit(0);

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐