A custom Elasticsearch 2.3 sink plugin for Flume 1.6
2017-12-01 10:20
The latest official Flume release still does not support Elasticsearch 2.x; the older Flume 1.6 only supports Elasticsearch 1.7.x, because Elasticsearch 2.x made substantial changes and deprecated many of the old APIs.
I found a project on GitHub, https://github.com/lucidfrontier45/ElasticsearchSink2, which is written in the same style as the sinks in the Flume source code; it is actually quite simple. Recently a project at my company also needed to collect logs into Elasticsearch for real-time queries, so I went ahead and wrote my own sink.
I. Overall architecture:
1. The online business code sends log data to Kafka; the data structure is protobuf (pb), so Kafka stores raw byte streams.
2. A Flume source consumes from Kafka, and a custom interceptor decodes the byte stream into structured data.
3. A custom ES sink writes the structured data into Elasticsearch.
II. Code:
1. The protobuf (pb) definition:
option java_package = "XXX.base.proto";
option java_outer_classname = "ApiLogPB";

message ApiLog {
    optional string puid = 1;
    optional string uId = 2;
    optional string reqId = 3;
    optional int32 fNum = 4;
    optional int32 cost = 5;
    optional string chId = 6 [default = "default"];
    optional string strategy = 7;
    repeated int64 recId = 8;
    optional string txt = 9;
    optional string vedio = 10;
    optional string gallery = 11;
    optional string pMap = 12;
    optional string paramMap = 13;
}
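The ApiLogPB class used in the code below is generated from this definition with protoc; a minimal sketch, assuming the definition is saved as api_log.proto (the file name is an assumption, it is not given in the post):

# Generates XXX/base/proto/ApiLogPB.java under src/main/java (file name assumed)
protoc --java_out=src/main/java api_log.proto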
2. Sending data to Kafka from the online service:
public Integer call() throws Exception {
    Builder newBuilder = ApiLogPB.ApiLog.newBuilder();
    newBuilder.setReqId(reqId);
    newBuilder.setUId(uId);
    newBuilder.setPuid(puid);
    newBuilder.setChId(channelId);
    newBuilder.setFNum(fNum);
    newBuilder.setCost(Integer.parseInt((System.currentTimeMillis() - a) + ""));
    newBuilder.setStrategy(personalResponse.getStrategy());

    List<Long> recIdSet = personalResponse.getRecIdSet();
    for (Long fid : recIdSet) {
        newBuilder.addRecId(fid);
    }

    Map<String, List<Long>> typeFeeds = personalResponse.getTypeFeeds();
    List<Long> newList = typeFeeds.get("NEWS");
    newBuilder.setTxt(newList == null ? "" : newList.toString());
    List<Long> garleryList = typeFeeds.get("GALLERY");
    newBuilder.setGallery(garleryList == null ? "" : garleryList.toString());
    List<Long> vedioList = typeFeeds.get("VIDEO");
    newBuilder.setVedio(vedioList == null ? "" : vedioList.toString());

    // Flatten the per-type feed map into "key:id, id;key:id, id" form
    Map<String, List<Long>> mutilFeeds = personalResponse.getMutilFeeds();
    StringBuilder pSb = new StringBuilder();
    Set<Entry<String, List<Long>>> entrySet = mutilFeeds.entrySet();
    int i = 0;
    for (Entry<String, List<Long>> engry : entrySet) {
        String key = engry.getKey();
        List<Long> value = engry.getValue();
        if (CollectionUtils.isNotEmpty(value)) {
            String v1 = StringUtils.strip(value.toString(), "[]");
            pSb.append(key).append(":").append(v1);
            if (i < entrySet.size() - 1) {
                pSb.append(";");
            }
            i++;
        }
    }
    newBuilder.setPMap(pSb.toString());
    newBuilder.setParamMap(paraMap.toString());

    // Serialize the protobuf message and send the raw bytes to Kafka, keyed by user id
    KafkaDao.getInstance().sendMessage(uId, newBuilder.build().toByteArray());
    return null;
}
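KafkaDao itself is not shown in the post. A minimal sketch of what it might look like, assuming the new Kafka producer API with a byte[] value serializer; the class layout, broker list, and singleton style are assumptions, only sendMessage(uId, bytes) and the topic_predict topic (taken from the Flume config below) come from the original:

package XXX.log.common.kafka; // hypothetical package

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaDao {

    private static final KafkaDao INSTANCE = new KafkaDao();
    // Broker list is an assumption; the topic matches the Flume KafkaSource config below
    private static final String BROKERS = "broker1:9092,broker2:9092";
    private static final String TOPIC = "topic_predict";

    private final KafkaProducer<String, byte[]> producer;

    private KafkaDao() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    public static KafkaDao getInstance() {
        return INSTANCE;
    }

    // Key by user id so events for the same user land on the same partition
    public void sendMessage(String key, byte[] value) {
        producer.send(new ProducerRecord<>(TOPIC, key, value));
    }
}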
3. The log entity class:
package XXX.log.common.entity;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.protobuf.InvalidProtocolBufferException;
import com.iqiyi.ttbrain.base.proto.ApiLogPB;
import com.iqiyi.ttbrain.base.proto.ApiLogPB.ApiLog;

public class ApiEntity {

    public static final int P_N_SIZE = 50;
    // Field separator used for the text form of a log line
    private static final String flag = "\t";

    private String reqId = "";
    private String uid = "";
    private String ppuid = "";
    private String channel = "";
    private int feedNum = 10;
    private int cost = 0;
    private String strategy = "";
    private String timeStamp = "";
    private String host = "";
    private String recFeedId = "";
    private String txt = "";
    private String gallery = "";
    private String vedio = "";
    private String pMap = "";
    private String paraMap = "";

    // getters and setters omitted in the post

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(768);
        sb.append(reqId).append(flag).append(uid).append(flag).append(ppuid).append(flag).append(channel).append(flag)
          .append(feedNum).append(flag).append(cost).append(flag).append(strategy).append(flag)
          .append(timeStamp).append(flag).append(host).append(flag)
          .append(recFeedId).append(flag)
          .append(txt).append(flag).append(gallery).append(flag).append(vedio).append(flag)
          .append(pMap).append(flag)
          .append(paraMap).append(flag)
          .append("end");
        return sb.toString();
    }

    public static ApiEntity parseFromPB(byte[] bytes) throws InvalidProtocolBufferException {
        ApiEntity an = null;
        if (bytes != null) {
            an = new ApiEntity();
            ApiLog apiLog = ApiLogPB.ApiLog.parseFrom(bytes);
            an.setReqId(apiLog.getReqId());
            an.setUid(apiLog.getUId());
            an.setPpuid(apiLog.getPuid());
            an.setChannel(apiLog.getChId());
            an.setFeedNum(apiLog.getFNum());
            an.setCost(apiLog.getCost());
            an.setStrategy(apiLog.getStrategy());
            StringBuilder sb = new StringBuilder();
            List<Long> recIdList = apiLog.getRecIdList();
            for (int i = 0; i < recIdList.size(); i++) {
                sb.append(recIdList.get(i));
                if (i < recIdList.size() - 1) {
                    sb.append(",");
                }
            }
            an.setRecFeedId(sb.toString());
            an.setTxt(apiLog.getTxt());
            an.setGallery(apiLog.getGallery());
            an.setVedio(apiLog.getVedio());
            an.setParaMap(apiLog.getParamMap());
            an.setpMap(apiLog.getPMap());
        }
        return an;
    }

    public static Map<String, Object> parseToMap(String line) {
        Map<String, Object> map = new HashMap<>();
        if (line != null) {
            String[] split = line.split(flag);
            if (split.length >= 15) {
                map.put("reqId", split[0]);
                map.put("uid", split[1]);
                map.put("ppuid", split[2]);
                map.put("channel", split[3]);
                map.put("feedNum", split[4]);
                map.put("cost", split[5]);
                map.put("strategy", split[6]);
                map.put("timeStamp", split[7]);
                map.put("host", split[8]);
                map.put("recFeedId", split[9]);
                map.put("txt", split[10]);
                map.put("gallery", split[11]);
                map.put("vedio", split[12]);
                map.put("pMap", split[13]);
                map.put("paraMap", split[14]);
            }
        }
        return map;
    }

    public static void main(String... strings) {
        ApiEntity le = new ApiEntity();
        le.setChannel("default");
        le.setParaMap("sdfsdfdsfdsfds");
        le.setpMap("12321321321321");
        String line = le.toString();
        System.out.println(line);
        String[] split = line.split(flag);
        System.out.println(split.length);
        Map<String, Object> parseToMap = parseToMap(line);
        System.out.println(parseToMap);
        System.out.println(split[15]);
    }
}
4. The Flume interceptor:
package XXX.log.flume.interceptor;

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.iqiyi.ttbrain.log.common.entity.ApiEntity;

public class ApiInterceptor implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(ApiInterceptor.class);

    @Override
    public void close() {
        logger.info("flume ApiInterceptor is close");
    }

    @Override
    public void initialize() {
        logger.info("flume ApiInterceptor is initialize");
    }

    @Override
    public Event intercept(Event event) {
        try {
            Map<String, String> headers = event.getHeaders();
            byte[] body = event.getBody();
            if (body != null) {
                ApiEntity apiLog = null;
                try {
                    // Decode the protobuf bytes coming from Kafka into the entity
                    apiLog = ApiEntity.parseFromPB(body);
                } catch (Exception e) {
                    logger.info("apiLog:{}", apiLog);
                }
                if (apiLog != null) {
                    // hostname/timestamp headers are filled in by the host and timestamp interceptors
                    String hostName = headers.get("hostname");
                    String timeStamp = headers.get("timestamp");
                    apiLog.setHost(hostName);
                    apiLog.setTimeStamp(timeStamp);
                    // Replace the body with the tab-separated text form
                    event.setBody(apiLog.toString().getBytes());
                    return event;
                }
            }
        } catch (Exception e) {
            logger.error("intercept:", e);
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    public static class Builder implements Interceptor.Builder {
        // Flume instantiates the interceptor through this Builder
        @Override
        public Interceptor build() {
            return new ApiInterceptor();
        }

        @Override
        public void configure(Context arg0) {
        }
    }
}

5. The EsDao utility class:
package XXX.log.flume.sink.db;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EsDao {

    private static final Logger logger = LoggerFactory.getLogger(EsDao.class);

    // Comma-separated ip:port list of the ES transport endpoints
    private static final String clusterHost = "1.1.1.1:9300";
    private static final String clusterName = "test";

    private static TransportClient transportClient = null;

    static {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", clusterName).build();
        transportClient = TransportClient.builder().settings(settings).build();
        String[] hostNames = clusterHost.split(",");
        InetSocketTransportAddress[] serverAddresses = new InetSocketTransportAddress[hostNames.length];
        for (int i = 0; i < hostNames.length; i++) {
            String[] hostPort = hostNames[i].trim().split(":");
            String host = hostPort[0].trim();
            int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim()) : 9300;
            serverAddresses[i] = new InetSocketTransportAddress(new InetSocketAddress(host, port));
        }
        for (InetSocketTransportAddress host : serverAddresses) {
            transportClient.addTransportAddress(host);
        }
    }

    private EsDao() {
    }

    public static void closeClient() {
        transportClient.close();
    }

    /**
     * bulk — index multiple documents in a single request.
     *
     * @return true if any item in the bulk request failed
     */
    public static boolean bulk(String indexName, String indexType, List<Map<String, Object>> datas) {
        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        for (Map<String, Object> map : datas) {
            bulkRequestBuilder.add(transportClient.prepareIndex(indexName, indexType).setSource(map));
        }
        BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
        return bulkResponse.hasFailures();
    }
}
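Since the point of the pipeline is real-time querying, a quick way to verify that documents are arriving is a small search against the same transport client. This is a hypothetical helper that could be added to EsDao (it is not part of the original post); it assumes the uid field written by the sink's generateMap() below, and needs two extra imports, org.elasticsearch.action.search.SearchResponse and org.elasticsearch.index.query.QueryBuilders:

// Hypothetical addition to EsDao: count documents for one uid (field name taken from generateMap in EsSink)
public static long countByUid(String indexName, String indexType, String uid) {
    SearchResponse resp = transportClient.prepareSearch(indexName)
            .setTypes(indexType)
            .setQuery(QueryBuilders.termQuery("uid", uid))
            .setSize(0)                 // only the hit count is needed
            .execute().actionGet();
    return resp.getHits().getTotalHits();
}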
6. The Elasticsearch sink (EsSink):
package XXX.log.flume.sink;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.iqiyi.ttbrain.log.flume.sink.db.EsDao;

public class EsSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(EsSink.class);

    private String indexName = "api";
    private String indexType = "api_type";
    private int batchSize = 100;

    public EsSink() {
        logger.info("EsSink start...");
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Transaction transaction = null;
        Event event = null;
        String content = "";
        List<Map<String, Object>> actions = Lists.newArrayList();
        try {
            Channel channel = getChannel();
            transaction = channel.getTransaction();
            transaction.begin();
            // Drain up to batchSize events from the channel and turn each line into a document map
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event == null) {
                    // Channel is empty; tell Flume to back off before the next poll
                    result = Status.BACKOFF;
                    break;
                } else {
                    content = new String(event.getBody(), "UTF-8");
                    String[] split = content.split("\t");
                    if (split.length < 15) {
                        // Malformed line, skip it
                        continue;
                    }
                    actions.add(generateMap(content));
                }
            }
            if (actions.size() > 0) {
                // Send the whole batch to ES in a single bulk request
                EsDao.bulk(indexName, indexType, actions);
            }
            transaction.commit();
        } catch (Throwable e) {
            try {
                if (transaction != null) {
                    // Roll back so the events stay in the channel and can be retried
                    transaction.rollback();
                }
            } catch (Exception e2) {
                logger.error("flume transaction rollback error.", e2);
            }
            logger.error("Failed to commit flume transaction. Transaction rolled back.", e);
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
        return result;
    }

    @Override
    public void configure(Context context) {
    }

    // Split the tab-separated line produced by ApiEntity.toString() into an ES document
    private static Map<String, Object> generateMap(String line) {
        Map<String, Object> map = new HashMap<>();
        if (line != null) {
            String[] split = line.split("\t");
            if (split.length >= 15) {
                map.put("reqId", split[0]);
                map.put("uid", split[1]);
                map.put("ppuid", split[2]);
                map.put("channel", split[3]);
                map.put("feedNum", split[4]);
                map.put("cost", split[5]);
                map.put("strategy", split[6]);
                map.put("timeStamp", split[7]);
                map.put("host", split[8]);
                map.put("recFeedId", split[9]);
                map.put("txt", split[10]);
                map.put("gallery", split[11]);
                map.put("vedio", split[12]);
                map.put("pMap", split[13]);
                map.put("paraMap", split[14]);
            }
        }
        return map;
    }
}
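configure(Context) is left empty, so the index name, type, and batch size are hard-coded. A hedged sketch of how they could instead be read from the agent configuration (the property names indexName/indexType/batchSize are assumptions, not existing settings):

@Override
public void configure(Context context) {
    // Property names are hypothetical; defaults match the hard-coded values above
    indexName = context.getString("indexName", "api");
    indexType = context.getString("indexType", "api_type");
    batchSize = context.getInteger("batchSize", 100);
}

With this in place, the agent config could carry lines such as agent1.sinks.es-sink.indexName = api.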
III. Flume configuration:
1) Package the code above into a jar and put it under Flume's lib directory.
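For example (a sketch; the jar name flume-es-sink-1.0.jar and the $FLUME_HOME path are assumptions):

mvn clean package
cp target/flume-es-sink-1.0.jar $FLUME_HOME/lib/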
2) The Flume configuration is as follows:
agent1.sources = logsource
agent1.channels = mc1 mc2
agent1.sinks = avro-sink sink2

agent1.sources.logsource.channels = mc1 mc2
agent1.sinks.avro-sink.channel = mc1
agent1.sinks.sink2.channel = mc2

#source
agent1.sources.logsource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.logsource.zookeeperConnect = ttAlgorithm-kafka-online001-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online002-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online003-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online004-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online005-jyltqbs.qiyi.virtual:2181
agent1.sources.logsource.topic = topic_predict
agent1.sources.logsource.groupId = flume
agent1.sources.logsource.kafka.consumer.timeout.ms = 100

#interceptor
agent1.sources.logsource.interceptors=filt2 filt3 filt4
agent1.sources.logsource.interceptors.filt2.type=host
agent1.sources.logsource.interceptors.filt2.hostHeader=hostname
agent1.sources.logsource.interceptors.filt2.useIP=true
agent1.sources.logsource.interceptors.filt3.type=timestamp
agent1.sources.logsource.interceptors.filt4.type=com.iqiyi.ttbrain.log.flume.interceptor.PredictInterceptor$Builder

agent1.sources.logsource.selector.type = multiplexing
agent1.sources.logsource.selector.header = isCool
agent1.sources.logsource.selector.mapping.0 = mc1
agent1.sources.logsource.selector.mapping.1 = mc2
agent1.sources.logsource.selector.default = mc1

#channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

#channel2
agent1.channels.mc2.type = memory
agent1.channels.mc2.capacity = 10000
agent1.channels.mc2.transactionCapacity = 10000
agent1.channels.mc2.keep-alive = 60

#sink1
#agent1.sinks.avro-sink.type = file_roll
#agent1.sinks.avro-sink.sink.directory = /data/mysink
#agent1.sinks.avro-sink.sink.rollInterval = 10000000
agent1.sinks.avro-sink.type = hdfs
agent1.sinks.avro-sink.hdfs.path = hdfs://hadoop-jy-namenode/data/qytt/flume/ttengine_predict/dt=%Y-%m-%d/hour=%H/
agent1.sinks.avro-sink.hdfs.writeFormat = Text
agent1.sinks.avro-sink.hdfs.fileType = DataStream
agent1.sinks.avro-sink.hdfs.fileSuffix = .log
agent1.sinks.avro-sink.hdfs.filePrefix = %Y-%m-%d_%H
agent1.sinks.avro-sink.hdfs.rollInterval = 3600
agent1.sinks.avro-sink.hdfs.rollSize = 0
agent1.sinks.avro-sink.hdfs.rollCount = 0
agent1.sinks.avro-sink.hdfs.batchSize = 1000
agent1.sinks.avro-sink.hdfs.callTimeout = 60000
agent1.sinks.avro-sink.hdfs.appendTimeout = 60000

#sink2
agent1.sinks.sink2.type = hdfs
agent1.sinks.sink2.hdfs.path = hdfs://hadoop-jy-namenode/data/qytt/flume/ttengine_predict_cool_start/dt=%Y-%m-%d/hour=%H/
agent1.sinks.sink2.hdfs.writeFormat = Text
agent1.sinks.sink2.hdfs.fileType = DataStream
agent1.sinks.sink2.hdfs.fileSuffix = .log
agent1.sinks.sink2.hdfs.filePrefix = %Y-%m-%d_%H
agent1.sinks.sink2.hdfs.rollInterval = 3600
agent1.sinks.sink2.hdfs.rollSize = 0
agent1.sinks.sink2.hdfs.rollCount = 0
agent1.sinks.sink2.hdfs.batchSize = 1000
agent1.sinks.sink2.hdfs.callTimeout = 60000
agent1.sinks.sink2.hdfs.appendTimeout = 60000
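This particular agent ships the intercepted lines to HDFS sinks; the post does not show the ES sink wired into this file. A hedged sketch of what that wiring might look like, plus the standard command to start the agent (the sink name es-sink and the config file name are assumptions):

# Hypothetical wiring of the custom sink onto channel mc1 (in place of the HDFS sink avro-sink)
agent1.sinks = es-sink sink2
agent1.sinks.es-sink.type = com.iqiyi.ttbrain.log.flume.sink.EsSink
agent1.sinks.es-sink.channel = mc1

# Start the agent (config file name assumed)
bin/flume-ng agent --conf conf --conf-file conf/kafka-es-agent.conf --name agent1 -Dflume.root.logger=INFO,console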
3) Notes:
Flume 1.6 and Elasticsearch 2.3 have conflicting jar versions. When writing the flume-es-sink you need to pull in:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.2</version>
</dependency>
You will then run into two errors: java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor and Exception in thread "main" java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW. The fix is to add the following to the Maven pom:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
and
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.6.2</version>
</dependency>
Likewise, once the jar is dropped into Flume's lib directory, starting Flume will throw the same errors; at that point, remove the guava-11.0.2.jar and jackson-core-2.3.1.jar shipped in Flume's lib directory.
For details see: http://blog.csdn.net/liuxiao723846/article/details/78531916
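As an alternative to swapping jars in Flume's lib directory (not covered in the original post, just a hedged sketch), the conflicting packages can be relocated inside the sink jar with the maven-shade-plugin, so Flume's own guava and jackson jars can stay untouched:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
                <relocations>
                    <!-- Rename guava packages bundled in the sink jar so they cannot clash with Flume's guava 11 -->
                    <relocation>
                        <pattern>com.google.common</pattern>
                        <shadedPattern>shaded.com.google.common</shadedPattern>
                    </relocation>
                    <!-- Same idea for jackson-core -->
                    <relocation>
                        <pattern>com.fasterxml.jackson</pattern>
                        <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>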
References:
http://tech.lede.com/2017/02/08/rd/server/flumeToEs/