Low-level Processor API (Translation)
2017-08-22 00:00
Processor
A stream processor is a node in the processor topology. With the Processor API, developers can define arbitrary stream processors, including processors that receive records and processors backed by state stores, and connect them together to form a processor topology that represents their customized processing logic. The Processor interface provides two main API methods: process and punctuate. The process method is invoked on each received record, while the punctuate method is invoked periodically. In addition, the processor can keep the ProcessorContext instance variable initialized in the init method, and use the context to schedule the punctuation period (context().schedule), to forward key/value pairs to downstream processors (context().forward), and to commit the current processing progress (context().commit).
Below is a simple word-count Processor implementation:
```java
public class MyProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;
        // call this processor's punctuate() method every 1000 milliseconds
        this.context.schedule(1000);
        // retrieve the key-value store named "Counts"
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        // dummy is the record key, line is the record value
        String[] words = line.toLowerCase().split(" ");
        for (String word : words) {
            Long oldValue = this.kvStore.get(word);
            if (oldValue == null) {
                // first occurrence of this word in the state store
                this.kvStore.put(word, 1L);
            } else {
                this.kvStore.put(word, oldValue + 1L);
            }
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // periodically iterate the state store and forward the aggregated counts downstream
        KeyValueIterator<String, Long> iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
        iter.close();
        // commit the current processing progress
        context.commit();
    }

    @Override
    public void close() {
        // close any resources managed by this processor.
        // Note: Do not close any StateStores as these are managed by the library
    }
}
```
- In the init method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".
- In the process method, upon each received record, split the value string into words and update their counts in the state store (we will talk about this feature later in the section).
In the
punctuatemethod, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.在punctuate方法中,本地存储的迭代器,将聚合counts发送到下游处理器,并且提交本地stream状态。
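The counting logic that process() and punctuate() implement can be sketched in plain Java, with a HashMap standing in for the "Counts" KeyValueStore. The class and method names below are illustrative only, not part of the Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a HashMap plays the role of the "Counts" state store.
public class WordCountSketch {
    private final Map<String, Long> counts = new HashMap<>();

    // Mirrors process(): split the value into words and bump each word's count.
    public void process(String line) {
        for (String word : line.toLowerCase().split(" ")) {
            Long oldValue = counts.get(word);
            counts.put(word, oldValue == null ? 1L : oldValue + 1L);
        }
    }

    // Mirrors what punctuate() reads back: the aggregated count for a word.
    public Long countOf(String word) {
        return counts.get(word);
    }

    public static void main(String[] args) {
        WordCountSketch sketch = new WordCountSketch();
        sketch.process("Kafka streams makes streams");
        System.out.println(sketch.countOf("streams")); // prints 2
    }
}
```

The real processor differs in that the store is managed by the library (and may be persistent and log-backed), and the counts are forwarded downstream on a schedule rather than queried directly.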
Processor Topology
With the customized processors defined in the Processor API, developers can use the TopologyBuilder to build a processor topology by connecting these processors together:
```java
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")
    // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
    .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
    // add "PROCESS2" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")
    // add "PROCESS3" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")
    // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
    // as output and the "PROCESS1" node as its upstream processor
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
    // as output and the "PROCESS2" node as its upstream processor
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
    // as output and the "PROCESS3" node as its upstream processor
    .addSink("SINK3", "sink-topic3", "PROCESS3");
```
- First of all, a source node named "SOURCE" is added to the topology using the addSource method, with one Kafka topic "src-topic" fed to it.
- Three processor nodes are then added using the addProcessor method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.
- Finally, three sink nodes are added to complete the topology using the addSink method, each piping from a different parent processor node and writing to a separate Kafka topic.
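The parent/child wiring that addProcessor and addSink declare forms a small directed graph. The sketch below (plain Java with illustrative names, not the Kafka Streams API) records each node's children the way the topology above connects them, which makes the fan-out from "PROCESS1" visible:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the topology's parent -> children wiring.
public class TopologySketch {
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Mirrors addProcessor/addSink: register `node` as a child of `parent`.
    public TopologySketch connect(String parent, String node) {
        children.computeIfAbsent(parent, k -> new ArrayList<>()).add(node);
        return this;
    }

    public List<String> childrenOf(String node) {
        return children.getOrDefault(node, Collections.emptyList());
    }

    public static void main(String[] args) {
        TopologySketch t = new TopologySketch()
            .connect("SOURCE", "PROCESS1")
            .connect("PROCESS1", "PROCESS2")
            .connect("PROCESS1", "PROCESS3")
            .connect("PROCESS1", "SINK1")
            .connect("PROCESS2", "SINK2")
            .connect("PROCESS3", "SINK3");
        System.out.println(t.childrenOf("PROCESS1")); // prints [PROCESS2, PROCESS3, SINK1]
    }
}
```

In the real topology, context().forward in "PROCESS1" delivers a record to exactly these children.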
State Stores
Note that the Processor API is not limited to accessing the current record as it arrives in the process() method; it can also maintain processing state that keeps recently arrived records for use in stateful processing operations such as windowed joins or aggregations. To take advantage of such state, users can define a state store by implementing the StateStore interface (the Kafka Streams library also has a few extended interfaces such as KeyValueStore); in practice, though, users usually do not need to customize such a state store from scratch, but can simply use the Stores factory to define a state store by specifying whether it should be persistent, log-backed, etc. In the following example, a persistent key-value store named "Counts" with key type String and value type Long is created:
```java
StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();
```
To take advantage of these state stores, developers can use the TopologyBuilder.addStateStore method when building the processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created state store with existing processor nodes through TopologyBuilder.connectProcessorAndStateStores:
```java
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
    // add the created state store "Counts" associated with processor "PROCESS1"
    .addStateStore(countStore, "PROCESS1")
    .addProcessor("PROCESS2", MyProcessor2::new, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new, "PROCESS1")
    // connect the state store "Counts" with processor "PROCESS2"
    .connectProcessorAndStateStores("PROCESS2", "Counts")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
```
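To actually run a topology like the one above, the builder is passed to a KafkaStreams instance together with a configuration. The snippet below only constructs the configuration, using the standard property names application.id and bootstrap.servers; the broker address is a placeholder, and the launch calls are shown in comments so the sketch stays self-contained:

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Builds a minimal Kafka Streams configuration. "localhost:9092" is a
    // placeholder broker address, not taken from the original article.
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "wordcount-processor");
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        // With kafka-streams on the classpath, one would then start the app:
        // KafkaStreams streams = new KafkaStreams(builder, props);
        // streams.start();
        System.out.println(props.getProperty("application.id")); // prints wordcount-processor
    }
}
```

The application.id also serves as the prefix for internal topic names, including the changelog topic that backs the persistent "Counts" store.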