您的位置:首页 > 数据库

自定义的flume-ng的postgresql数据库sink

2014-12-11 14:31 316 查看
flume-ng本身支持的sink没有postgresql数据库的,正好现在有这样的需求,将日志记录按字段分隔存储到数据库中,所以自定义这样一个sink,用JAVA开发

使用flume-ng提供的一些sdk和jar包,数据库映射相关采用mybatis,直接上程序

github地址:github代码

该实例程序用tail -F 监听文件,对新输入的每一行日志记录,用空格切分,取第一个字符串在控制台输出并存入数据库

package flumepgsink;

import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

public class FlumePgSink extends AbstractSink implements Configurable {

private String queueSize;
private String resultFile;
private PrintWriter out;
private SqlSession session;

@Override
public Status process() throws EventDeliveryException {
Status status = null;

Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
Event event = ch.take();
byte[] datas = event.getBody();
String ds = new String(datas, "UTF-8");
String[] dsarr = ds.split(" ");
System.out.println(dsarr[0]);
out.println(dsarr[0]);

session.insert("flumepgsink.FlumePgMapper.insertContent", dsarr[0]);
session.commit();

txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
status = Status.BACKOFF;
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}

protected void finalize() {
session.close();
}

@Override
public void configure(Context context) {
String queueSize = context.getString("queueSize", "10000");
String resultFile = context.getString("resultFile", "/tmp/flume/result1.txt");
this.queueSize = queueSize;
this.resultFile = resultFile;

try {
FileWriter outFile = new FileWriter(this.resultFile);
PrintWriter out = new PrintWriter(outFile);
this.out = out;
} catch (IOException e) {
e.printStackTrace();
}

try {
String resource = "flumepgsink/mybatis-config.xml";
InputStream inputmybatisstream = Resources.getResourceAsStream(resource);
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputmybatisstream);
SqlSession session = sqlSessionFactory.openSession();
this.session = session;

String firstContent = session.selectOne("flumepgsink.FlumePgMapper.selectFirst", 1);
System.out.println("####!!!!  First mybatis:" + firstContent);

} catch (IOException e) {
e.printStackTrace();
}

}

}


mybatis的SQL 映射文件为:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="flumepgsink.FlumePgMapper">
<select id="selectFirst" resultType="String">
select content from flumetest where id = #{id}
</select>

<insert id="insertContent">
insert into flumetest (content) values ( #{content} )
</insert>
</mapper>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  大数据 flume