
Building a custom Elasticsearch 2.3 sink plugin for Flume 1.6

The latest official Flume release still does not support Elasticsearch 2.x: the sink bundled with the older Flume 1.6 only works with Elasticsearch 1.7.x, because Elasticsearch 2.x made sweeping changes and deprecated many of the old APIs.

I found a project on GitHub, https://github.com/lucidfrontier45/ElasticsearchSink2, which is written in the same style as the sinks in the Flume source tree; it is actually quite simple. Recently a project at my company also needed to collect logs into ES for real-time querying, so I just wrote one myself.

I. Overall architecture:

1. The online business code sends log data to Kafka; the payload is protobuf, so what Kafka stores is a raw byte stream.

2. A Flume source consumes from Kafka, and a custom interceptor decodes the byte stream into structured data.

3. A custom ES sink writes the structured data into ES.

II. Code:

1. The protobuf definition:

option java_package = "XXX.base.proto";
option java_outer_classname="ApiLogPB";

message ApiLog {
    optional string puid = 1;
    optional string uId = 2;
    optional string reqId = 3;
    optional int32 fNum = 4;
    optional int32 cost = 5;
    optional string chId = 6 [default = "default"];
    optional string strategy = 7;
    repeated int64 recId = 8;
    optional string txt = 9;
    optional string vedio = 10;
    optional string gallery = 11;
    optional string pMap = 12;
    optional string paramMap = 13;
}
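The .proto file is compiled into the ApiLogPB Java class with protoc; a typical invocation might look like the following (the file name api_log.proto and the output directory are assumptions, not from the original project):

protoc --java_out=src/main/java api_log.proto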


2. Sending data to Kafka from the online service:

public Integer call() throws Exception {
    Builder newBuilder = ApiLogPB.ApiLog.newBuilder();

    newBuilder.setReqId(reqId);
    newBuilder.setUId(uId);
    newBuilder.setPuid(puid);
    newBuilder.setChId(channelId);
    newBuilder.setFNum(fNum);
    // cost = elapsed time in ms since the request start timestamp a
    newBuilder.setCost((int) (System.currentTimeMillis() - a));
    newBuilder.setStrategy(personalResponse.getStrategy());

    List<Long> recIdSet = personalResponse.getRecIdSet();
    for (Long fid : recIdSet) {
        newBuilder.addRecId(fid);
    }

    Map<String, List<Long>> typeFeeds = personalResponse.getTypeFeeds();
    List<Long> newsList = typeFeeds.get("NEWS");
    newBuilder.setTxt(newsList == null ? "" : newsList.toString());
    List<Long> galleryList = typeFeeds.get("GALLERY");
    newBuilder.setGallery(galleryList == null ? "" : galleryList.toString());
    List<Long> videoList = typeFeeds.get("VIDEO");
    newBuilder.setVedio(videoList == null ? "" : videoList.toString());

    // serialize the multi-feed map as "key1:id,id;key2:id,id"
    Map<String, List<Long>> mutilFeeds = personalResponse.getMutilFeeds();
    StringBuilder pSb = new StringBuilder();
    for (Entry<String, List<Long>> entry : mutilFeeds.entrySet()) {
        List<Long> value = entry.getValue();
        if (CollectionUtils.isNotEmpty(value)) {
            if (pSb.length() > 0) {
                pSb.append(";");
            }
            pSb.append(entry.getKey()).append(":")
               .append(StringUtils.strip(value.toString(), "[]"));
        }
    }
    newBuilder.setPMap(pSb.toString());

    newBuilder.setParamMap(paraMap.toString());

    KafkaDao.getInstance().sendMessage(uId, newBuilder.build().toByteArray());
    return null;
}
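KafkaDao itself is not shown in the original post. A minimal sketch of what sendMessage might look like, assuming the standard Kafka Java producer with a String key and byte[] value; the broker list is illustrative, and the topic name topic_predict is taken from the Flume configuration further below:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaDao {
    private static final KafkaDao INSTANCE = new KafkaDao();
    private final KafkaProducer<String, byte[]> producer;

    private KafkaDao() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-host1:9092,kafka-host2:9092"); // illustrative
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    public static KafkaDao getInstance() {
        return INSTANCE;
    }

    // key by uId so that events for the same user land in the same partition
    public void sendMessage(String key, byte[] value) {
        producer.send(new ProducerRecord<>("topic_predict", key, value));
    }
}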


3. The log entity class:

package XXX.log.common.entity;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.protobuf.InvalidProtocolBufferException;
import com.iqiyi.ttbrain.base.proto.ApiLogPB;
import com.iqiyi.ttbrain.base.proto.ApiLogPB.ApiLog;

public class ApiEntity {
    public static final int P_N_SIZE = 50;
    private static final String flag = "\t";

    private String reqId = "";
    private String uid = "";
    private String ppuid = "";
    private String channel = "";
    private int feedNum = 10;
    private int cost = 0;
    private String strategy = "";

    private String timeStamp = "";
    private String host = "";

    private String recFeedId = "";
    private String txt = "";
    private String gallery = "";
    private String vedio = "";

    private String pMap = "";
    private String paraMap = "";

    // getters and setters omitted for brevity

    @Override
    public String toString() {
        // serialize as a tab-separated line; the trailing "end" marker keeps
        // empty trailing fields from being dropped by String.split()
        StringBuilder sb = new StringBuilder(768);
        sb.append(reqId).append(flag).append(uid).append(flag).append(ppuid).append(flag).append(channel).append(flag)
          .append(feedNum).append(flag).append(cost).append(flag).append(strategy).append(flag)
          .append(timeStamp).append(flag).append(host).append(flag)
          .append(recFeedId).append(flag)
          .append(txt).append(flag).append(gallery).append(flag).append(vedio).append(flag)
          .append(pMap).append(flag)
          .append(paraMap).append(flag)
          .append("end");
        return sb.toString();
    }

    public static ApiEntity parseFromPB(byte[] bytes) throws InvalidProtocolBufferException {
        ApiEntity an = null;
        if (bytes != null) {
            an = new ApiEntity();
            ApiLog apiLog = ApiLogPB.ApiLog.parseFrom(bytes);

            an.setReqId(apiLog.getReqId());
            an.setUid(apiLog.getUId());
            an.setPpuid(apiLog.getPuid());
            an.setChannel(apiLog.getChId());
            an.setFeedNum(apiLog.getFNum());
            an.setCost(apiLog.getCost());
            an.setStrategy(apiLog.getStrategy());

            StringBuilder sb = new StringBuilder();
            List<Long> recIdList = apiLog.getRecIdList();
            for (int i = 0; i < recIdList.size(); i++) {
                sb.append(recIdList.get(i));
                if (i < recIdList.size() - 1) {
                    sb.append(",");
                }
            }
            an.setRecFeedId(sb.toString());

            an.setTxt(apiLog.getTxt());
            an.setGallery(apiLog.getGallery());
            an.setVedio(apiLog.getVedio());

            an.setParaMap(apiLog.getParamMap());
            an.setpMap(apiLog.getPMap());
        }
        return an;
    }

    public static Map<String, Object> parseToMap(String line) {
        Map<String, Object> map = new HashMap<>();
        if (line != null) {
            String[] split = line.split(flag);
            if (split.length >= 15) {
                map.put("reqId", split[0]);
                map.put("uid", split[1]);
                map.put("ppuid", split[2]);
                map.put("channel", split[3]);
                map.put("feedNum", split[4]);
                map.put("cost", split[5]);
                map.put("strategy", split[6]);
                map.put("timeStamp", split[7]);
                map.put("host", split[8]);
                map.put("recFeedId", split[9]);
                map.put("txt", split[10]);
                map.put("gallery", split[11]);
                map.put("vedio", split[12]);
                map.put("pMap", split[13]);
                map.put("paraMap", split[14]);
            }
        }
        return map;
    }

    public static void main(String... strings) {
        ApiEntity le = new ApiEntity();
        le.setChannel("default");
        le.setParaMap("sdfsdfdsfdsfds");
        le.setpMap("12321321321321");
        String line = le.toString();
        System.out.println(line);

        String[] split = line.split(flag);
        System.out.println(split.length);

        Map<String, Object> parseToMap = parseToMap(line);
        System.out.println(parseToMap);

        System.out.println(split[15]);
    }
}


4. The Flume interceptor:

package XXX.log.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.iqiyi.ttbrain.log.common.entity.ApiEntity;

public class ApiInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(ApiInterceptor.class);

    @Override
    public void close() {
        logger.info("flume ApiInterceptor is close");
    }

    @Override
    public void initialize() {
        logger.info("flume ApiInterceptor is initialize");
    }

    @Override
    public Event intercept(Event event) {
        try {
            Map<String, String> headers = event.getHeaders();
            byte[] body = event.getBody();
            if (body != null) {
                ApiEntity apiLog = null;
                try {
                    // decode the protobuf payload coming from Kafka
                    apiLog = ApiEntity.parseFromPB(body);
                } catch (Exception e) {
                    logger.error("parseFromPB failed:", e);
                }
                if (apiLog != null) {
                    // hostname/timestamp headers are set by the host and timestamp interceptors
                    String hostName = headers.get("hostname");
                    String timeStamp = headers.get("timestamp");
                    apiLog.setHost(hostName);
                    apiLog.setTimeStamp(timeStamp);
                    // replace the body with the tab-separated text representation
                    event.setBody(apiLog.toString().getBytes(StandardCharsets.UTF_8));
                    return event;
                }
            }
        } catch (Exception e) {
            logger.error("intercept:", e);
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    public static class Builder implements Interceptor.Builder {
        // Flume instantiates the interceptor through this Builder
        @Override
        public Interceptor build() {
            return new ApiInterceptor();
        }

        @Override
        public void configure(Context arg0) {
        }
    }
}
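The interceptor is registered in the agent configuration through its Builder class. A minimal sketch using the placeholder package from the code above (the configuration in section III uses the author's actual class name, PredictInterceptor$Builder, for the same slot):

agent1.sources.logsource.interceptors = filt4
agent1.sources.logsource.interceptors.filt4.type = XXX.log.flume.interceptor.ApiInterceptor$Builder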
5. The EsDao utility class:

package XXX.log.flume.sink.db;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EsDao {
    private static final Logger logger = LoggerFactory.getLogger(EsDao.class);
    // comma-separated list of ip:port pairs
    private static final String clusterHost = "1.1.1.1:9300";
    private static final String clusterName = "test";

    private static TransportClient transportClient = null;

    static {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", clusterName).build();

        transportClient = TransportClient.builder().settings(settings).build();

        String[] hostNames = clusterHost.split(",");
        InetSocketTransportAddress[] serverAddresses = new InetSocketTransportAddress[hostNames.length];
        for (int i = 0; i < hostNames.length; i++) {
            String[] hostPort = hostNames[i].trim().split(":");
            String host = hostPort[0].trim();
            int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim()) : 9300;
            serverAddresses[i] = new InetSocketTransportAddress(new InetSocketAddress(host, port));
        }

        for (InetSocketTransportAddress host : serverAddresses) {
            transportClient.addTransportAddress(host);
        }
    }

    private EsDao() {
    }

    public static void closeClient() {
        transportClient.close();
    }

    /**
     * bulk -- add, update and delete multiple documents in a single request
     * @param indexName target index
     * @param indexType target type
     * @param datas     one source map per document
     * @return true if the whole bulk request succeeded
     */
    public static boolean bulk(String indexName, String indexType, List<Map<String, Object>> datas) {
        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        for (Map<String, Object> map : datas) {
            bulkRequestBuilder.add(transportClient.prepareIndex(indexName, indexType).setSource(map));
        }
        BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
        // hasFailures() is true when any item failed, so negate it to report success
        return !bulkResponse.hasFailures();
    }
}
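A quick standalone smoke test of the DAO can be useful before wiring it into the sink. The index and type names below match the defaults used by the sink in the next section; the document fields are illustrative:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import XXX.log.flume.sink.db.EsDao;

public class EsDaoSmokeTest {
    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("reqId", "test-req-1");
        doc.put("uid", "user-1");
        doc.put("cost", "12");

        // index a single document via the bulk API
        boolean ok = EsDao.bulk("api", "api_type", Collections.singletonList(doc));
        System.out.println("bulk ok = " + ok);

        EsDao.closeClient();
    }
}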


6. The EsSink:

package XXX.log.flume.sink;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.iqiyi.ttbrain.log.flume.sink.db.EsDao;

public class EsSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(EsSink.class);

    private String indexName = "api";
    private String indexType = "api_type";
    private int batchSize = 100;

    public EsSink() {
        logger.info("EsSink start...");
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        // resources such as the ES client could be released here, e.g. EsDao.closeClient()
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Transaction transaction = null;
        Event event = null;
        String content = "";
        List<Map<String, Object>> actions = Lists.newArrayList();

        try {
            Channel channel = getChannel();
            transaction = channel.getTransaction();
            transaction.begin();

            // drain up to batchSize events from the channel per transaction
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event == null) {
                    // channel is empty, back off
                    result = Status.BACKOFF;
                    break;
                } else {
                    content = new String(event.getBody(), StandardCharsets.UTF_8);
                    String[] split = content.split("\t");
                    if (split.length < 15) {
                        // malformed line, skip it
                        continue;
                    }
                    actions.add(generateMap(content));
                }
            }

            if (actions.size() > 0) {
                EsDao.bulk(indexName, indexType, actions);
            }

            transaction.commit();
        } catch (Throwable e) {
            try {
                if (transaction != null) {
                    transaction.rollback();
                }
            } catch (Exception e2) {
                logger.error("flume transaction rollback error.", e2);
            }
            logger.error("Failed to commit flume transaction. Transaction rolled back.", e);
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
        return result;
    }

    @Override
    public void configure(Context context) {
        // indexName, indexType and batchSize could be read from the Flume context here
    }

    // same field layout as ApiEntity.parseToMap
    private static Map<String, Object> generateMap(String line) {
        Map<String, Object> map = new HashMap<>();
        if (line != null) {
            String[] split = line.split("\t");
            if (split.length >= 15) {
                map.put("reqId", split[0]);
                map.put("uid", split[1]);
                map.put("ppuid", split[2]);
                map.put("channel", split[3]);
                map.put("feedNum", split[4]);
                map.put("cost", split[5]);
                map.put("strategy", split[6]);
                map.put("timeStamp", split[7]);
                map.put("host", split[8]);
                map.put("recFeedId", split[9]);
                map.put("txt", split[10]);
                map.put("gallery", split[11]);
                map.put("vedio", split[12]);
                map.put("pMap", split[13]);
                map.put("paraMap", split[14]);
            }
        }
        return map;
    }
}
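The agent configuration in the next section ships events to HDFS sinks; routing a channel into this custom sink only requires setting the sink type to the implementation class. A minimal sketch (sink and channel names are illustrative and would be merged into the agent's existing sink list):

agent1.sinks.es-sink.type = XXX.log.flume.sink.EsSink
agent1.sinks.es-sink.channel = mc1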


III. Flume configuration:

1) Package the code above into a jar and put it into Flume's lib directory.

2) The Flume configuration is as follows:

agent1.sources = logsource
agent1.channels = mc1 mc2
agent1.sinks = avro-sink sink2

agent1.sources.logsource.channels = mc1 mc2
agent1.sinks.avro-sink.channel = mc1
agent1.sinks.sink2.channel = mc2

#source
agent1.sources.logsource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.logsource.zookeeperConnect = ttAlgorithm-kafka-online001-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online002-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online003-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online004-jyltqbs.qiyi.virtual:2181,ttAlgorithm-kafka-online005-jyltqbs.qiyi.virtual:2181
agent1.sources.logsource.topic = topic_predict
agent1.sources.logsource.groupId = flume
agent1.sources.logsource.kafka.consumer.timeout.ms = 100

#interceptor
agent1.sources.logsource.interceptors=filt2 filt3 filt4
agent1.sources.logsource.interceptors.filt2.type=host
agent1.sources.logsource.interceptors.filt2.hostHeader=hostname
agent1.sources.logsource.interceptors.filt2.useIP=true
agent1.sources.logsource.interceptors.filt3.type=timestamp
agent1.sources.logsource.interceptors.filt4.type=com.iqiyi.ttbrain.log.flume.interceptor.PredictInterceptor$Builder

agent1.sources.logsource.selector.type = multiplexing
agent1.sources.logsource.selector.header = isCool
agent1.sources.logsource.selector.mapping.0 = mc1
agent1.sources.logsource.selector.mapping.1 = mc2
agent1.sources.logsource.selector.default = mc1

#channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

#channel2
agent1.channels.mc2.type = memory
agent1.channels.mc2.capacity = 10000
agent1.channels.mc2.transactionCapacity = 10000
agent1.channels.mc2.keep-alive = 60

#sink1
#agent1.sinks.avro-sink.type = file_roll
#agent1.sinks.avro-sink.sink.directory = /data/mysink
#agent1.sinks.avro-sink.sink.rollInterval = 10000000
agent1.sinks.avro-sink.type = hdfs
agent1.sinks.avro-sink.hdfs.path = hdfs://hadoop-jy-namenode/data/qytt/flume/ttengine_predict/dt=%Y-%m-%d/hour=%H/
agent1.sinks.avro-sink.hdfs.writeFormat = Text
agent1.sinks.avro-sink.hdfs.fileType = DataStream
agent1.sinks.avro-sink.hdfs.fileSuffix = .log
agent1.sinks.avro-sink.hdfs.filePrefix = %Y-%m-%d_%H
agent1.sinks.avro-sink.hdfs.rollInterval = 3600
agent1.sinks.avro-sink.hdfs.rollSize = 0
agent1.sinks.avro-sink.hdfs.rollCount = 0
agent1.sinks.avro-sink.hdfs.batchSize = 1000
agent1.sinks.avro-sink.hdfs.callTimeout = 60000
agent1.sinks.avro-sink.hdfs.appendTimeout = 60000

#sink2
agent1.sinks.sink2.type = hdfs
agent1.sinks.sink2.hdfs.path = hdfs://hadoop-jy-namenode/data/qytt/flume/ttengine_predict_cool_start/dt=%Y-%m-%d/hour=%H/
agent1.sinks.sink2.hdfs.writeFormat = Text
agent1.sinks.sink2.hdfs.fileType = DataStream
agent1.sinks.sink2.hdfs.fileSuffix = .log
agent1.sinks.sink2.hdfs.filePrefix = %Y-%m-%d_%H
agent1.sinks.sink2.hdfs.rollInterval = 3600
agent1.sinks.sink2.hdfs.rollSize = 0
agent1.sinks.sink2.hdfs.rollCount = 0
agent1.sinks.sink2.hdfs.batchSize = 1000
agent1.sinks.sink2.hdfs.callTimeout = 60000
agent1.sinks.sink2.hdfs.appendTimeout = 60000

3) Caveats:

Flume 1.6 and ES 2.3 have conflicting jar dependencies. When building the flume-es-sink you need to pull in:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.2</version>
</dependency>

You will then run into two errors: java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor; and Exception in thread "main" java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW. The fix is to also add the following to the Maven build:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>

and
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.6.2</version>
</dependency>

Also, after placing the jar into Flume's lib directory, starting Flume throws the same errors; in that case remove the guava-11.0.2.jar and jackson-core-2.3.1.jar that ship in Flume's lib directory.
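An alternative to deleting jars from Flume's lib directory (my own suggestion, not from the original post) is to shade and relocate the conflicting Guava and Jackson classes inside the sink jar with the maven-shade-plugin, roughly like this:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
                <relocations>
                    <!-- move the bundled Guava/Jackson out of the way of Flume's older copies -->
                    <relocation>
                        <pattern>com.google.common</pattern>
                        <shadedPattern>shaded.com.google.common</shadedPattern>
                    </relocation>
                    <relocation>
                        <pattern>com.fasterxml.jackson</pattern>
                        <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>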

For details, see: http://blog.csdn.net/liuxiao723846/article/details/78531916

References:
http://tech.lede.com/2017/02/08/rd/server/flumeToEs/