自定义的flume-ng的postgresql数据库sink
2014-12-11 14:31
316 查看
flume-ng本身支持的sink没有postgresql数据库的,正好现在有这样的需求,将日志记录按字段分隔存储到数据库中,所以自定义这样一个sink,用JAVA开发
使用flume-ng提供的一些sdk和jar包,数据库映射相关采用mybatis,直接上程序
github地址:github代码
该实例程序用tail -F 监听文件,对新输入的每一行日志记录,用空格切分,取第一个字符串在控制台输出并存入数据库
mybatis的SQL 映射文件为:
使用flume-ng提供的一些sdk和jar包,数据库映射相关采用mybatis,直接上程序
github地址:github代码
该实例程序用tail -F 监听文件,对新输入的每一行日志记录,用空格切分,取第一个字符串在控制台输出并存入数据库
package flumepgsink; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.apache.ibatis.io.Resources; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.session.SqlSessionFactoryBuilder; public class FlumePgSink extends AbstractSink implements Configurable { private String queueSize; private String resultFile; private PrintWriter out; private SqlSession session; @Override public Status process() throws EventDeliveryException { Status status = null; Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { Event event = ch.take(); byte[] datas = event.getBody(); String ds = new String(datas, "UTF-8"); String[] dsarr = ds.split(" "); System.out.println(dsarr[0]); out.println(dsarr[0]); session.insert("flumepgsink.FlumePgMapper.insertContent", dsarr[0]); session.commit(); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); status = Status.BACKOFF; if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; } protected void finalize() { session.close(); } @Override public void configure(Context context) { String queueSize = context.getString("queueSize", "10000"); String resultFile = context.getString("resultFile", "/tmp/flume/result1.txt"); this.queueSize = queueSize; this.resultFile = resultFile; try { FileWriter outFile = new FileWriter(this.resultFile); PrintWriter out = new PrintWriter(outFile); this.out = out; } catch (IOException e) { e.printStackTrace(); } try { String resource = "flumepgsink/mybatis-config.xml"; InputStream inputmybatisstream = Resources.getResourceAsStream(resource); SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputmybatisstream); SqlSession session = sqlSessionFactory.openSession(); this.session = session; String firstContent = session.selectOne("flumepgsink.FlumePgMapper.selectFirst", 1); System.out.println("####!!!! First mybatis:" + firstContent); } catch (IOException e) { e.printStackTrace(); } } }
mybatis的SQL 映射文件为:
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="flumepgsink.FlumePgMapper"> <select id="selectFirst" resultType="String"> select content from flumetest where id = #{id} </select> <insert id="insertContent"> insert into flumetest (content) values ( #{content} ) </insert> </mapper>
相关文章推荐
- Flume(ng) 自定义sink实现和属性注入
- flume-ng 自定义sink 实现rollfile 变量目录
- flume-ng 自定义sink消费flume source
- Flume(ng) 自定义sink实现和属性注入
- Flume(ng) 自定义sink实现和属性注入
- Flume-ng 自定义sink实现和属性注入
- 【Flume】【源码分析】深入flume-ng的三大组件——source,channel,sink
- 【Java】【Flume】Flume-NG源码阅读之AvroSink
- Flume-ng的HdfsSink出现Lease mismatch错误
- 【Flume】【源码分析】深入flume-ng的三大组件——source,channel,sink
- 【Flume】自定义sink kafka,并编译打包jar,unapproval license的问题解决
- Flume-ng HDFS sink原理解析
- 【Flume】【源码分析】深入flume-ng的三大组件——source,channel,sink
- Flume NG flume-hdfs-sink 源代码分析
- 【Flume】flume ng中HDFS sink设置按天滚动,0点滚动文件,修改源码实现
- flume 自定义sink
- Flume-ng HDFS Sink “丢数据”
- Flume-ng:multi sink one channel两种配置方式的对比
- Flume-NG源码阅读之AvroSink
- 如何编写Flume-ng-morphline-avro-sink