您的位置:首页 > 其它

nifi通过自定义processor将数据流入Elasticsearch

2018-01-02 14:33 239 查看
此文章对自定义processor不做详细讲解。

总体流程如下



其中AddT为自定义processor

AddT流入的数据为json格式流入的字段应与FlowMsgIn中相同



因此流入的数据应如:

{“City”:”“,”param”:”cc”,”Number”:”12345”,”body”:”aa”,”query”:”111”}

数据流入后在rules类中通过一定业务逻辑增加newType字段,在将数据以json格式流出。

在nifi中每一个流是一个flowfile,在flowfile中添加一个名为“jsonOut”attribute(Key-Value)值为添加newType后的json串。



FlowFile:一个FlowFile表示一个对象,这个对象在系统之间进行移动,被传递。NiFi保存一个FlowFile的key-value的属性列表和这个FlowFile的字节内容。

FlowFileProcessor:Processor对FlowFile进行各种操作它可以获取FlowFile的属性和内容,同时操作多个FlowFile的属性和内容,同时操作多个FlowFile,可以将操作结果提交给下一个Processer或者回滚。

数据传出需要将整个的flowfile转换成json格式,所以添加组件AttributeToJson,配置如下



Attribute List:选择转换成json的Attribute。

Destination:选择flowfile-content flowfile-content是将转换后的数据作为整个flowfile输出。

flowfile-Attribute 是将转换后的数据作为flowfile的一个Attribute输出。

从AttributeToJson中流出的数据为

{“jsonOut”:”{\”Number\”:\”P1008982\”,\”City\”:\”\”,\”query\”:\”111\”,\”body\”:\”cc\”,\”param\”:\”ww\”,\”newT

ype\”:\”1\”}”}

如果将该json数据直接流入Elasticsearch得到的为



与预期效果不符,故添加ExecuteScript组件添加脚本在脚本中进行处理,理论上只需要将jsonOut切掉即可,但如果将这样的数据直接传入下一个节点,会产生反斜杠无法识别的错误,故将反斜杠也一并删除。

脚本代码如下:

import java.nio.charset.StandardCharsets
def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream ->
inputStream.eachLine { line ->
String str=line.toString();
str=str.substring(str.indexOf(":")+2, str.lastIndexOf("}")-1);
String []b =str.split("\"");;
StringBuffer buffer=new StringBuffer();
buffer.append("{");
for(int i=1;i<b.length-1;i++){
String c=b[i].substring(0, b[i].length()-1);
buffer.append("\"");
buffer.append(c);
}
buffer.append("\"");
buffer.append("}");
str=buffer.toString();
outputStream.write("$str".getBytes(StandardCharsets.UTF_8))
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)


连接Elasticsearch5节点,配置index与type。可以成功将数据写入Elasticsearch。

其中Elasticsearch匹配的是Elasticsearch2.x版本,Elasticsearch5匹配Elasticsearch5以上版本。且传入Elasticsearch的数据必须为json格式,若不是json格式则会出现content type is missing的错误。

提供一个简单的自定义processornifi自定义processor
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: