nifi通过自定义processor将数据流入Elasticsearch
2018-01-02 14:33
239 查看
此文章对自定义processor不做详细讲解。
总体流程如下
其中AddT为自定义processor
AddT流入的数据为json格式流入的字段应与FlowMsgIn中相同
因此流入的数据应如:
{“City”:”“,”param”:”cc”,”Number”:”12345”,”body”:”aa”,”query”:”111”}
数据流入后在rules类中通过一定业务逻辑增加newType字段,在将数据以json格式流出。
在nifi中每一个流是一个flowfile,在flowfile中添加一个名为“jsonOut”attribute(Key-Value)值为添加newType后的json串。
FlowFile:一个FlowFile表示一个对象,这个对象在系统之间进行移动,被传递。NiFi保存一个FlowFile的key-value的属性列表和这个FlowFile的字节内容。
FlowFileProcessor:Processor对FlowFile进行各种操作它可以获取FlowFile的属性和内容,同时操作多个FlowFile的属性和内容,同时操作多个FlowFile,可以将操作结果提交给下一个Processer或者回滚。
数据传出需要将整个的flowfile转换成json格式,所以添加组件AttributeToJson,配置如下
Attribute List:选择转换成json的Attribute。
Destination:选择flowfile-content flowfile-content是将转换后的数据作为整个flowfile输出。
flowfile-Attribute 是将转换后的数据作为flowfile的一个Attribute输出。
从AttributeToJson中流出的数据为
{“jsonOut”:”{\”Number\”:\”P1008982\”,\”City\”:\”\”,\”query\”:\”111\”,\”body\”:\”cc\”,\”param\”:\”ww\”,\”newT
ype\”:\”1\”}”}
如果将该json数据直接流入Elasticsearch得到的为
与预期效果不符,故添加ExecuteScript组件添加脚本在脚本中进行处理,理论上只需要将jsonOut切掉即可,但如果将这样的数据直接传入下一个节点,会产生反斜杠无法识别的错误,故将反斜杠也一并删除。
脚本代码如下:
连接Elasticsearch5节点,配置index与type。可以成功将数据写入Elasticsearch。
其中Elasticsearch匹配的是Elasticsearch2.x版本,Elasticsearch5匹配Elasticsearch5以上版本。且传入Elasticsearch的数据必须为json格式,若不是json格式则会出现content type is missing的错误。
提供一个简单的自定义processornifi自定义processor
总体流程如下
其中AddT为自定义processor
AddT流入的数据为json格式流入的字段应与FlowMsgIn中相同
因此流入的数据应如:
{“City”:”“,”param”:”cc”,”Number”:”12345”,”body”:”aa”,”query”:”111”}
数据流入后在rules类中通过一定业务逻辑增加newType字段,在将数据以json格式流出。
在nifi中每一个流是一个flowfile,在flowfile中添加一个名为“jsonOut”attribute(Key-Value)值为添加newType后的json串。
FlowFile:一个FlowFile表示一个对象,这个对象在系统之间进行移动,被传递。NiFi保存一个FlowFile的key-value的属性列表和这个FlowFile的字节内容。
FlowFileProcessor:Processor对FlowFile进行各种操作它可以获取FlowFile的属性和内容,同时操作多个FlowFile的属性和内容,同时操作多个FlowFile,可以将操作结果提交给下一个Processer或者回滚。
数据传出需要将整个的flowfile转换成json格式,所以添加组件AttributeToJson,配置如下
Attribute List:选择转换成json的Attribute。
Destination:选择flowfile-content flowfile-content是将转换后的数据作为整个flowfile输出。
flowfile-Attribute 是将转换后的数据作为flowfile的一个Attribute输出。
从AttributeToJson中流出的数据为
{“jsonOut”:”{\”Number\”:\”P1008982\”,\”City\”:\”\”,\”query\”:\”111\”,\”body\”:\”cc\”,\”param\”:\”ww\”,\”newT
ype\”:\”1\”}”}
如果将该json数据直接流入Elasticsearch得到的为
与预期效果不符,故添加ExecuteScript组件添加脚本在脚本中进行处理,理论上只需要将jsonOut切掉即可,但如果将这样的数据直接传入下一个节点,会产生反斜杠无法识别的错误,故将反斜杠也一并删除。
脚本代码如下:
import java.nio.charset.StandardCharsets def flowFile = session.get() if(!flowFile) return flowFile = session.write(flowFile, {inputStream, outputStream -> inputStream.eachLine { line -> String str=line.toString(); str=str.substring(str.indexOf(":")+2, str.lastIndexOf("}")-1); String []b =str.split("\"");; StringBuffer buffer=new StringBuffer(); buffer.append("{"); for(int i=1;i<b.length-1;i++){ String c=b[i].substring(0, b[i].length()-1); buffer.append("\""); buffer.append(c); } buffer.append("\""); buffer.append("}"); str=buffer.toString(); outputStream.write("$str".getBytes(StandardCharsets.UTF_8)) } } as StreamCallback) session.transfer(flowFile, REL_SUCCESS)
连接Elasticsearch5节点,配置index与type。可以成功将数据写入Elasticsearch。
其中Elasticsearch匹配的是Elasticsearch2.x版本,Elasticsearch5匹配Elasticsearch5以上版本。且传入Elasticsearch的数据必须为json格式,若不是json格式则会出现content type is missing的错误。
提供一个简单的自定义processornifi自定义processor
相关文章推荐
- SpringMvc通过自定义注解在方法的参数中注入数据
- 通过python使用游标查询Elasticsearch数据并写入文件
- 对:通过HBase Observer同步数据到ElasticSearch的使用情况
- 通过HBase Observer同步数据到ElasticSearch
- elasticsearch 通过HTTP RESTful API 操作数据
- ios通过SQLite自定义缓存数据
- 对:通过HBase Observer同步数据到ElasticSearch的使用情况
- 【干货】如何通过OPC自定义接口来实现客户端数据的读取?
- 通过HBase Observer同步数据到ElasticSearch
- ListView通过自定义适配器来显示数据并对Item项以及子view项的控件实现监听
- sql 通过存储过程和自定义类型批量新增数据
- 一种不通过UI给C4C自定义BO创建测试数据的方式
- 处理模型——通过定义一个自定义的TypeWriter和TypeReader直接处理顶点位置数据
- 对:通过HBase Observer同步数据到ElasticSearch的使用情况
- 通过smali注入的方式插入自定义代码来监控app的行为数据
- 通过HBase Observer同步数据到ElasticSearch
- ListView通过自定义适配器来显示数据并对Item项以及子view项的控件实现监听.
- 通过自定义数据绑定类实现MVC中图片上传
- php通过header发送自定义数据方法
- 【第26篇】通过Gson解析微信自定义菜单多个Button的数组数据