您的位置:首页 > 其它

Kafka_producer

2015-12-15 10:29 225 查看

1、将日志数据收集并发送到Kafka topic中(循环周期CEL\Time)

//创建producer对象
Properties props = new Properties();
props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");//属性设置
props.put("serializer.class", "kafka.serializer.StringEncoder"); //属性设置
props.put("request.required.acks", "1");//属性设置

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

//创建reader对象  reader 对象中自己封装了1、读取最后一行 2、将数据存入一维数组 3、日期转换 等方法
Reader reader = new Reader();
double[] rtData;
String rtmsg;

while (true) {
long time = System.currentTimeMillis();
for (String tagName : tags) {
rtData = reader.getrtdata(tagName, DataPath);
rtmsg = tagName + " " + rtData[0] + " " + rtData[1] + " " + rtData[2] + " " + rtData[3] + " " + rtData[4] + " " + rtData[5];
String ip = "";
try {
ip = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test2", ip, rtmsg);//数据格式
producer.send(data);发送到topic
}
long _long = System.currentTimeMillis()-time;
System.out.println(_long);
Utils.sleep(CEL\Time-_long);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: