
Low-level Processor API (Translated)

2017-08-22

Processor

A stream processor is a node in the processor topology. With the Processor API, developers define arbitrary stream processors, including processors that receive records and processors that interact with state stores, and connect them into a processor topology that expresses custom processing logic.

The Processor interface provides two main API methods: process and punctuate. The process method is invoked on each received record; the punctuate method is invoked periodically on the data. In addition, the processor can keep the ProcessorContext instance received in its init method, and use that context to schedule the punctuation period (context().schedule), forward key/value pairs to downstream processors (context().forward), and commit the current processing progress (context().commit).

The following is a simple word-count Processor implementation:

public class MyProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // call this processor's punctuate() method every 1000 milliseconds
        this.context.schedule(1000);

        // retrieve the key-value store named "Counts"
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        // dummy is the record key, line is the record value
        String[] words = line.toLowerCase().split(" ");

        for (String word : words) {
            Long oldValue = this.kvStore.get(word);

            if (oldValue == null) {
                // first occurrence of this word: start the count at 1
                this.kvStore.put(word, 1L);
            } else {
                // otherwise increment the existing count
                this.kvStore.put(word, oldValue + 1L);
            }
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // periodically emit the current counts to the downstream processor
        KeyValueIterator<String, Long> iter = this.kvStore.all();

        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }

        iter.close();
        // commit the current processing progress
        context.commit();
    }

    @Override
    public void close() {
        // close any resources managed by this processor.
        // Note: do not close any StateStores as these are managed by the library
    }
}
In the above implementation, the following actions are performed:

In the init method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".

In the process method, upon each received record, split the value string into words and update their counts in the state store (we will talk about this feature later in the section).

In the punctuate method, iterate the local state store, send the aggregated counts to the downstream processor, and commit the current stream state.
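The per-record update and periodic emission described above can be sketched in plain Java, using a HashMap in place of the "Counts" KeyValueStore. This is only an illustration of the counting logic; the real store is created and managed by the Kafka Streams library:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // stand-in for the "Counts" KeyValueStore<String, Long>
    static Map<String, Long> kvStore = new HashMap<>();

    // mirrors MyProcessor.process(): split the value and update the counts
    static void process(String dummy, String line) {
        for (String word : line.toLowerCase().split(" ")) {
            Long oldValue = kvStore.get(word);
            kvStore.put(word, oldValue == null ? 1L : oldValue + 1L);
        }
    }

    // mirrors MyProcessor.punctuate(): emit every (word, count) pair
    static void punctuate(long timestamp) {
        for (Map.Entry<String, Long> entry : kvStore.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }

    public static void main(String[] args) {
        process("key1", "Hello Kafka Streams");
        process("key2", "hello again");
        punctuate(System.currentTimeMillis());
    }
}
```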

Processor Topology

With the customized processors defined in the Processor API, developers can use the TopologyBuilder to build a processor topology by connecting these processors together:

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")

    // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
    .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")

    // add "PROCESS2" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")

    // add "PROCESS3" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")

    // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
    // as output and the "PROCESS1" node as its upstream processor
    .addSink("SINK1", "sink-topic1", "PROCESS1")

    // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
    // as output and the "PROCESS2" node as its upstream processor
    .addSink("SINK2", "sink-topic2", "PROCESS2")

    // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
    // as output and the "PROCESS3" node as its upstream processor
    .addSink("SINK3", "sink-topic3", "PROCESS3");
There are several steps in the above code to build the topology, and here is a quick walk-through:

First of all, a source node named "SOURCE" is added to the topology using the addSource method, with one Kafka topic "src-topic" fed to it.

Three processor nodes are then added using the addProcessor method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.

Finally, three sink nodes are added to complete the topology using the addSink method, each piping from a different parent processor node and writing to a separate topic.
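The parent/child wiring described above can be sketched as a simple directed graph in plain Java. The node and topic names come from the example; this only illustrates the structure of the topology, not the Kafka Streams API itself:

```java
import java.util.List;
import java.util.Map;

public class TopologySketch {
    // each node mapped to its upstream (parent) nodes
    static final Map<String, List<String>> PARENTS = Map.of(
        "SOURCE",   List.of(),              // reads from topic "src-topic"
        "PROCESS1", List.of("SOURCE"),
        "PROCESS2", List.of("PROCESS1"),
        "PROCESS3", List.of("PROCESS1"),
        "SINK1",    List.of("PROCESS1"),    // writes to topic "sink-topic1"
        "SINK2",    List.of("PROCESS2"),    // writes to topic "sink-topic2"
        "SINK3",    List.of("PROCESS3")     // writes to topic "sink-topic3"
    );

    public static void main(String[] args) {
        // PROCESS1 is the parent of PROCESS2, PROCESS3, and SINK1
        PARENTS.forEach((node, parents) ->
            System.out.println(node + " <- " + parents));
    }
}
```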

State Stores

Note that the Processor API is not limited to only accessing the current records as they arrive in the process() method; it can also maintain processing states that keep recently arrived records for use in stateful processing operations such as windowed joins or aggregations. To take advantage of these states, users can define a state store by implementing the StateStore interface (the Kafka Streams library also has a few extended interfaces such as KeyValueStore); in practice, though, users usually do not need to customize such a state store from scratch, but can simply use the Stores factory to define a state store by specifying whether it should be persistent, log-backed, etc. In the following example, a persistent key-value store named "Counts" with key type String and value type Long is created:

StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();
To take advantage of these state stores, developers can use the TopologyBuilder.addStateStore method when building the processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created state store with existing processor nodes through TopologyBuilder.connectProcessorAndStateStores.

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")

    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
    // add the created state store "Counts" associated with processor "PROCESS1"
    .addStateStore(countStore, "PROCESS1")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

    // connect the state store "Counts" with processor "PROCESS2"
    .connectProcessorAndStateStores("PROCESS2", "Counts")

    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
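To actually run the assembled topology, the builder is passed to a KafkaStreams instance together with a configuration. Below is a hedged sketch of the minimal configuration; the application id and broker address are placeholder values, while the config key names ("application.id", "bootstrap.servers") are the standard Kafka Streams ones:

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // minimal Kafka Streams configuration; values are placeholders
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "wordcount-processor-app"); // placeholder app id
        props.put("bootstrap.servers", "localhost:9092");       // placeholder broker list
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps();
        // with the kafka-streams dependency on the classpath, the topology
        // built above would then be started roughly like this:
        //   KafkaStreams streams = new KafkaStreams(builder, props);
        //   streams.start();
        System.out.println(props.getProperty("bootstrap.servers"));  // prints localhost:9092
    }
}
```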