Kafka_producer
2015-12-15 10:29
225 查看
1、将日志数据收集并发送到Kafka topic中(循环周期CEL\Time)
//创建producer对象 Properties props = new Properties(); props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");//属性设置 props.put("serializer.class", "kafka.serializer.StringEncoder"); //属性设置 props.put("request.required.acks", "1");//属性设置 ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); //创建reader对象 reader 对象中自己封装了1、读取最后一行 2、将数据存入一维数组 3、日期转换 等方法 Reader reader = new Reader(); double[] rtData; String rtmsg; while (true) { long time = System.currentTimeMillis(); for (String tagName : tags) { rtData = reader.getrtdata(tagName, DataPath); rtmsg = tagName + " " + rtData[0] + " " + rtData[1] + " " + rtData[2] + " " + rtData[3] + " " + rtData[4] + " " + rtData[5]; String ip = ""; try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } KeyedMessage<String, String> data = new KeyedMessage<String, String>("test2", ip, rtmsg);//数据格式 producer.send(data);发送到topic } long _long = System.currentTimeMillis()-time; System.out.println(_long); Utils.sleep(CEL\Time-_long); }
相关文章推荐
- Core Image 和视频
- Dom解析
- tableview 的 headerview 滑动问题
- Pandas:SettingWithCopyWarning
- EventBus的关键函数介绍
- Grade 常用语法和结构(待整理)
- 11月网民上网高峰时段为晚上8点 比例升至6.48%
- ibatis工具--Abator
- 表单中有图文编辑器的内容,提交失败。
- linux dup,dup2,dup3 复制一个文件描述符
- Javascript基于AJAX回调函数传递参数实例分析
- 『全球化产品』应该要避开的几个坑
- 带你看懂Dictionary的内部实现
- 2.选择排序(直接选择排序和堆排序)
- ASPxPopupControl 关闭刷新
- Spring MVC传递URL到后台参数为空
- CSS字符编码引起乱码
- 实用的php清除html,php去除空格与换行,php清除空白行和换行,提取页面纯文本
- python基础-图形界面库
- error while loading shared libraries: libssl.so.6