Flink_获取wiki用户修改_代码实例
2017-09-26 11:39
459 查看
import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent; import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource; public class WiKiDemo { /** * This is my first flink demo * @param args * @throws Exception */ public static void main(String [] args) throws Exception{ StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource()); //int out KeyedStream<WikipediaEditEvent, String> keyedEdits = edits.keyBy(new KeySelector<WikipediaEditEvent, String>() { @Override public String getKey(WikipediaEditEvent wikipediaEditEvent) throws Exception { return wikipediaEditEvent.getUser(); } }); DataStream<Tuple2<String, Long>> result = keyedEdits.timeWindow(Time.seconds(5)) //init //return .fold(new Tuple2<>("", 0l), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> fold(Tuple2<String, Long> stringLongTuple2, WikipediaEditEvent o) throws Exception { stringLongTuple2.f0 = o.getUser(); stringLongTuple2.f1 += o.getByteDiff(); return stringLongTuple2; } }); result.print(); /* //send to kafka result.map(new MapFunction<Tuple2<String,Long>, String>() { @Override public String map(Tuple2<String, Long> stringLongTuple2) throws Exception { return stringLongTuple2.toString(); } }).addSink(new FlinkKafkaProducer08<>("localhost:9200", "wiki-test", new SimpleStringSchema())); */ see.execute("Hello WiKi"); } }
pom依赖:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-wikiedits_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
文档:http://flink.iteblog.com/quickstart/run_example_quickstart.html
相关文章推荐
- JS获取和修改元素样式的实例代码
- C#微信小程序服务端获取用户解密信息实例代码
- Oracle 11g用户修改密码及加锁解锁功能实例代码
- java代码获取用户访问ip,绝大部分可行
- jquery 获取url参数插件实例代码
- PHP无限循环获取MySQL中的数据实例代码
- 获取用户真实姓名 Monxin专用(PHP代码函数)
- JSP+Servlet+Tomcat+Mysql实现用户注册、登陆、修改、添加、查看详情、分页实例之---框架
- ListView异步加载图片是非常实用的方法,凡是是要通过网络获取图片资源一般使用这种方法比较好,用户体验好,下面就说实现方法,先贴上主方法的代码:
- php获取qq用户昵称和在线状态(实例分析)
- Python修改Excel数据的实例代码
- [小代码]在对话框中获取用户输入
- c++修改文件(夹)的用户访问权限程序代码
- c#获取季度时间实例代码(季度的第一天)
- sharepoint 2007 修改用户和组 display name 和 email地址 代码
- Spring启动后获取所有拥有特定注解的Bean实例代码
- PHP7.0微信公众平台开发5: 实例二:获取用户列表
- 无人机项目获取用户信息并进行用户信息修改的angularjs部分
- 微信小程序修改swiper默认指示器样式的实例代码
- c++代码批量修改图片名称(重命名)实例及运行结果