您的位置:首页 > 其它

flume 日志搬家下半场

2015-09-09 15:57 393 查看
续上面 获得资源后我们要将转换为相应的日志,落在统一的服务器中.
在flume中的对file操作的sink只有RollingFileSink但这个对我们来一点用都没有,
package com.ule.flume.sink.file;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;

import com.google.common.base.Preconditions;
import com.ule.flume.util.Constants;

import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;

public class FileSink extends AbstractSink implements Configurable {

private static final Logger logger = Logger.getLogger("sinklog");

//每隔30秒滚动一个文件。指定0将禁用滚动,并导致所有事件被写入到一个单独的文件中
private static final long defaultRollInterval = 30;

private static final int defaultBatchSize = 100;

private int batchSize = defaultBatchSize;

private String filePrefix = "";    //文件前缀名
private String dateType = "yyyy-MM-dd";//按照日期类型输出文件
private static Map<String, String> fileMap = new HashMap<String, String>();
private static Map<String, OutputStream> streamMap = new HashMap<String, OutputStream>();
private static Map<String, EventSerializer> serializerMap = new HashMap<String, EventSerializer>();
private  SimpleDateFormat format;

private File directory;
private long rollInterval;
private OutputStream outputStream;
private ScheduledExecutorService rollService;

private String serializerType;
private Context serializerContext;
private EventSerializer serializer;

private SinkCounter sinkCounter;

//  private FileManager pathController;
private volatile boolean shouldRotate;

public FileSink() {
shouldRotate = false;
}

@Override
public void configure(Context context) {

String directory = context.getString("file.directory");

filePrefix = context.getString( "file.filePrefix");
dateType = context.getString( "file.dateType");

String rollInterval = context.getString("file.rollInterval");

serializerType = context.getString("sink.serializer", "TEXT");
serializerContext =
new Context(context.getSubProperties("sink." + EventSerializer.CTX_PREFIX));

Preconditions.checkArgument(directory != null, "Directory may not be null");
Preconditions.checkNotNull(serializerType, "Serializer type is undefined");

if (rollInterval == null) {
this.rollInterval = defaultRollInterval;
} else {
this.rollInterval = Long.parseLong(rollInterval);
}

batchSize = context.getInteger("file.batchSize", defaultBatchSize);

this.directory = new File(directory);

if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
}

@Override
public synchronized void start() {
logger.info("Starting {}..."+ this);
sinkCounter.start();
fileMap.put("start", "start");
super.start();

}

@Override
public Status process() throws EventDeliveryException {

Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event = null;
Status result = Status.READY;
String path = "";
String logName = "";
String infosString = "";
try {
transaction.begin();
event = channel.take();
if (event != null) {
String eventinfos = new String(event.getBody());
logger.debug("eventinfos:" + eventinfos);
String[] infosStrings = eventinfos.split(Constants.SPLIT);
if (infosStrings.length >= 3) {
for (int i = 0; i < infosStrings.length; i++) {
switch (i) {
case 0:
infosString = infosStrings[0].substring(0,infosStrings[0].lastIndexOf("\n"));
break;
case 1:
path = infosStrings[1];
break;
case 2:
logName = infosStrings[2];
break;
}
}
logger.debug("path:" + path);
logger.debug("logName:" + logName);
logger.debug("infosString:" + infosString.toString());
event.setBody(infosString.toString().getBytes());
}

}else {
result = Status.BACKOFF;
}

if (StringUtils.isNotBlank(path)&&StringUtils.isNotBlank(logName)) {
File file=new File(directory+File.separator+path);
if  (!file.exists()  && !file.isDirectory())
{
logger.debug("mkdirFile:" + file .mkdirs());
}

format =  new SimpleDateFormat(dateType);
String key = file+File.separator+logName;
String nowDate = format.format(System.currentTimeMillis());

if ("start".equals(fileMap.get("start"))) {//第一次启动
sinkCounter.incrementConnectionCreatedCount();
fileMap.put("start", "end");
}

String fileDate = fileMap.get(key);
File logFile=new File(key+"."+nowDate);
if (StringUtils.isNotBlank(fileDate)) {//value不为空 说明之前有该key的信息

if (nowDate.equals(fileDate)) {//时间为当天时间

serializerMap.get(key).write(event);
serializerMap.get(key).flush();
streamMap.get(key).flush();

}else {//时间不为当天时间
try {
serializerMap.get(key).flush();
serializerMap.get(key).beforeClose();
streamMap.get(key).close();
} catch (IOException e) {
sinkCounter.incrementConnectionFailedCount();
throw new EventDeliveryException("Unable to rotate file "
+ key+ " while delivering event", e);
} finally {
serializerMap.remove(key);
streamMap.remove(key);
}

fileMap.put(key, nowDate);
streamMap.put(key, new BufferedOutputStream(
new FileOutputStream(logFile)));
serializerMap.put(key, EventSerializerFactory.getInstance(
serializerType, serializerContext, streamMap.get(key)));
serializerMap.get(key).afterCreate();

serializerMap.get(key).write(event);
serializerMap.get(key).flush();
streamMap.get(key).flush();
}
}else {//value为空 说明之前没有 该 key的信息

if  (file.exists()){ //文件存在
fileMap.put(key, nowDate);
streamMap.put(key, new BufferedOutputStream(
new FileOutputStream(logFile,true)));
serializerMap.put(key, EventSerializerFactory.getInstance(
serializerType, serializerContext, streamMap.get(key)));
serializerMap.get(key).afterCreate();

serializerMap.get(key).write(event);
serializerMap.get(key).flush();
streamMap.get(key).flush();
}else {
fileMap.put(key, nowDate);
streamMap.put(key, new BufferedOutputStream(
new FileOutputStream(logFile)));
serializerMap.put(key, EventSerializerFactory.getInstance(
serializerType, serializerContext, streamMap.get(key)));
serializerMap.get(key).afterCreate();

serializerMap.get(key).write(event);
serializerMap.get(key).flush();
streamMap.get(key).flush();
}
}
}

int eventAttemptCounter = 0;
sinkCounter.incrementEventDrainAttemptCount();
eventAttemptCounter++;

transaction.commit();
sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
} catch (Exception ex) {
transaction.rollback();
throw new EventDeliveryException("Failed to process transaction", ex);
} finally {
transaction.close();
}

return result;
}

@Override
public synchronized void stop() {
logger.info("RollingFile sink {} stopping..."+ getName());
sinkCounter.stop();
super.stop();

for (String key : serializerMap.keySet()) {
try {
serializerMap.get(key).flush();
serializerMap.get(key).beforeClose();
streamMap.get(key).close();
} catch (IOException e) {
logger.error("Unable to close output stream. Exception follows.", e);
}
}
sinkCounter.incrementConnectionClosedCount();
fileMap.clear();
serializerMap.clear();
streamMap.clear();

logger.debug("RollingFile sink {} stopped. Event metrics: {} = "+" getName():"+
getName() +" sinkCounter:"+ sinkCounter);
}

public File getDirectory() {
return directory;
}

public void setDirectory(File directory) {
this.directory = directory;
}

public long getRollInterval() {
return rollInterval;
}

public void setRollInterval(long rollInterval) {
this.rollInterval = rollInterval;
}

}
由于我们接收的信息都是从不同机器传输过来的,但是落信息时需要根据传来的不同目录进行划分,所以这里不能用同步,并且每个写的操作都是独立的.需要在内存分开.

本文出自 “记下就对了” 博客,请务必保留此出处http://jueshizhanhun.blog.51cto.com/4372226/1693156
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: