* Windows this {@code DataStream} into sliding time windows.
* <p>
* This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
* {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
* set using
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
* <p>
* Note: This operation can be inherently non-parallel since all elements have to pass through
* the same operator instance. (Only for special cases, such as aligned time windows, is
* it possible to perform this operation in parallel.)
* @param size The size of the window.
* @param slide The slide interval of the window.
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(SlidingProcessingTimeWindows.of(size, slide));
} else {
return windowAll(SlidingEventTimeWindows.of(size, slide));
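The following is a minimal, self-contained sketch of how a sliding window assigner might map one timestamp to several overlapping windows. It assumes windows are aligned to the epoch and identified by their start time; the class name `SlidingWindowSketch` and the method `windowStarts` are hypothetical and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the Flink window assigner. */
public class SlidingWindowSketch {

    /** Returns the start times of all windows [start, start + size) that contain the timestamp. */
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp (assumed epoch-aligned).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards in steps of 'slide' while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size = 10 s, slide = 5 s: every element falls into size / slide = 2 windows.
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L));
    }
}
```

With a size of 10 seconds and a slide of 5 seconds, the timestamp 12000 lands in the windows starting at 10000 and 5000.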
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*     http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.

package org.apache.flink.streaming.api;

import org.apache.flink.annotation.PublicEvolving;

* The time characteristic defines how the system determines time for time-dependent
* order and operations that depend on time (such as time windows).
public enum TimeCharacteristic {

* Processing time for operators means that the operator uses the system clock of the machine
* to determine the current time of the data stream. Processing-time windows trigger based
* on wall-clock time and include whatever elements happen to have arrived at the operator at
* that point in time.
* <p>
* Using processing time for window operations generally produces non-deterministic results,
* because the contents of the windows depend on the speed at which elements arrive. It is, however,
* the cheapest method of forming windows and the method that introduces the least latency.

* Ingestion time means that the time of each individual element in the stream is determined
* when the element enters the Flink streaming data flow. Operations like windows group the
* elements based on that time, meaning that processing speed within the streaming dataflow
* does not affect windowing, but only the speed at which sources receive elements.
* <p>
* Ingestion time is often a good compromise between processing time and event time.
* It does not need any special manual form of watermark generation, and events are typically
* not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
* only be introduced by streaming shuffles or split/join/union operations. The fact that elements
* are not very much out-of-order means that the latency increase is moderate, compared to event
* time.

* Event time means that the time of each individual element in the stream (also called event)
* is determined by the event's individual custom timestamp. These timestamps either exist in the
* elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
* The big implication of this is that it allows for elements to arrive in the sources and in
* all operators out of order, meaning that elements with earlier timestamps may arrive after
* elements with later timestamps.
* <p>
* Operators that window or order data with respect to event time must buffer data until they can
* be sure that all timestamps for a certain time interval have been received. This is handled by
* the so-called "time watermarks".
* <p>
* Operations based on event time are very predictable - the result of windowing operations
* is typically identical no matter when the window is executed and how fast the streams operate.
* At the same time, the buffering and tracking of event time is also costlier than operating
* with processing time, and typically also introduces more latency. The amount of extra
* cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
* between the arrival of early and late elements is. With respect to the "time watermarks", this
* means that the cost typically depends on how early or late the watermarks can be generated
* for their timestamp.
* <p>
* In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's
* original time, rather than the time assigned at the data source. Practically, that means that
* event time has generally more meaning, but also that it takes longer to determine that all
* elements for a certain time have arrived.
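To make the contrast between processing time and event time concrete, here is a small sketch that counts elements per tumbling window twice: once by the timestamp carried in the element and once by its arrival time. The class `TimeCharacteristicSketch`, its methods, and the sample data are all hypothetical; the real runtime works with watermarks and operators rather than a simple loop.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; not part of the Flink API. */
public class TimeCharacteristicSketch {

    /** Start of the tumbling window of the given size that contains the timestamp. */
    public static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /**
     * Counts elements per tumbling window. Each row is {eventTime, arrivalTime};
     * useEventTime selects which of the two timestamps drives the window assignment.
     */
    public static Map<Long, Integer> countPerWindow(long[][] events, long size, boolean useEventTime) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long[] e : events) {
            long ts = useEventTime ? e[0] : e[1];
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[][] events = {
            {1_000L, 1_100L},
            {4_000L, 4_200L},
            {2_000L, 5_300L}, // created early, but arrives late
        };
        System.out.println(countPerWindow(events, 5_000L, true));  // grouping by event time
        System.out.println(countPerWindow(events, 5_000L, false)); // grouping by arrival time
    }
}
```

Grouped by event time, all three elements fall into the window starting at 0 regardless of how late the third one arrives. Grouped by arrival time, the late element moves into the next window, which is why processing-time results depend on arrival speed.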

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

* {@link StreamOperator} for streaming sources.
* @param <OUT> Type of the output elements
* @param <SRC> Type of the source function of this stream source operator
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

private static final long serialVersionUID = 1L;

private transient SourceFunction.SourceContext<OUT> ctx;

private transient volatile boolean canceledOrStopped = false;

public StreamSource(SRC sourceFunction) {

this.chainingStrategy = ChainingStrategy.HEAD;

public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final SourceFunction.SourceContext<OUT> ctx;

switch (timeCharacteristic) {
case EventTime:
ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
case IngestionTime:
ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
case ProcessingTime:
ctx = new NonTimestampContext<>(this, lockingObject, collector);
throw new Exception(String.valueOf(timeCharacteristic));

// copy to a field to give the 'cancel()' method access
this.ctx = ctx;

try {

// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
} finally {
// make sure that the context is closed in any case

public void cancel() {
// important: marking the source as stopped has to happen before the function is stopped.
// the flag that tracks this status is volatile, so the memory model also guarantees
// the happens-before relationship

// the context may not be initialized if the source was never running.
if (ctx != null) {

* Marks this source as canceled or stopped.
* <p>This indicates that any exit of the {@link #run(Object, Output)} method
* cannot be interpreted as the result of a finite source.
protected void markCanceledOrStopped() {
this.canceledOrStopped = true;

* Checks whether the source has been canceled or stopped.
* @return True if the source is canceled or stopped, false if not.
protected boolean isCanceledOrStopped() {
return canceledOrStopped;

* Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...)
* has caused an exception. If one of these threads caused an exception, this method will
* throw that exception.
void checkAsyncException() {

// ------------------------------------------------------------------------
//  Source contexts for various stream time characteristics
// ------------------------------------------------------------------------

* A source context that attaches {@code -1} as a timestamp to all records, and that
* does not forward watermarks.
public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {

private final StreamSource<?, ?> owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;

public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
this.owner = owner;
this.lockingObject = lockingObject;
this.output = output;
this.reuse = new StreamRecord<T>(null);

public void collect(T element) {
synchronized (lockingObject) {

public void collectWithTimestamp(T element, long timestamp) {
// ignore the timestamp

public void emitWatermark(Watermark mark) {
// do nothing else

public Object getCheckpointLock() {
return lockingObject;

public void close() {}

* {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
* and watermark emission.
public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {

private final StreamSource<?, ?> owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;

private final ScheduledExecutorService scheduleExecutor;
private final ScheduledFuture<?> watermarkTimer;
private final long watermarkInterval;

private volatile long nextWatermarkTime;

public AutomaticWatermarkContext(
final StreamSource<?, ?> owner,
final Object lockingObjectParam,
final Output<StreamRecord<T>> outputParam,
final long watermarkInterval) {

if (watermarkInterval < 1L) {
throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");

this.owner = owner;
this.lockingObject = lockingObjectParam;
this.output = outputParam;
this.watermarkInterval = watermarkInterval;
this.reuse = new StreamRecord<T>(null);

this.scheduleExecutor = Executors.newScheduledThreadPool(1);

this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
public void run() {
final long currentTime = System.currentTimeMillis();

if (currentTime > nextWatermarkTime) {
// align the watermarks across all machines. this will ensure that we
// don't have watermarks that creep along at different intervals because
// the machine clocks are out of sync
final long watermarkTime = currentTime - (currentTime % watermarkInterval);

synchronized (lockingObjectParam) {
if (currentTime > nextWatermarkTime) {
outputParam.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime += watermarkInterval;
}, 0, watermarkInterval, TimeUnit.MILLISECONDS);

public void collect(T element) {

synchronized (lockingObject) {
final long currentTime = System.currentTimeMillis();
output.collect(reuse.replace(element, currentTime));

if (currentTime > nextWatermarkTime) {
// in case we jumped some watermarks, recompute the next watermark time
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
nextWatermarkTime = watermarkTime + watermarkInterval;
output.emitWatermark(new Watermark(watermarkTime));

public void collectWithTimestamp(T element, long timestamp) {

public void emitWatermark(Watermark mark) {

if (mark.getTimestamp() == Long.MAX_VALUE) {
// allow it since this is the special end-watermark that for example the Kafka source emits
synchronized (lockingObject) {
nextWatermarkTime = Long.MAX_VALUE;

// we can shutdown the timer now, no watermarks will be needed any more

public Object getCheckpointLock() {
return lockingObject;

public void close() {

* A SourceContext for event time. Sources may directly attach timestamps and generate
* watermarks, but if records are emitted without timestamps, no timestamps are automatically
* generated and attached. The records will simply have no timestamp in that case.
* Streaming topologies can use timestamp assigner functions to override the timestamps
* assigned here.
public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {

private final StreamSource<?, ?> owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;

public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
this.owner = owner;
this.lockingObject = lockingObject;
this.output = output;
this.reuse = new StreamRecord<T>(null);

public void collect(T element) {

synchronized (lockingObject) {

public void collectWithTimestamp(T element, long timestamp) {

synchronized (lockingObject) {
output.collect(reuse.replace(element, timestamp));

public void emitWatermark(Watermark mark) {

synchronized (lockingObject) {

public Object getCheckpointLock() {
return lockingObject;

public void close() {}
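The watermark alignment used by the automatic watermark context above reduces to a single rounding step. The sketch below isolates that arithmetic so it can be checked on its own; the class `WatermarkAlignmentSketch` and the method `alignedWatermark` are hypothetical names, and the sample clock values are made up for illustration.

```java
/** Hypothetical illustration of the rounding step; not part of the Flink API. */
public class WatermarkAlignmentSketch {

    /** Rounds the current time down to the nearest multiple of the watermark interval. */
    public static long alignedWatermark(long currentTime, long watermarkInterval) {
        if (watermarkInterval < 1L) {
            throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
        }
        return currentTime - (currentTime % watermarkInterval);
    }

    public static void main(String[] args) {
        // Two machines whose clocks differ by 30 ms, with a 200 ms watermark interval.
        long machineA = alignedWatermark(1_000_450L, 200L);
        long machineB = alignedWatermark(1_000_480L, 200L);
        // Both values round down to the same interval boundary.
        System.out.println(machineA + " " + machineB);
    }
}
```

Because both clocks round down to the same multiple of the interval, the two sources emit the same watermark value even though their clocks are slightly out of sync.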

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;

* A {@code AllWindowedStream} represents a data stream where the stream of
* elements is split into windows based on a
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
* is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
* <p>
* If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
* used to evict elements from the window after
* evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
* When using an evictor, window performance will degrade significantly, since
* pre-aggregation of window results cannot be used.
* <p>
* Note that the {@code AllWindowedStream} is purely an API construct; during runtime
* the {@code AllWindowedStream} will be collapsed together with the
* operation over the window into one single operation.
* @param <T> The type of elements in the stream.
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
public class AllWindowedStream<T, W extends Window> {

/** The data stream that is windowed by this stream */
private final DataStream<T> input;

/** The window assigner */
private final WindowAssigner<? super T, W> windowAssigner;

/** The trigger that is used for window evaluation/emission. */
private Trigger<? super T, ? super W> trigger;

/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;

public AllWindowedStream(DataStream<T> input,
WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
this.windowAssigner = windowAssigner;
this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());

* Sets the {@code Trigger} that should be used to trigger window emission.
public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
this.trigger = trigger;
return this;

* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
* <p>
* Note: When using an evictor, window performance will degrade significantly, since
* pre-aggregation of window results cannot be used.
public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
this.evictor = evictor;
return this;

// ------------------------------------------------------------------------
//  Operations on the keyed windows
// ------------------------------------------------------------------------

* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
* as a regular non-windowed stream.
* <p>
* This window will try and pre-aggregate data as much as the window policies permit. For example,
* tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
* key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
* so a few elements are stored per key (one per slide interval).
* Custom windows may not be able to pre-aggregate, or may need to store extra values in an
* aggregation tree.
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
"Please use apply(ReduceFunction, WindowFunction) instead.");

//clean the closure
function = input.getExecutionEnvironment().clean(function);

String callLocation = Utils.getCallLocationName();
String udfName = "AllWindowedStream." + callLocation;

SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
if (result != null) {
return result;

String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

OneInputStreamOperator<T, T> operator;

if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new ReduceIterableAllWindowFunction<W, T>(function),

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new ReduceIterableAllWindowFunction<W, T>(function),

return input.transform(opName, input.getType(), operator).setParallelism(1);
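The pre-aggregation described for tumbling windows can be pictured as keeping one running value per window instead of buffering every element. The sketch below is a simplified stand-in under that assumption; `PreAggregationSketch` and its methods are hypothetical and do not mirror the actual `ReducingWindowBuffer` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical illustration of eager reduction per tumbling window; not the Flink implementation. */
public class PreAggregationSketch<T> {

    private final long size;
    private final BinaryOperator<T> reducer;
    // One partially reduced value per window start, instead of a list of all elements.
    private final Map<Long, T> partials = new HashMap<>();

    public PreAggregationSketch(long size, BinaryOperator<T> reducer) {
        this.size = size;
        this.reducer = reducer;
    }

    /** Folds the element into the running value of the window that contains the timestamp. */
    public void add(T element, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        partials.merge(start, element, reducer);
    }

    /** Emits and discards the reduced value of the given window, or null if the window is empty. */
    public T fire(long windowStart) {
        return partials.remove(windowStart);
    }

    /** Number of values currently held in state. */
    public int storedValues() {
        return partials.size();
    }

    public static void main(String[] args) {
        PreAggregationSketch<Integer> windows = new PreAggregationSketch<>(5_000L, Integer::sum);
        windows.add(1, 100L);
        windows.add(2, 2_000L);
        windows.add(3, 4_999L);
        System.out.println(windows.storedValues()); // a single value is stored for three elements
        System.out.println(windows.fire(0L));
    }
}
```

An evictor needs access to the individual elements, which is why the evicting code path above falls back to a list buffer and loses this saving.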

* Applies the given fold function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the fold function is
* interpreted as a regular non-windowed stream.
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
"Please use apply(FoldFunction, WindowFunction) instead.");

TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
Utils.getCallLocationName(), true);

return fold(initialValue, function, resultType);

* Applies the given fold function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the fold function is
* interpreted as a regular non-windowed stream.
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
"Please use apply(FoldFunction, WindowFunction) instead.");

return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType);

* Applies a window function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the window function is interpreted
* as a regular non-windowed stream.
* <p>
* Note that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of pre-aggregation.
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
@SuppressWarnings("unchecked, rawtypes")
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, getInputType(), null, false);

return apply(function, resultType);

* Applies the given window function to each window. The window function is called for each evaluation
* of the window for each key individually. The output of the window function is interpreted
* as a regular non-windowed stream.
* <p>
* Note that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of pre-aggregation.
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);

String callLocation = Utils.getCallLocationName();
String udfName = "AllWindowedStream." + callLocation;

SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
if (result != null) {
return result;

String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

NonKeyedWindowOperator<T, T, R, W> operator;

if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),

return input.transform(opName, resultType, operator).setParallelism(1);

* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
* <p>
* Arriving data is pre-aggregated using the given pre-aggregation reducer.
* @param preAggregator The reduce function that is used for pre-aggregation
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.

public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, inType, null, false);

return apply(preAggregator, function, resultType);

* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
* <p>
* Arriving data is pre-aggregated using the given pre-aggregation reducer.
* @param preAggregator The reduce function that is used for pre-aggregation
* @param function The window function.
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
if (preAggregator instanceof RichFunction) {
throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction.");

//clean the closures
function = input.getExecutionEnvironment().clean(function);
preAggregator = input.getExecutionEnvironment().clean(preAggregator);

String callLocation = Utils.getCallLocationName();
String udfName = "AllWindowedStream." + callLocation;

String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

OneInputStreamOperator<T, R> operator;

if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new ReduceApplyAllWindowFunction<>(preAggregator, function),

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),

return input.transform(opName, resultType, operator).setParallelism(1);

* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
* <p>
* Arriving data is incrementally aggregated using the given fold function.
* @param initialValue The initial value of the fold.
* @param foldFunction The fold function that is used for incremental aggregation.
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
Utils.getCallLocationName(), true);

return apply(initialValue, foldFunction, function, resultType);

* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
* <p>
* Arriving data is incrementally aggregated using the given fold function.
* @param initialValue The initial value of the fold.
* @param foldFunction The fold function that is used for incremental aggregation.
* @param function The window function.
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {
if (foldFunction instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");

//clean the closures
function = input.getExecutionEnvironment().clean(function);
foldFunction = input.getExecutionEnvironment().clean(foldFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "AllWindowedStream." + callLocation;

String opName;

OneInputStreamOperator<T, R> operator;

if (evictor != null) {
opName = "NonParallelTriggerWindow(" + windowAssigner  + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),

} else {
opName = "NonParallelTriggerWindow(" + windowAssigner  + ", " + trigger + ", " + udfName + ")";

operator = new NonKeyedWindowOperator<>(windowAssigner,
new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),

return input.transform(opName, resultType, operator).setParallelism(1);

// ------------------------------------------------------------------------
//  Aggregations on the windows
// ------------------------------------------------------------------------

* Applies an aggregation that sums every window of the data stream at the
* given position.
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));

* Applies an aggregation that sums the pojo data stream at the given
* field for every window.
* <p>
* A field expression is either
* the name of a public field or a getter method with parentheses of the
* stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
* @param field The field to sum
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));

* Applies an aggregation that gives the minimum value of every window
* of the data stream at the given position.
* @param positionToMin The position to minimize
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> min(int positionToMin) {
return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));

* Applies an aggregation that gives the minimum value of the pojo data
* stream at the given field expression for every window.
* <p>
* A field expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
* to drill down into objects, as in {@code "field1.getInnerField2()" }.
* @param field The field expression based on which the aggregation will be applied.
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> min(String field) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));

* Applies an aggregation that gives the minimum element of every window of
* the data stream by the given position. If more elements have the same
* minimum value the operator returns the first element by default.
* @param positionToMinBy
*            The position to minimize by
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);

* Applies an aggregation that gives the minimum element of every window of
* the data stream by the given field expression. If several elements have the same
* minimum value, the operator returns the first element by default.
* @param positionToMinBy The field expression to minimize by
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
return this.minBy(positionToMinBy, true);

* Applies an aggregation that gives the minimum element of every window of
* the data stream by the given position. If several elements have the same
* minimum value, the operator returns either the first or the last one, depending
* on the parameter setting.
* @param positionToMinBy The position to minimize by
* @param first If true, the operator returns the first element with the minimum value; otherwise it returns the last
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));

* Applies an aggregation that gives the minimum element of the pojo
* data stream by the given field expression for every window. A field
* expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
* to drill down into objects, as in {@code "field1.getInnerField2()" }.
* @param field The field expression based on which the aggregation will be applied.
* @param first If true, the first object is returned in case of field equality; otherwise the last
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));

* Applies an aggregation that gives the maximum value of every window of
* the data stream at the given position.
* @param positionToMax The position to maximize
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> max(int positionToMax) {
return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));

* Applies an aggregation that gives the maximum value of the pojo data
* stream at the given field expression for every window. A field expression
* is either the name of a public field or a getter method with parentheses
* of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
* down into objects, as in {@code "field1.getInnerField2()" }.
* @param field The field expression based on which the aggregation will be applied.
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> max(String field) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));

* Applies an aggregation that gives the maximum element of every window of
* the data stream by the given position. If several elements have the same
* maximum value, the operator returns the first by default.
* @param positionToMaxBy
*            The position to maximize by
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);

* Applies an aggregation that gives the maximum element of every window of
* the data stream by the given field expression. If several elements have the same
* maximum value, the operator returns the first by default.
* @param positionToMaxBy
*            The field expression to maximize by
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);

* Applies an aggregation that gives the maximum element of every window of
* the data stream by the given position. If several elements have the same
* maximum value, the operator returns either the first or the last one, depending
* on the parameter setting.
* @param positionToMaxBy The position to maximize by
* @param first If true, the operator returns the first element with the maximum value; otherwise it returns the last
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));

* Applies an aggregation that gives the maximum element of the pojo
* data stream by the given field expression for every window. A field
* expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
* to drill down into objects, as in {@code "field1.getInnerField2()" }.
* @param field The field expression based on which the aggregation will be applied.
* @param first If true, the first object is returned in case of field equality; otherwise the last
* @return The transformed DataStream.
public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));

private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
return reduce(aggregator);

// ------------------------------------------------------------------------
//  Utilities
// ------------------------------------------------------------------------

private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
Function function,
TypeInformation<R> resultType,
String functionName) {

// TODO: add once non-parallel fast aligned time windows operator is ready
return null;

public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();

public TypeInformation<T> getInputType() {
return input.getType();

* Windows this data stream to an {@code AllWindowedStream}, which evaluates windows
* over a stream that is not grouped by key. Elements are put into windows by a
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
* elements is done by window only.
* <p>
* A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
* when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
* that is used if a {@code Trigger} is not specified.
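* <p>
* A minimal usage sketch, assuming an existing stream {@code in} whose elements carry
* timestamps (the variable names are illustrative):
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowed =
*   in.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)));
* } </pre>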
* <p>
* Note: This operation can be inherently non-parallel since all elements have to pass through
* the same operator instance. (Only in special cases, such as aligned time windows, is
* it possible to perform this operation in parallel.)
* @param assigner The {@code WindowAssigner} that assigns elements to windows.
* @return The trigger windows data stream.
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
return new AllWindowedStream<>(this, assigner);

package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import java.io.Serializable;

import java.util.Collection;

* A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
* <p>
* In a window operation, elements are grouped by their key (if available) and by the windows to
* which they were assigned. The set of elements with the same key and window is called a pane.
* When a {@link Trigger} decides that a certain pane should fire, the
* {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
* to produce output elements for that pane.
* @param <T> The type of elements that this WindowAssigner can assign windows to.
* @param <W> The type of {@code Window} that this assigner assigns.
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;

* Returns a {@code Collection} of windows that should be assigned to the element.
* @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element.
public abstract Collection<W> assignWindows(T element, long timestamp);

* Returns the default trigger associated with this {@code WindowAssigner}.
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

* Returns a {@link TypeSerializer} for serializing windows that are assigned by
* this {@code WindowAssigner}.
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;

* A {@code Trigger} determines when a pane of a window should be evaluated to emit the
* results for that part of the window.
* <p>
* A pane is the bucket of elements that have the same key (assigned by the
* {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
* be in multiple panes if it was assigned to multiple windows by the
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
* have their own instance of the {@code Trigger}.
* <p>
* Triggers must not maintain state internally since they can be re-created or reused for
* different keys. All necessary state should be persisted using the state abstraction
* available on the {@link TriggerContext}.
* @param <T> The type of elements on which this {@code Trigger} works.
* @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
public abstract class Trigger<T, W extends Window> implements Serializable {

private static final long serialVersionUID = -4104633972991191369L;

* Called for every element that gets added to a pane. The result of this will determine
* whether the pane is evaluated to emit results.
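* <p>
* As a sketch only (not necessarily how any built-in trigger is implemented), an
* event-time trigger over {@code TimeWindow}s might register a timer for the end of
* the window and then wait for it:
* <pre> {@code
* public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
*   // ask to be called back through onEventTime once the watermark passes the window end
*   ctx.registerEventTimeTimer(window.maxTimestamp());
*   return TriggerResult.CONTINUE;
* }
* } </pre>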
* @param element The element that arrived.
* @param timestamp The timestamp of the element that arrived.
* @param window The window to which this pane belongs.
* @param ctx A context object that can be used to register timer callbacks.
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

* Called when a processing-time timer that was set using the trigger context fires.
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

* Called when an event-time timer that was set using the trigger context fires.
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

* Clears any state that the trigger might still hold for the given window. This is called
* when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
* and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
* well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
* <p>By default, this method does nothing.
public void clear(W window, TriggerContext ctx) throws Exception {}

// ------------------------------------------------------------------------

* A context object that is given to {@link Trigger} methods to allow them to register timer
* callbacks and deal with state.
public interface TriggerContext {

* Returns the current watermark time.
long getCurrentWatermark();

* Register a system time callback. When the current system time passes the specified
* time {@link Trigger#onProcessingTime(long, Window, TriggerContext)} is called with the time specified here.
* @param time The time at which to invoke {@link Trigger#onProcessingTime(long, Window, TriggerContext)}
void registerProcessingTimeTimer(long time);

* Register an event-time callback. When the current watermark passes the specified
* time {@link Trigger#onEventTime(long, Window, TriggerContext)} is called with the time specified here.
* @param time The watermark at which to invoke {@link Trigger#onEventTime(long, Window, TriggerContext)}
* @see org.apache.flink.streaming.api.watermark.Watermark
void registerEventTimeTimer(long time);

* Delete the processing-time timer for the given time.
void deleteProcessingTimeTimer(long time);

* Delete the event-time timer for the given time.
void deleteEventTimeTimer(long time);

* Retrieves a {@link State} object that can be used to interact with
* fault-tolerant state that is scoped to the window and key of the current
* trigger invocation.
* @param stateDescriptor The StateDescriptor that contains the name and type of the
*                        state that is being accessed.
* @param <S>             The type of the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
*                                       function (function is not part of a KeyedStream).
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

* Retrieves a {@link ValueState} object that can be used to interact with
* fault-tolerant state that is scoped to the window and key of the current
* trigger invocation.
* @param name The name of the key/value state.
* @param stateType The class of the type that is stored in the state. Used to generate
*                  serializers for managed memory and checkpointing.
* @param defaultState The default state value, returned when the state is accessed and
*                     no value has yet been set for the key. May be null.
* @param <S>          The type of the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
*                                       function (function is not part of a KeyedStream).
<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

* Retrieves a {@link ValueState} object that can be used to interact with
* fault-tolerant state that is scoped to the window and key of the current
* trigger invocation.
* @param name The name of the key/value state.
* @param stateType The type information for the type that is stored in the state.
*                  Used to create serializers for managed memory and checkpoints.
* @param defaultState The default state value, returned when the state is accessed and
*                     no value has yet been set for the key. May be null.
* @param <S>          The type of the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
*                                       function (function is not part of a KeyedStream).
<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);

package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Collection;
import java.util.Collections;

* A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
* elements. Windows cannot overlap.
* <p>
* For example, in order to window into windows of 1 minute:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
*   keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
* } </pre>
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private long size;

protected TumblingEventTimeWindows(long size) {
this.size = size;

public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = timestamp - (timestamp % size);
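// Worked example for the line above (illustrative values): with size = 60000 (1 minute)
// and timestamp = 125000, start = 125000 - (125000 % 60000) = 125000 - 5000 = 120000,
// so the element is assigned to the window [120000, 180000).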
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +

public long getSize() {
return size;

public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();

public String toString() {
return "TumblingEventTimeWindows(" + size + ")";

* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
* @param size The size of the generated windows.
* @return The time policy.
public static TumblingEventTimeWindows of(Time size) {
return new TumblingEventTimeWindows(size.toMilliseconds());

public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

* A {@link Trigger} that fires once the watermark passes the end of the window
* to which a pane belongs.
* @see org.apache.flink.streaming.api.watermark.Watermark
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private EventTimeTrigger() {}

public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;

public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE_AND_PURGE;

public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;

public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

public String toString() {
return "EventTimeTrigger()";

* Creates an event-time trigger that fires once the watermark passes the end of the window.
* <p>
* Once the trigger fires all elements are discarded. Elements that arrive late immediately
* trigger window evaluation with just this one element.
public static EventTimeTrigger create() {
return new EventTimeTrigger();

package org.apache.flink.streaming.api.windowing.windows;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

* A {@link Window} that represents a time interval from {@code start} (inclusive) to
* {@code end} (exclusive).
public class TimeWindow extends Window {

private final long start;
private final long end;

public TimeWindow(long start, long end) {
this.start = start;
this.end = end;

public long getStart() {
return start;

public long getEnd() {
return end;

public long maxTimestamp() {
return end - 1;

public boolean equals(Object o) {
if (this == o) {
return true;
if (o == null || getClass() != o.getClass()) {
return false;

TimeWindow window = (TimeWindow) o;

return end == window.end && start == window.start;

public int hashCode() {
int result = (int) (start ^ (start >>> 32));
result = 31 * result + (int) (end ^ (end >>> 32));
return result;

public String toString() {
return "TimeWindow{" +
"start=" + start +
", end=" + end +

public static class Serializer extends TypeSerializer<TimeWindow> {
private static final long serialVersionUID = 1L;

public boolean isImmutableType() {
return true;

public TypeSerializer<TimeWindow> duplicate() {
return this;

public TimeWindow createInstance() {
return null;

public TimeWindow copy(TimeWindow from) {
return from;

public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
return from;

public int getLength() {
return 0;

public void serialize(TimeWindow record, DataOutputView target) throws IOException {

public TimeWindow deserialize(DataInputView source) throws IOException {
long start = source.readLong();
long end = source.readLong();
return new TimeWindow(start, end);

public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
long start = source.readLong();
long end = source.readLong();
return new TimeWindow(start, end);

public void copy(DataInputView source, DataOutputView target) throws IOException {

public boolean equals(Object obj) {
return obj instanceof Serializer;

public boolean canEqual(Object obj) {
return obj instanceof Serializer;

public int hashCode() {
return 0;


package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.streaming.api.windowing.windows.Window;

* Result type for trigger methods. This determines what happens with the window,
* for example whether the window function should be called, or the window
* should be discarded.
public enum TriggerResult {

* No action is taken on the window.
CONTINUE(false, false),

* {@code FIRE_AND_PURGE} evaluates the window function and emits the window
* result. Afterwards, all elements in the window are cleared and the window is discarded.
FIRE_AND_PURGE(true, true),

* On {@code FIRE}, the window is evaluated and results are emitted.
* The window is not purged, though; all elements are retained.
FIRE(true, false),

* All elements in the window are cleared and the window is discarded,
* without evaluating the window function or emitting any elements.
PURGE(false, true);

// ------------------------------------------------------------------------

private final boolean fire;
private final boolean purge;

TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;

public boolean isFire() {
return fire;

public boolean isPurge() {
return purge;

// ------------------------------------------------------------------------

* Merges two {@code TriggerResults}. This specifies what should happen if we have
* two results from a Trigger, for example as a result from
* {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and
* {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}.
* <p>
* For example, if one result says {@code CONTINUE} while the other says {@code FIRE}
* then {@code FIRE} is the combined result.
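* <p>
* A few further combinations, as implied by the fire and purge flags of each result:
* <pre> {@code
* TriggerResult.merge(TriggerResult.CONTINUE, TriggerResult.CONTINUE); // CONTINUE
* TriggerResult.merge(TriggerResult.CONTINUE, TriggerResult.PURGE);    // PURGE
* TriggerResult.merge(TriggerResult.FIRE, TriggerResult.PURGE);        // FIRE_AND_PURGE
* } </pre>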
public static TriggerResult merge(TriggerResult a, TriggerResult b) {
if (a.purge || b.purge) {
if (a.fire || b.fire) {
return FIRE_AND_PURGE;
} else {
return PURGE;
} else if (a.fire || b.fire) {
return FIRE;
} else {
return CONTINUE;

package org.apache.flink.streaming.api.functions;

import org.apache.flink.streaming.api.watermark.Watermark;

* A timestamp assigner that assigns timestamps based on the machine's wall clock.
* <p>If this assigner is used after a stream source, it realizes "ingestion time" semantics.
* @param <T> The elements that get timestamps assigned.
public class IngestionTimeExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = -4072216356049069301L;

private long maxTimestamp;

public long extractTimestamp(T element, long previousElementTimestamp) {
// make sure timestamps are monotonically increasing, even when the system clock re-syncs
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return now;

public Watermark getCurrentWatermark() {
// make sure watermarks are monotonically increasing, even when the system clock re-syncs
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return new Watermark(now - 1);
