您的位置:首页 > 编程语言

Flink_获取wiki用户修改_代码实例

2017-09-26 11:39 459 查看
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

/**
 * Minimal Flink streaming job: reads the live Wikipedia edit stream, groups the
 * edits by user and prints, per user and time window, the summed byte
 * difference of that user's edits.
 */
public class WiKiDemo {

    /** Length, in seconds, of the window over which edits are summed. */
    private static final long WINDOW_SECONDS = 5;

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from building or executing the Flink job
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: stream of Wikipedia edit events.
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

        // Partition the stream by the name of the editing user.
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits.keyBy(new UserKeySelector());

        // Per user and per window, fold all events into a (user, summed byte diff) tuple.
        // The first argument is the initial accumulator handed to the fold function.
        DataStream<Tuple2<String, Long>> result = keyedEdits
                .timeWindow(Time.seconds(WINDOW_SECONDS))
                .fold(new Tuple2<>("", 0L), new ByteDiffFold());

        result.print();

        /*
        // Optional: send the results to Kafka instead of (or in addition to) printing them.
        // Requires imports for MapFunction, FlinkKafkaProducer08 and SimpleStringSchema,
        // which this file does not currently have.
        // NOTE(review): port 9200 looks unusual for a Kafka broker list (Kafka's default
        // broker port is 9092) - confirm before enabling.
        result.map(new MapFunction<Tuple2<String,Long>, String>() {
            @Override
            public String map(Tuple2<String, Long> stringLongTuple2) throws Exception {
                return stringLongTuple2.toString();
            }
        }).addSink(new FlinkKafkaProducer08<>("localhost:9200", "wiki-test", new SimpleStringSchema()));
        */

        see.execute("Hello WiKi");
    }

    /** Extracts the grouping key (the editing user's name) from an edit event. */
    private static final class UserKeySelector implements KeySelector<WikipediaEditEvent, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(WikipediaEditEvent event) throws Exception {
            return event.getUser();
        }
    }

    /**
     * Folds edit events into a (user, summed byte diff) accumulator.
     * The accumulator is updated in place and returned.
     */
    private static final class ByteDiffFold
            implements FoldFunction<WikipediaEditEvent, Tuple2<String, Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> accumulator, WikipediaEditEvent event)
                throws Exception {
            accumulator.f0 = event.getUser();
            accumulator.f1 += event.getByteDiff();
            return accumulator;
        }
    }
}


pom依赖:

<!--
  Maven dependencies for the WiKiDemo job. Every artifact uses ${flink.version};
  that property is not defined in this fragment and is presumably declared in the
  pom's <properties> section (confirm). The "_2.11" suffix in the artifact ids is
  the Scala version the artifact is built against.
-->
<dependencies>
<!-- Flink core Java API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- DataStream (streaming) API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink client classes -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Wikipedia edits connector: provides WikipediaEditsSource and WikipediaEditEvent -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka 0.8 connector: only referenced by the commented-out Kafka sink example in WiKiDemo -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>


文档:http://flink.iteblog.com/quickstart/run_example_quickstart.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  flink