08_Flink Streaming window
2016-06-08 11:04
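Flink provides four kinds of windows: tumbling and sliding windows, each driven either by time or by events (element count). Let's look at how the sliding time window is implemented.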
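Flink supports three notions of time:
1: Processing time (ProcessingTime): the clock time of the machine at the moment the operator processes the record. This has the best performance.
2: Ingestion time (IngestionTime): the time at which Flink first ingests the record. It is assigned as soon as the record enters the system. Performance is moderate, because the timestamp assigned at the source must also be carried downstream.
3: Event time (EventTime): each record gets a timestamp through user-specified extraction logic, so windows follow the real business time. This has the highest performance cost.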
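To make the three notions concrete, here is a minimal, Flink-free sketch of which timestamp a window operator would see for one record under each semantics. `TimeMode`, `Event` and `TimeSemantics` are hypothetical names invented for illustration, not Flink API. The clocks are injected as `LongSupplier`s so the behavior can be checked deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical model of the three time semantics; not Flink API.
enum TimeMode { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

final class Event {
    final String payload;
    final long eventTime;      // business time carried inside the record itself
    long ingestionTime = -1L;  // stamped once at the source (ingestion time only)

    Event(String payload, long eventTime) {
        this.payload = payload;
        this.eventTime = eventTime;
    }
}

final class TimeSemantics {
    // At the source: only ingestion time stamps the record here, using the source's clock.
    static void onIngest(Event e, TimeMode mode, LongSupplier sourceClock) {
        if (mode == TimeMode.INGESTION_TIME) {
            e.ingestionTime = sourceClock.getAsLong();
        }
    }

    // In a downstream window operator: the time the window logic would use for the record.
    static long timeOf(Event e, TimeMode mode, LongSupplier operatorClock) {
        switch (mode) {
            case PROCESSING_TIME:
                return operatorClock.getAsLong(); // operator machine's wall clock, read now
            case INGESTION_TIME:
                return e.ingestionTime;           // forwarded from the source
            case EVENT_TIME:
                return e.eventTime;               // extracted from the record
            default:
                throw new IllegalArgumentException(String.valueOf(mode));
        }
    }
}
```

The cost ordering above follows from this. Processing time needs no per-record state. Ingestion time needs one stamp at the source that travels with the record. Event time needs the record's own timestamp plus watermarks to tell when a time range is complete.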
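A trigger decides when a window fires, which matters in more complex situations. Each trigger decision is expressed as one of the four TriggerResult values: CONTINUE, FIRE, PURGE and FIRE_AND_PURGE.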
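The four results combine two independent decisions: whether to evaluate the window now, and whether to discard its contents. The sketch below is a standalone copy of that idea, not Flink's own `TriggerResult` class. `TriggerDecision` and the count-based `CountTriggerSketch` are hypothetical names chosen for illustration.

```java
// Standalone mirror of the four trigger outcomes (illustrative, not Flink's TriggerResult).
enum TriggerDecision {
    CONTINUE(false, false),       // do nothing yet
    FIRE(true, false),            // evaluate the window, keep its contents
    PURGE(false, true),           // drop the window contents without evaluating
    FIRE_AND_PURGE(true, true);   // evaluate, then drop the contents

    final boolean fire;
    final boolean purge;

    TriggerDecision(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

// Hypothetical trigger: fire and purge once every maxCount elements.
final class CountTriggerSketch {
    private final long maxCount;
    private long count;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    TriggerDecision onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // start counting again for the next batch
            return TriggerDecision.FIRE_AND_PURGE;
        }
        return TriggerDecision.CONTINUE;
    }
}
```

With `maxCount = 3`, the first two elements yield CONTINUE and the third yields FIRE_AND_PURGE. Counting then restarts.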
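You choose among the three semantics on the StreamExecutionEnvironment via setStreamTimeCharacteristic. The DataStream methods read the choice back through environment.getStreamTimeCharacteristic(). For event time, you attach an AssignerWithPeriodicWatermarks to the DataStream (via assignTimestampsAndWatermarks). It specifies how each event gets its timestamp and how watermarks are derived from those timestamps.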
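A periodic watermark assigner does two jobs. It extracts a timestamp from every element, and when the framework asks periodically, it reports a watermark. The sketch below models a common strategy: the watermark trails the largest timestamp seen so far by a fixed out-of-orderness bound. It is Flink-free and only mirrors the two method shapes (`extractTimestamp(element, previousElementTimestamp)` and `getCurrentWatermark()`). The class name, the nested `Reading` type and `maxOutOfOrderness` are assumptions. The real interface returns a `Watermark` object rather than a `long`.

```java
// Flink-free sketch of a bounded-out-of-orderness periodic watermark strategy (hypothetical names).
final class BoundedOutOfOrdernessSketch {
    static final class Reading {
        final long timestamp;
        Reading(long timestamp) { this.timestamp = timestamp; }
    }

    private final long maxOutOfOrderness;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Counterpart of extractTimestamp: return the event time and remember the maximum seen.
    long extractTimestamp(Reading r, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, r.timestamp);
        return r.timestamp;
    }

    // Counterpart of getCurrentWatermark: called periodically, not once per element.
    long getCurrentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet: event time has not advanced
        }
        return maxSeenTimestamp - maxOutOfOrderness;
    }
}
```

A late element (4200 after 5000, bound 1000) does not move the watermark backwards. The watermark stays at 4000, so windows ending at or before 4000 can fire.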
/**
 * Windows this {@code DataStream} into sliding time windows.
 *
 * <p>
 * This is a shortcut for either {@code .windowAll(SlidingEventTimeWindows.of(size, slide))} or
 * {@code .windowAll(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * <p>
 * Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance. (Only in special cases, such as aligned time windows, is
 * it possible to perform this operation in parallel.)
 *
 * @param size The size of the window.
 * @param slide The slide interval of the window.
 */
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return windowAll(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        // both EventTime and IngestionTime use event-time windows
        return windowAll(SlidingEventTimeWindows.of(size, slide));
    }
}
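What a sliding assigner has to compute is every window `[start, start + size)` that contains an element's timestamp. The standalone sketch below is modeled on the arithmetic Flink's `SlidingEventTimeWindows` uses for this. It assumes no window offset and non-negative timestamps. `SlidingAssignSketch` and `assign` are hypothetical names, and windows are returned as `{start, end}` pairs instead of Flink's `TimeWindow`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone version of sliding-window assignment (no offset, timestamps >= 0).
final class SlidingAssignSketch {
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // start of the latest window that begins at or before the timestamp
        long lastStart = timestamp - (timestamp % slide);
        // walk back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For timestamp 7 with size 10 and slide 5, this yields `[5, 15)` and `[0, 10)`. A tumbling window is the special case `slide == size`, where each element lands in exactly one window (`[0, 10)` here). When size is a multiple of slide, each element lands in `size / slide` windows. That is why sliding windows cost more than tumbling ones.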
/**
 * Windows this data stream to an {@code AllWindowedStream}, which evaluates windows
 * over a non-keyed stream. Elements are put into windows by a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
 * elements is done by window only.
 *
 * <p>
 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
 * that is used if a {@code Trigger} is not specified.
 *
 * <p>
 * Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance. (Only in special cases, such as aligned time windows, is
 * it possible to perform this operation in parallel.)
 *
 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
 * @return The windowed data stream.
 */
@PublicEvolving
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
    return new AllWindowedStream<>(this, assigner);
}
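A non-keyed (`windowAll`) window has one defining property: a single operator instance sees every element and groups elements by window alone, since there is no key. The single-threaded sketch below models a tumbling event-time window with an incremental sum. It is not Flink's `NonKeyedWindowOperator`. All names are hypothetical. It assumes a window fires once the watermark reaches its last timestamp (`end - 1`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-instance model of a non-keyed tumbling event-time window with a sum.
final class NonKeyedTumblingSumSketch {
    private final long size;
    // window start -> running sum (pre-aggregated, so no element list is buffered)
    private final TreeMap<Long, Long> sums = new TreeMap<>();

    NonKeyedTumblingSumSketch(long size) {
        this.size = size;
    }

    void onElement(long timestamp, long value) {
        long start = timestamp - (timestamp % size); // group by window only: no key
        sums.merge(start, value, Long::sum);
    }

    // Emits "[start, end) -> sum" for every window whose last timestamp the watermark has reached.
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!sums.isEmpty() && sums.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, Long> e = sums.pollFirstEntry(); // fire, then purge the window
            fired.add("[" + e.getKey() + ", " + (e.getKey() + size) + ") -> " + e.getValue());
        }
        return fired;
    }
}
```

Because all state lives in one instance, this kind of operator cannot simply be spread across parallel tasks. Matching that, the `AllWindowedStream` operations pasted below end with `setParallelism(1)`.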
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api;

import org.apache.flink.annotation.PublicEvolving;

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     * <p>
     * Using processing time for window operations results in general in quite non-deterministic results,
     * because the contents of the windows depend on the speed in which elements arrive. It is, however,
     * the cheapest method of forming windows and the method that introduces the least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     * <p>
     * Ingestion time is often a good compromise between processing time and event time.
     * It does not need any special manual form of watermark generation, and events are typically
     * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
     * are not very much out-of-order means that the latency increase is moderate, compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in the
     * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
     * The big implication of this is that it allows for elements to arrive in the sources and in
     * all operators out of order, meaning that elements with earlier timestamps may arrive after
     * elements with later timestamps.
     * <p>
     * Operators that window or order data with respect to event time must buffer data until they can
     * be sure that all timestamps for a certain time interval have been received. This is handled by
     * the so called "time watermarks".
     * <p>
     * Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams operate.
     * At the same time, the buffering and tracking of event time is also costlier than operating
     * with processing time, and typically also introduces more latency. The amount of extra
     * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
     * between the arrival of early and late elements is. With respect to the "time watermarks", this
     * means that the cost typically depends on how early or late the watermarks can be generated
     * for their timestamp.
     * <p>
     * In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's
     * original time, rather than the time assigned at the data source. Practically, that means that
     * event time has generally more meaning, but also that it takes longer to determine that all
     * elements for a certain time have arrived.
     */
    EventTime
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * {@link StreamOperator} for streaming sources.
 *
 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    private static final long serialVersionUID = 1L;

    private transient SourceFunction.SourceContext<OUT> ctx;

    private transient volatile boolean canceledOrStopped = false;

    public StreamSource(SRC sourceFunction) {
        super(sourceFunction);

        this.chainingStrategy = ChainingStrategy.HEAD;
    }

    public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        // the time characteristic selects the SourceContext through which the user function emits
        final SourceFunction.SourceContext<OUT> ctx;
        switch (timeCharacteristic) {
            case EventTime:
                ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
                break;
            case IngestionTime:
                ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
                        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
                break;
            case ProcessingTime:
                ctx = new NonTimestampContext<>(this, lockingObject, collector);
                break;
            default:
                throw new Exception(String.valueOf(timeCharacteristic));
        }

        // copy to a field to give the 'cancel()' method access
        this.ctx = ctx;

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
        }
    }

    public void cancel() {
        // important: marking the source as stopped has to happen before the function is stopped.
        // the flag that tracks this status is volatile, so the memory model also guarantees
        // the happens-before relationship
        markCanceledOrStopped();
        userFunction.cancel();

        // the context may not be initialized if the source was never running.
        if (ctx != null) {
            ctx.close();
        }
    }

    /**
     * Marks this source as canceled or stopped.
     *
     * <p>This indicates that any exit of the {@link #run(Object, Output)} method
     * cannot be interpreted as the result of a finite source.
     */
    protected void markCanceledOrStopped() {
        this.canceledOrStopped = true;
    }

    /**
     * Checks whether the source has been canceled or stopped.
     * @return True, if the source is canceled or stopped, false if not.
     */
    protected boolean isCanceledOrStopped() {
        return canceledOrStopped;
    }

    /**
     * Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...)
     * has caused an exception. If one of these threads caused an exception, this method will
     * throw that exception.
     */
    void checkAsyncException() {
        getContainingTask().checkTimerException();
    }

    // ------------------------------------------------------------------------
    //  Source contexts for various stream time characteristics
    // ------------------------------------------------------------------------

    /**
     * A source context that attaches {@code -1} as a timestamp to all records, and that
     * does not forward watermarks.
     */
    public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {

        private final StreamSource<?, ?> owner;
        private final Object lockingObject;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
            this.owner = owner;
            this.lockingObject = lockingObject;
            this.output = output;
            this.reuse = new StreamRecord<T>(null);
        }

        @Override
        public void collect(T element) {
            owner.checkAsyncException();
            synchronized (lockingObject) {
                output.collect(reuse.replace(element));
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            // ignore the timestamp
            collect(element);
        }

        @Override
        public void emitWatermark(Watermark mark) {
            owner.checkAsyncException();
            // do nothing else
        }

        @Override
        public Object getCheckpointLock() {
            return lockingObject;
        }

        @Override
        public void close() {}
    }

    /**
     * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
     * and watermark emission.
     */
    public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {

        private final StreamSource<?, ?> owner;
        private final Object lockingObject;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        private final ScheduledExecutorService scheduleExecutor;
        private final ScheduledFuture<?> watermarkTimer;
        private final long watermarkInterval;

        private volatile long nextWatermarkTime;

        public AutomaticWatermarkContext(
                final StreamSource<?, ?> owner,
                final Object lockingObjectParam,
                final Output<StreamRecord<T>> outputParam,
                final long watermarkInterval) {

            if (watermarkInterval < 1L) {
                throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
            }

            this.owner = owner;
            this.lockingObject = lockingObjectParam;
            this.output = outputParam;
            this.watermarkInterval = watermarkInterval;
            this.reuse = new StreamRecord<T>(null);

            this.scheduleExecutor = Executors.newScheduledThreadPool(1);

            this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    final long currentTime = System.currentTimeMillis();

                    if (currentTime > nextWatermarkTime) {
                        // align the watermarks across all machines. this will ensure that we
                        // don't have watermarks that creep along at different intervals because
                        // the machine clocks are out of sync
                        final long watermarkTime = currentTime - (currentTime % watermarkInterval);

                        synchronized (lockingObjectParam) {
                            if (currentTime > nextWatermarkTime) {
                                outputParam.emitWatermark(new Watermark(watermarkTime));
                                nextWatermarkTime += watermarkInterval;
                            }
                        }
                    }
                }
            }, 0, watermarkInterval, TimeUnit.MILLISECONDS);
        }

        @Override
        public void collect(T element) {
            owner.checkAsyncException();

            synchronized (lockingObject) {
                final long currentTime = System.currentTimeMillis();
                output.collect(reuse.replace(element, currentTime));

                if (currentTime > nextWatermarkTime) {
                    // in case we jumped some watermarks, recompute the next watermark time
                    final long watermarkTime = currentTime - (currentTime % watermarkInterval);
                    nextWatermarkTime = watermarkTime + watermarkInterval;
                    output.emitWatermark(new Watermark(watermarkTime));
                }
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            // ingestion time: the user-provided timestamp is ignored
            collect(element);
        }

        @Override
        public void emitWatermark(Watermark mark) {
            owner.checkAsyncException();

            // any other user-emitted watermark is ignored under ingestion time
            if (mark.getTimestamp() == Long.MAX_VALUE) {
                // allow it since this is the special end-watermark that for example the Kafka source emits
                synchronized (lockingObject) {
                    nextWatermarkTime = Long.MAX_VALUE;
                    output.emitWatermark(mark);
                }

                // we can shutdown the timer now, no watermarks will be needed any more
                watermarkTimer.cancel(true);
                scheduleExecutor.shutdownNow();
            }
        }

        @Override
        public Object getCheckpointLock() {
            return lockingObject;
        }

        @Override
        public void close() {
            watermarkTimer.cancel(true);
            scheduleExecutor.shutdownNow();
        }
    }

    /**
     * A SourceContext for event time. Sources may directly attach timestamps and generate
     * watermarks, but if records are emitted without timestamps, no timestamps are automatically
     * generated and attached. The records will simply have no timestamp in that case.
     *
     * Streaming topologies can use timestamp assigner functions to override the timestamps
     * assigned here.
     */
    public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {

        private final StreamSource<?, ?> owner;
        private final Object lockingObject;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
            this.owner = owner;
            this.lockingObject = lockingObject;
            this.output = output;
            this.reuse = new StreamRecord<T>(null);
        }

        @Override
        public void collect(T element) {
            owner.checkAsyncException();

            synchronized (lockingObject) {
                output.collect(reuse.replace(element));
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            owner.checkAsyncException();

            synchronized (lockingObject) {
                output.collect(reuse.replace(element, timestamp));
            }
        }

        @Override
        public void emitWatermark(Watermark mark) {
            owner.checkAsyncException();

            synchronized (lockingObject) {
                output.emitWatermark(mark);
            }
        }

        @Override
        public Object getCheckpointLock() {
            return lockingObject;
        }

        @Override
        public void close() {}
    }
}
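Under ingestion time, `AutomaticWatermarkContext.collect()` stamps each element with the current clock. When the clock has passed `nextWatermarkTime`, it also emits a watermark aligned down to a multiple of the interval. The sketch below replays that `collect()` logic single-threaded so the arithmetic can be checked. It deliberately omits the timer thread and locking. The injected `LongSupplier` clock, the `String` output list and the class name are assumptions made so it runs without Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical single-threaded replay of AutomaticWatermarkContext.collect() (no timer thread).
final class IngestionWatermarkSketch {
    private final long watermarkInterval;
    private final LongSupplier clock;
    private long nextWatermarkTime; // starts at 0, like the field in the original
    final List<String> output = new ArrayList<>();

    IngestionWatermarkSketch(long watermarkInterval, LongSupplier clock) {
        if (watermarkInterval < 1L) {
            throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
        }
        this.watermarkInterval = watermarkInterval;
        this.clock = clock;
    }

    void collect(String element) {
        long currentTime = clock.getAsLong();
        output.add("record " + element + " @" + currentTime); // ingestion timestamp
        if (currentTime > nextWatermarkTime) {
            // align down to the interval grid so all machines emit watermarks on the same grid
            long watermarkTime = currentTime - (currentTime % watermarkInterval);
            nextWatermarkTime = watermarkTime + watermarkInterval;
            output.add("watermark " + watermarkTime);
        }
    }
}
```

With an interval of 200, an element at time 1234 produces watermark 1200 and sets the next threshold to 1400. An element at 1300 produces no watermark. An element at 1450 produces watermark 1400.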
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;

/**
 * An {@code AllWindowedStream} represents a data stream where the stream of
 * elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
 *
 * <p>
 * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
 * used to evict elements from the window after
 * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
 * When using an evictor window performance will degrade significantly, since
 * pre-aggregation of window results cannot be used.
 *
 * <p>
 * Note that the {@code AllWindowedStream} is purely an API construct, during runtime
 * the {@code AllWindowedStream} will be collapsed together with the
 * operation over the window into one single operation.
 *
 * @param <T> The type of elements in the stream.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
 */
@Public
public class AllWindowedStream<T, W extends Window> {

    /** The data stream that is windowed by this stream */
    private final DataStream<T> input;

    /** The window assigner */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    @PublicEvolving
    public AllWindowedStream(DataStream<T> input,
            WindowAssigner<? super T, W> windowAssigner) {
        this.input = input;
        this.windowAssigner = windowAssigner;
        // use the assigner's default trigger unless trigger(...) overrides it
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

    /**
     * Sets the {@code Trigger} that should be used to trigger window emission.
     */
    @PublicEvolving
    public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
        this.trigger = trigger;
        return this;
    }

    /**
     * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
     *
     * <p>
     * Note: When using an evictor window performance will degrade significantly, since
     * pre-aggregation of window results cannot be used.
     */
    @PublicEvolving
    public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
        this.evictor = evictor;
        return this;
    }


    // ------------------------------------------------------------------------
    //  Operations on the keyed windows
    // ------------------------------------------------------------------------

    /**
     * Applies a reduce function to the window. The window function is called for each evaluation
     * of the window for each key individually. The output of the reduce function is interpreted
     * as a regular non-windowed stream.
     * <p>
     * This window will try and pre-aggregate data as much as the window policies permit. For example,
     * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
     * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
     * so a few elements are stored per key (one per slide interval).
     * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
     * aggregation tree.
     *
     * @param function The reduce function.
     * @return The data stream that is the result of applying the reduce function to the window.
     */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
                    "Please use apply(ReduceFunction, WindowFunction) instead.");
        }

        //clean the closure
        function = input.getExecutionEnvironment().clean(function);

        String callLocation = Utils.getCallLocationName();
        String udfName = "AllWindowedStream." + callLocation;

        SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
        if (result != null) {
            return result;
        }

        String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

        OneInputStreamOperator<T, T> operator;

        if (evictor != null) {
            operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    new ReduceIterableAllWindowFunction<W, T>(function),
                    trigger,
                    evictor);

        } else {
            operator = new NonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    new ReduceIterableAllWindowFunction<W, T>(function),
                    trigger);
        }

        return input.transform(opName, input.getType(), operator).setParallelism(1);
    }

    /**
     * Applies the given fold function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the reduce function is
     * interpreted as a regular non-windowed stream.
     *
     * @param initialValue The initial value of the fold.
     * @param function The fold function.
     * @return The data stream that is the result of applying the fold function to the window.
     */
    public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
                    "Please use apply(FoldFunction, WindowFunction) instead.");
        }

        TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
                Utils.getCallLocationName(), true);

        return fold(initialValue, function, resultType);
    }

    /**
     * Applies the given fold function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the reduce function is
     * interpreted as a regular non-windowed stream.
     *
     * @param initialValue The initial value of the fold.
     * @param function The fold function.
     * @return The data stream that is the result of applying the fold function to the window.
     */
    public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
                    "Please use apply(FoldFunction, WindowFunction) instead.");
        }

        return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType);
    }

    /**
     * Applies a window function to the window. The window function is called for each evaluation
     * of the window for each key individually. The output of the window function is interpreted
     * as a regular non-windowed stream.
     * <p>
     * Note that this function requires that all data in the windows is buffered until the window
     * is evaluated, as the function provides no means of pre-aggregation.
     *
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
        @SuppressWarnings("unchecked, rawtypes")
        TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                function, AllWindowFunction.class, true, true, getInputType(), null, false);

        return apply(function, resultType);
    }

    /**
     * Applies the given window function to each window. The window function is called for each evaluation
     * of the window for each key individually. The output of the window function is interpreted
     * as a regular non-windowed stream.
     * <p>
     * Note that this function requires that all data in the windows is buffered until the window
     * is evaluated, as the function provides no means of pre-aggregation.
     *
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {

        //clean the closure
        function = input.getExecutionEnvironment().clean(function);

        String callLocation = Utils.getCallLocationName();
        String udfName = "AllWindowedStream." + callLocation;

        SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
        if (result != null) {
            return result;
        }

        String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

        NonKeyedWindowOperator<T, T, R, W> operator;

        if (evictor != null) {
            operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    function,
                    trigger,
                    evictor);

        } else {
            operator = new NonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    function,
                    trigger);
        }

        return input.transform(opName, resultType, operator).setParallelism(1);
    }

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Arriving data is pre-aggregated using the given pre-aggregation reducer.
     *
     * @param preAggregator The reduce function that is used for pre-aggregation
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
        TypeInformation<T> inType = input.getType();
        TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                function, AllWindowFunction.class, true, true, inType, null, false);

        return apply(preAggregator, function, resultType);
    }

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Arriving data is pre-aggregated using the given pre-aggregation reducer.
     *
     * @param preAggregator The reduce function that is used for pre-aggregation
     * @param function The window function.
     * @param resultType Type information for the result type of the window function
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
        if (preAggregator instanceof RichFunction) {
            throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction.");
        }

        //clean the closures
        function = input.getExecutionEnvironment().clean(function);
        preAggregator = input.getExecutionEnvironment().clean(preAggregator);

        String callLocation = Utils.getCallLocationName();
        String udfName = "AllWindowedStream." + callLocation;

        String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

        OneInputStreamOperator<T, R> operator;

        if (evictor != null) {
            operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    new ReduceApplyAllWindowFunction<>(preAggregator, function),
                    trigger,
                    evictor);

        } else {
            operator = new NonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    function,
                    trigger);
        }

        return input.transform(opName, resultType, operator).setParallelism(1);
    }

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Arriving data is incrementally aggregated using the given fold function.
     *
     * @param initialValue The initial value of the fold.
     * @param foldFunction The fold function that is used for incremental aggregation.
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {

        TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
                Utils.getCallLocationName(), true);

        return apply(initialValue, foldFunction, function, resultType);
    }

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Arriving data is incrementally aggregated using the given fold function.
     *
     * @param initialValue The initial value of the fold.
     * @param foldFunction The fold function that is used for incremental aggregation.
     * @param function The window function.
     * @param resultType Type information for the result type of the window function
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }

        //clean the closures
        function = input.getExecutionEnvironment().clean(function);
        foldFunction = input.getExecutionEnvironment().clean(foldFunction);

        String callLocation = Utils.getCallLocationName();
        String udfName = "AllWindowedStream." + callLocation;

        String opName;
        OneInputStreamOperator<T, R> operator;

        if (evictor != null) {
            opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + evictor + ", " + udfName + ")";

            operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
                    trigger,
                    evictor);

        } else {
            opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

            operator = new NonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),
                    function,
                    trigger);
        }

        return input.transform(opName, resultType, operator).setParallelism(1);
    }

    // ------------------------------------------------------------------------
    //  Aggregations on the windows
    // ------------------------------------------------------------------------

    /**
     * Applies an aggregation that sums every window of the data stream at the
     * given position.
     *
     * @param positionToSum The position in the tuple/array to sum
     * @return The transformed DataStream.
     */
    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
    }

    /**
     * Applies an aggregation that sums every window of the pojo data stream at
     * the given field for every window.
     *
     * <p>
     * A field expression is either
     * the name of a public field or a getter method with parentheses of the
     * stream's underlying type. A dot can be used to drill down into objects,
     * as in {@code "field1.getInnerField2()" }.
     *
     * @param field The field to sum
     * @return The transformed DataStream.
*/ public SingleOutputStreamOperator<T> sum(String field) { return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig())); } /** * Applies an aggregation that that gives the minimum value of every window * of the data stream at the given position. * * @param positionToMin The position to minimize * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> min(int positionToMin) { return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig())); } /** * Applies an aggregation that that gives the minimum value of the pojo data * stream at the given field expression for every window. * * <p> * A field * expression is either the name of a public field or a getter method with * parentheses of the {@link DataStream}S underlying type. A dot can be used * to drill down into objects, as in {@code "field1.getInnerField2()" }. * * @param field The field expression based on which the aggregation will be applied. * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> min(String field) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig())); } /** * Applies an aggregation that gives the minimum element of every window of * the data stream by the given position. If more elements have the same * minimum value the operator returns the first element by default. * * @param positionToMinBy * The position to minimize by * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } /** * Applies an aggregation that gives the minimum element of every window of * the data stream by the given position. If more elements have the same * minimum value the operator returns the first element by default. 
* * @param positionToMinBy The position to minimize by * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> minBy(String positionToMinBy) { return this.minBy(positionToMinBy, true); } /** * Applies an aggregation that gives the minimum element of every window of * the data stream by the given position. If more elements have the same * minimum value the operator returns either the first or last one depending * on the parameter setting. * * @param positionToMinBy The position to minimize * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) { return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); } /** * Applies an aggregation that that gives the minimum element of the pojo * data stream by the given field expression for every window. A field * expression is either the name of a public field or a getter method with * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used * to drill down into objects, as in {@code "field1.getInnerField2()" }. * * @param field The field expression based on which the aggregation will be applied. * @param first If True then in case of field equality the first object will be returned * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> minBy(String field, boolean first) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); } /** * Applies an aggregation that gives the maximum value of every window of * the data stream at the given position. * * @param positionToMax The position to maximize * @return The transformed DataStream. 
*/ public SingleOutputStreamOperator<T> max(int positionToMax) { return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig())); } /** * Applies an aggregation that that gives the maximum value of the pojo data * stream at the given field expression for every window. A field expression * is either the name of a public field or a getter method with parentheses * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill * down into objects, as in {@code "field1.getInnerField2()" }. * * @param field The field expression based on which the aggregation will be applied. * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> max(String field) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig())); } /** * Applies an aggregation that gives the maximum element of every window of * the data stream by the given position. If more elements have the same * maximum value the operator returns the first by default. * * @param positionToMaxBy * The position to maximize by * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } /** * Applies an aggregation that gives the maximum element of every window of * the data stream by the given position. If more elements have the same * maximum value the operator returns the first by default. * * @param positionToMaxBy * The position to maximize by * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } /** * Applies an aggregation that gives the maximum element of every window of * the data stream by the given position. 
If more elements have the same * maximum value the operator returns either the first or last one depending * on the parameter setting. * * @param positionToMaxBy The position to maximize by * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) { return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); } /** * Applies an aggregation that that gives the maximum element of the pojo * data stream by the given field expression for every window. A field * expression is either the name of a public field or a getter method with * parentheses of the {@link DataStream}S underlying type. A dot can be used * to drill down into objects, as in {@code "field1.getInnerField2()" }. * * @param field The field expression based on which the aggregation will be applied. * @param first If True then in case of field equality the first object will be returned * @return The transformed DataStream. 
*/ public SingleOutputStreamOperator<T> maxBy(String field, boolean first) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); } private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) { return reduce(aggregator); } // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid( Function function, TypeInformation<R> resultType, String functionName) { // TODO: add once non-parallel fast aligned time windows operator is ready return null; } public StreamExecutionEnvironment getExecutionEnvironment() { return input.getExecutionEnvironment(); } public TypeInformation<T> getInputType() { return input.getType(); } }
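Note that `aggregate(...)` simply delegates to `reduce(...)`, so every built-in aggregation (`sum`, `min`, `minBy`, `max`, `maxBy`) is a reduce function that can be applied incrementally. That is also what the pre-aggregating `apply` exploits: without an evictor it uses a `ReducingWindowBuffer` (one running value per pane) instead of a `ListWindowBuffer` (every element kept until the window fires). The plain-Java sketch below contrasts the two buffering strategies; `Reading`, `Reducer`, `ListPane`, `ReducingPane` and `Aggregations` are hypothetical names made up for illustration, not Flink classes, and the `minBy` tie-breaking only imitates the `first` flag described in the Javadoc above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element type standing in for a Tuple2<String, Integer>.
final class Reading {
    final String sensor;
    final int value;
    Reading(String sensor, int value) { this.sensor = sensor; this.value = value; }
}

// Hypothetical stand-in for a reduce function: combines two values into one.
interface Reducer<T> {
    T reduce(T a, T b);
}

// Keeps every element (like a list buffer) and reduces only when the window fires.
final class ListPane<T> {
    private final List<T> elements = new ArrayList<>();
    void add(T element) { elements.add(element); }
    int storedElements() { return elements.size(); }
    T fire(Reducer<T> reducer) {
        T acc = null;
        for (T e : elements) {
            acc = (acc == null) ? e : reducer.reduce(acc, e);
        }
        return acc;
    }
}

// Keeps a single running value (like a reducing buffer): constant state per pane.
final class ReducingPane<T> {
    private final Reducer<T> reducer;
    private T acc;
    ReducingPane(Reducer<T> reducer) { this.reducer = reducer; }
    void add(T element) { acc = (acc == null) ? element : reducer.reduce(acc, element); }
    T fire() { return acc; }
}

final class Aggregations {
    // Sums the value field; this sketch keeps the sensor name of the earlier element.
    static Reducer<Reading> sum() {
        return (a, b) -> new Reading(a.sensor, a.value + b.value);
    }

    // Returns a whole element with the smallest value; on ties keeps the earlier
    // element when first == true, otherwise the later one.
    static Reducer<Reading> minBy(boolean first) {
        return (a, b) -> {
            if (a.value < b.value) return a;
            if (b.value < a.value) return b;
            return first ? a : b;
        };
    }
}
```

Both panes produce the same sum; the difference is that `ListPane` holds all three elements until firing, while `ReducingPane` never holds more than one. An evictor needs the full element list, which is why the evicting branch above falls back to `ListWindowBuffer`.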
/**
* Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
* over a key grouped stream. Elements are put into windows by a
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
* elements is done both by key and by window.
*
* <p>
* A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
* when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
* that is used if a {@code Trigger} is not specified.
*
* <p>
* Note: This operation can be inherently non-parallel since all elements have to pass through
* the same operator instance. (Only for special cases, such as aligned time windows is
* it possible to perform this operation in parallel).
*
* @param assigner The {@code WindowAssigner} that assigns elements to windows.
* @return The trigger windows data stream.
*/
@PublicEvolving
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
return new AllWindowedStream<>(this, assigner);
}
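`windowAll` yields a non-keyed `AllWindowedStream`, and the operators built in the `AllWindowedStream` code above are all added with `setParallelism(1)`: with no key to partition on, a single operator instance owns every pane, and panes are identified by the window alone. A keyed `window(...)` instead identifies a pane by (key, window), so different keys can live on different parallel instances. A minimal plain-Java sketch of the two bookkeeping schemes, assuming tumbling windows and non-negative timestamps; `PaneIndexSketch` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class PaneIndexSketch {
    // Non-keyed (windowAll): one map for the whole stream, pane id = window start.
    static Map<Long, Integer> countAll(long[] timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long start = ts - (ts % size); // tumbling window start
            counts.merge(start, 1, Integer::sum);
        }
        return counts;
    }

    // Keyed (window): pane id = (key, window start). Each key's panes are independent,
    // so the outer map could be split across parallel instances by key.
    static Map<String, Map<Long, Integer>> countByKey(String[] keys, long[] timestamps, long size) {
        Map<String, Map<Long, Integer>> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = timestamps[i] - (timestamps[i] % size);
            counts.computeIfAbsent(keys[i], k -> new TreeMap<>()).merge(start, 1, Integer::sum);
        }
        return counts;
    }
}
```

With timestamps 1, 4 and 12 and a window size of 10, the non-keyed version has two panes (`[0, 10)` with two elements, `[10, 20)` with one), all of which must sit in the same place.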
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;
import java.util.Collection;

/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>
 * In a window operation, elements are grouped by their key (if available) and by the windows to
 * which they were assigned. The set of elements with the same key and window is called a pane.
 * When a {@link Trigger} decides that a certain pane should fire, the
 * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
 * to produce output elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
}
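`assignWindows` returns a collection because an element may belong to several windows. This is exactly the sliding-window case this post set out to examine: with window length `size` and step `slide`, each element falls into roughly `size / slide` overlapping windows and therefore into that many panes. The sketch below is modeled on the loop that `SlidingEventTimeWindows` used in Flink releases of this period; treat the exact shape as an assumption and compare it with the source of your Flink version. `Win` and `SlidingAssignSketch` are hypothetical plain-Java stand-ins, and timestamps are assumed non-negative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TimeWindow: the half-open interval [start, end).
final class Win {
    final long start;
    final long end;
    Win(long start, long end) { this.start = start; this.end = end; }
    @Override public String toString() { return "[" + start + ", " + end + ")"; }
}

final class SlidingAssignSketch {
    // Returns every window of length `size` that starts on a multiple of `slide`
    // and contains `timestamp`. Assumes timestamp >= 0 and 0 < slide <= size.
    static List<Win> assignWindows(long timestamp, long size, long slide) {
        List<Win> windows = new ArrayList<>((int) (size / slide));
        long lastStart = timestamp - timestamp % slide; // latest start <= timestamp
        // Walk backwards one slide at a time while the window still covers timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Win(start, start + size));
        }
        return windows;
    }
}
```

For example, `assignWindows(7, 10, 5)` yields `[5, 15)` and `[0, 10)`; with `slide == size` the loop runs once and degenerates to the tumbling case.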
package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;

/**
 * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
 * results for that part of the window.
 *
 * <p>
 * A pane is the bucket of elements that have the same key (assigned by the
 * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
 * be in multiple panes if it was assigned to multiple windows by the
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
 * have their own instance of the {@code Trigger}.
 *
 * <p>
 * Triggers must not maintain state internally since they can be re-created or reused for
 * different keys. All necessary state should be persisted using the state abstraction
 * available on the {@link TriggerContext}.
 *
 * @param <T> The type of elements on which this {@code Trigger} works.
 * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
 */
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    /**
     * Called for every element that gets added to a pane. The result of this will determine
     * whether the pane is evaluated to emit results.
     *
     * @param element The element that arrived.
     * @param timestamp The timestamp of the element that arrived.
     * @param window The window to which this pane belongs.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    /**
     * Called when a processing-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    /**
     * Clears any state that the trigger might still hold for the given window. This is called
     * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
     * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
     * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     *
     * <p>By default, this method does nothing.
     */
    public void clear(W window, TriggerContext ctx) throws Exception {}

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {

        /**
         * Returns the current watermark time.
         */
        long getCurrentWatermark();

        /**
         * Register a system time callback. When the current system time passes the specified
         * time {@link Trigger#onProcessingTime(long, Window, TriggerContext)} is called with the time specified here.
         *
         * @param time The time at which to invoke {@link Trigger#onProcessingTime(long, Window, TriggerContext)}
         */
        void registerProcessingTimeTimer(long time);

        /**
         * Register an event-time callback. When the current watermark passes the specified
         * time {@link Trigger#onEventTime(long, Window, TriggerContext)} is called with the time specified here.
         *
         * @param time The watermark at which to invoke {@link Trigger#onEventTime(long, Window, TriggerContext)}
         * @see org.apache.flink.streaming.api.watermark.Watermark
         */
        void registerEventTimeTimer(long time);

        /**
         * Delete the processing time trigger for the given time.
         */
        void deleteProcessingTimeTimer(long time);

        /**
         * Delete the event-time trigger for the given time.
         */
        void deleteEventTimeTimer(long time);

        /**
         * Retrieves a {@link State} object that can be used to interact with
         * fault-tolerant state that is scoped to the window and key of the current
         * trigger invocation.
         *
         * @param stateDescriptor The StateDescriptor that contains the name and type of the
         *                        state that is being accessed.
         * @param <S> The type of the state.
         * @return The partitioned state object.
         * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
         *                                       function (function is not part of a KeyedStream).
         */
        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        /**
         * Retrieves a {@link ValueState} object that can be used to interact with
         * fault-tolerant state that is scoped to the window and key of the current
         * trigger invocation.
         *
         * @param name The name of the key/value state.
         * @param stateType The class of the type that is stored in the state. Used to generate
         *                  serializers for managed memory and checkpointing.
         * @param defaultState The default state value, returned when the state is accessed and
         *                     no value has yet been set for the key. May be null.
         *
         * @param <S> The type of the state.
         * @return The partitioned state object.
         * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
         *                                       function (function is not part of a KeyedStream).
         */
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        /**
         * Retrieves a {@link ValueState} object that can be used to interact with
         * fault-tolerant state that is scoped to the window and key of the current
         * trigger invocation.
         *
         * @param name The name of the key/value state.
         * @param stateType The type information for the type that is stored in the state.
         *                  Used to create serializers for managed memory and checkpoints.
         * @param defaultState The default state value, returned when the state is accessed and
         *                     no value has yet been set for the key. May be null.
         *
         * @param <S> The type of the state.
         * @return The partitioned state object.
         * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
         *                                       function (function is not part of a KeyedStream).
         */
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }
}
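The `Trigger` contract above says a trigger must not keep per-pane state in its own fields, because one instance can be reused across keys and windows; state belongs in the pane-scoped storage reached through `TriggerContext`. The sketch below imitates that rule with hypothetical, simplified types (`MiniResult`, `MiniContext`, `CountTriggerSketch`, `PaneStates`) rather than Flink's real signatures: a count trigger keeps its counter in the context, so one shared trigger object serves many panes. Flink itself ships a `CountTrigger`; this is only a model of the idea, not its implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified mirror of TriggerResult.
enum MiniResult { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

// Hypothetical context whose state is scoped to the pane currently being processed.
interface MiniContext {
    Long getCount();
    void setCount(Long value);
}

// Stateless with respect to panes: the only field is configuration.
final class CountTriggerSketch {
    private final long maxCount;
    CountTriggerSketch(long maxCount) { this.maxCount = maxCount; }

    MiniResult onElement(Object element, MiniContext ctx) {
        long count = (ctx.getCount() == null ? 0L : ctx.getCount()) + 1;
        if (count >= maxCount) {
            ctx.setCount(0L);       // reset this pane's counter
            return MiniResult.FIRE; // evaluate the pane but keep its contents
        }
        ctx.setCount(count);
        return MiniResult.CONTINUE;
    }
}

// Simulated operator side: one counter per pane id, handed to the trigger via a context.
final class PaneStates {
    private final Map<String, Long> counts = new HashMap<>();

    MiniContext forPane(String paneId) {
        return new MiniContext() {
            public Long getCount() { return counts.get(paneId); }
            public void setCount(Long value) { counts.put(paneId, value); }
        };
    }
}
```

Because the counter lives in `PaneStates`, elements for pane `"b"` do not advance pane `"a"`'s count even though both go through the same trigger object; in Flink the equivalent storage is what `getPartitionedState` returns, which also makes it part of checkpoints.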
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Collection;
import java.util.Collections;

/**
 * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
 * elements. Windows cannot overlap.
 *
 * <p>
 * For example, in order to window into windows of 1 minute:
 * <pre> {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *   keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
 * } </pre>
 */
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private long size;

    protected TumblingEventTimeWindows(long size) {
        this.size = size;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = timestamp - (timestamp % size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    public long getSize() {
        return size;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingEventTimeWindows(" + size + ")";
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp.
     *
     * @param size The size of the generated windows.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }
}
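`assignWindows` aligns every tumbling window to a multiple of `size` counted from timestamp 0, using `start = timestamp - (timestamp % size)`. For the 1-minute example in the Javadoc, a record stamped 61,000 ms lands in `[60000, 120000)`. The small check below reproduces that arithmetic together with the `Long.MIN_VALUE` "no timestamp" guard; `TumblingMath` is a hypothetical helper, not part of Flink. One caveat follows from Java itself: `%` keeps the sign of the dividend, so this formula only yields a window that contains the timestamp when the timestamp is non-negative.

```java
final class TumblingMath {
    // Same arithmetic as TumblingEventTimeWindows.assignWindows above.
    static long windowStart(long timestamp, long size) {
        if (timestamp == Long.MIN_VALUE) {
            // Long.MIN_VALUE is the "no timestamp" marker, e.g. when no timestamps were assigned
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker)");
        }
        return timestamp - (timestamp % size);
    }

    // The window end is exclusive: the window covers [start, start + size).
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }
}
```

For instance, 59,999 ms still belongs to `[0, 60000)`, while a negative timestamp such as -3 with size 5 maps to start 0, a window that does not contain it.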
package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * A {@link Trigger} that fires once the watermark passes the end of the window
 * to which a pane belongs.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>
     * Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
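`EventTimeTrigger` does almost nothing per element: `onElement` registers an event-time timer at `window.maxTimestamp()` (that is, `end - 1`) and returns `CONTINUE`; when the watermark reaches that timer, `onEventTime` answers `FIRE_AND_PURGE`. The simulation below models only the timer queue in plain Java. `WatermarkSim` is hypothetical, and the rule that a timer fires once `timer <= watermark` is an assumption about the window operator that drives the trigger, not code taken from Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class WatermarkSim {
    // Pending event-time timers ordered by time. A set also de-duplicates, so many
    // elements of the same window result in a single pending timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    // Mirrors onElement: register a timer at maxTimestamp() = end - 1, then CONTINUE.
    void onElement(long windowEnd) {
        timers.add(windowEnd - 1);
    }

    // Advancing the watermark fires (FIRE_AND_PURGE) every timer at or below it,
    // and returns the fired timer timestamps in order.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

Two elements of window `[0, 10)` and one of `[10, 20)` register timers 9 and 19. A watermark of 8 fires nothing, 9 fires the first window, and 19 fires the second.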
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.windowing.windows;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

/**
 * A {@link Window} that represents a time interval from {@code start} (inclusive) to
 * {@code start + size} (exclusive).
 */
@PublicEvolving
public class TimeWindow extends Window {

    private final long start;
    private final long end;

    public TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public long getStart() {
        return start;
    }

    public long getEnd() {
        return end;
    }

    @Override
    public long maxTimestamp() {
        return end - 1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        TimeWindow window = (TimeWindow) o;

        return end == window.end && start == window.start;
    }

    @Override
    public int hashCode() {
        int result = (int) (start ^ (start >>> 32));
        result = 31 * result + (int) (end ^ (end >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "TimeWindow{" +
                "start=" + start +
                ", end=" + end +
                '}';
    }

    public static class Serializer extends TypeSerializer<TimeWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean isImmutableType() {
            return true;
        }

        @Override
        public TypeSerializer<TimeWindow> duplicate() {
            return this;
        }

        @Override
        public TimeWindow createInstance() {
            return null;
        }

        @Override
        public TimeWindow copy(TimeWindow from) {
            return from;
        }

        @Override
        public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
            return from;
        }

        @Override
        public int getLength() {
            return 0;
        }

        @Override
        public void serialize(TimeWindow record, DataOutputView target) throws IOException {
            target.writeLong(record.start);
            target.writeLong(record.end);
        }

        @Override
        public TimeWindow deserialize(DataInputView source) throws IOException {
            long start = source.readLong();
            long end = source.readLong();
            return new TimeWindow(start, end);
        }

        @Override
        public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
            long start = source.readLong();
            long end = source.readLong();
            return new TimeWindow(start, end);
        }

        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.writeLong(source.readLong());
            target.writeLong(source.readLong());
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public boolean canEqual(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public int hashCode() {
            return 0;
        }
    }
}
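A TimeWindow only stores a half-open interval [start, end), and maxTimestamp() is end - 1. The standalone sketch below shows how sliding windows of a given size and slide produce such intervals for one element timestamp. It does not use Flink: `Interval` is a hypothetical stand-in for TimeWindow, and the alignment rule (window starts at multiples of `slide` counted from epoch 0, non-negative timestamps) is an assumption modeled on the sliding assigners of this Flink era, so check the assigner source of your version before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch; Interval is a hypothetical stand-in for Flink's TimeWindow, not Flink code.
final class SlidingWindowSketch {

    // Half-open interval [start, end), mirroring TimeWindow's contract.
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Same rule as TimeWindow.maxTimestamp(): the last timestamp that still belongs to the window.
        long maxTimestamp() {
            return end - 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Returns every sliding window of the given size/slide that contains the timestamp.
    // Assumes non-negative timestamps and window starts aligned to multiples of slide (epoch 0).
    static List<Interval> assign(long timestamp, long size, long slide) {
        List<Interval> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);  // latest aligned start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Interval(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10 s windows sliding every 5 s: an element at t = 12 s falls into two windows.
        System.out.println(assign(12_000L, 10_000L, 5_000L));  // [[10000, 20000), [5000, 15000)]
    }
}
```

Because the end is exclusive, an element at exactly t = 10000 with the same parameters lands in [10000, 20000) and [5000, 15000) but not in [0, 10000).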
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Result type for trigger methods. This determines what happens with the window,
 * for example whether the window function should be called, or the window
 * should be discarded.
 */
public enum TriggerResult {

    /**
     * No action is taken on the window.
     */
    CONTINUE(false, false),

    /**
     * {@code FIRE_AND_PURGE} evaluates the window function and emits the window
     * result.
     */
    FIRE_AND_PURGE(true, true),

    /**
     * On {@code FIRE}, the window is evaluated and results are emitted.
     * The window is not purged, though, all elements are retained.
     */
    FIRE(true, false),

    /**
     * All elements in the window are cleared and the window is discarded,
     * without evaluating the window function or emitting any elements.
     */
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }

    // ------------------------------------------------------------------------

    /**
     * Merges two {@code TriggerResults}. This specifies what should happen if we have
     * two results from a Trigger, for example as a result from
     * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and
     * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}.
     *
     * <p>
     * For example, if one result says {@code CONTINUE} while the other says {@code FIRE}
     * then {@code FIRE} is the combined result;
     */
    public static TriggerResult merge(TriggerResult a, TriggerResult b) {
        if (a.purge || b.purge) {
            if (a.fire || b.fire) {
                return FIRE_AND_PURGE;
            } else {
                return PURGE;
            }
        } else if (a.fire || b.fire) {
            return FIRE;
        } else {
            return CONTINUE;
        }
    }
}
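The four TriggerResult values are just the four combinations of two flags, fire and purge. The hypothetical sketch below copies those flags and the merge rule into a standalone enum and applies them to a toy window buffer whose "window function" is a sum, to show what each result does to the buffered elements. It is not how Flink's window operator is implemented; the buffer, the sum, and the class name are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink's WindowOperator: it only mirrors the fire/purge flags of TriggerResult.
final class TriggerResultSketch {

    // Same flags and merge rule as Flink's TriggerResult, copied into a standalone enum.
    enum Result {
        CONTINUE(false, false),
        FIRE_AND_PURGE(true, true),
        FIRE(true, false),
        PURGE(false, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        // Purge wins if either side purges; fire wins if either side fires.
        static Result merge(Result a, Result b) {
            boolean fire = a.fire || b.fire;
            boolean purge = a.purge || b.purge;
            if (purge) {
                return fire ? FIRE_AND_PURGE : PURGE;
            }
            return fire ? FIRE : CONTINUE;
        }
    }

    // Buffered elements of one window, plus every result emitted so far.
    final List<Integer> buffer = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();

    // "fire" emits the sum of the buffer (stand-in for a window function);
    // "purge" then clears the buffer.
    void apply(Result r) {
        if (r.fire) {
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (r.purge) {
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        TriggerResultSketch w = new TriggerResultSketch();
        w.buffer.add(1);
        w.buffer.add(2);
        w.apply(Result.FIRE);  // emits 3, keeps [1, 2]
        w.buffer.add(4);
        w.apply(Result.merge(Result.CONTINUE, Result.FIRE_AND_PURGE));  // emits 7, clears the buffer
        System.out.println(w.emitted + " " + w.buffer);  // [3, 7] []
    }
}
```

FIRE leaves the elements in place, so a later firing of the same window sees them again; that is why the second emission is 1 + 2 + 4 = 7.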
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.functions;

import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * A timestamp assigner that assigns timestamps based on the machine's wall clock.
 *
 * <p>If this assigner is used after a stream source, it realizes "ingestion time" semantics.
 *
 * @param <T> The elements that get timestamps assigned.
 */
public class IngestionTimeExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = -4072216356049069301L;

    private long maxTimestamp;

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }
}
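IngestionTimeExtractor clamps each wall-clock reading against the largest timestamp it has already issued, so timestamps never go backwards, and it reports a watermark of now - 1. The sketch below reproduces that logic outside Flink so the effect of a clock jump can be seen; the class name `MonotonicIngestionClock` and the injectable `LongSupplier` clock are hypothetical additions (Flink's class calls `System.currentTimeMillis()` directly).

```java
import java.util.function.LongSupplier;

// Standalone sketch of IngestionTimeExtractor's clamping logic; the injectable clock is a
// hypothetical addition so a backwards clock jump can be simulated.
final class MonotonicIngestionClock {

    private final LongSupplier clock;
    private long maxTimestamp;  // highest timestamp handed out so far

    MonotonicIngestionClock(LongSupplier clock) {
        this.clock = clock;
    }

    // Timestamp for the next element: never smaller than any earlier one.
    long extractTimestamp() {
        final long now = Math.max(clock.getAsLong(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    // Watermark = clamped time - 1: later elements get timestamps >= now, so none is <= now - 1.
    long currentWatermark() {
        final long now = Math.max(clock.getAsLong(), maxTimestamp);
        maxTimestamp = now;
        return now - 1;
    }

    public static void main(String[] args) {
        // Simulated wall clock that jumps back from 1000 to 900 (e.g. after a clock re-sync).
        long[] readings = {1000L, 900L, 950L, 1200L};
        int[] next = {0};
        MonotonicIngestionClock c = new MonotonicIngestionClock(() -> readings[next[0]++]);
        System.out.println(c.extractTimestamp());  // 1000
        System.out.println(c.extractTimestamp());  // 1000, not 900
        System.out.println(c.currentWatermark());  // 999
        System.out.println(c.extractTimestamp());  // 1200
    }
}
```

The trade-off is that after a backwards jump, elements share the same timestamp until the real clock catches up, which keeps the watermark monotone at the cost of slightly coarser timestamps.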