1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.HashMap; import java.util.Random;
public class GroupedProcessingTimeWindowSample {
private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
private volatile boolean isRunning = true;
@Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { Random random = new Random(); while (isRunning) { Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5); String key = "类别" + (char) ('A' + random.nextInt(3)); int value = random.nextInt(10) + 1;
System.out.println(String.format("Emits\t(%s, %d)", key, value)); ctx.collect(new Tuple2<>(key, value)); } }
@Override public void cancel() { isRunning = false; } }
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2);
DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);
keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return ""; } }) .fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() { @Override public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception { accumulator.put(value.f0, value.f1); return accumulator; } }) .addSink(new SinkFunction<HashMap<String, Integer>>() { @Override public void invoke(HashMap<String, Integer> value, Context context) throws Exception { System.out.println(value); System.out.println(value.values().stream().mapToInt(v -> v).sum()); } });
env.execute(); } }
|