Flink Practical Code Archive

1. Word frequency count

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Counts how often each word occurs. A Flink job goes through the following steps:
 * ① Obtain the execution environment; for a real-time stream, create a StreamExecutionEnvironment
 * ② Load or create a data source (source), e.g. Kafka, Elasticsearch, ...
 * ③ Transform the data (transformation)
 * ④ Write the results to a destination (sink)
 * ⑤ Execute the job (execute)
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        // port of the socket source
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No --port argument given, falling back to the default 9000");
            port = 9000;
        }

        // 1. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Load or create the data source
        DataStreamSource<String> text = env.socketTextStream("192.168.3.121", port, "\n");

        // 3. Transform the data
        SingleOutputStreamOperator<WordWithCount> windowCount = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {

                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
                        String[] splits = s.split("\\s+");
                        for (String word : splits) {
                            collector.collect(new WordWithCount(word, 1L));
                        }
                    }

                }) // flatten: turn every word of each line into a <word, count> record
                // group records that share the same word
                .keyBy("word")
                // window size and slide interval of the sliding window
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");

        // 4. Emit to the sink
        // use a parallelism of 1 for the sink
        windowCount.print().setParallelism(1);
        // 5. Execute the job. Flink builds the job graph lazily, so the code above only runs once execute() is called
        env.execute("streaming word count");

    }

    /**
     * POJO that stores a word and its occurrence count
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

}
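
Note that keyBy("word") and timeWindow(...) still work but are deprecated in newer Flink releases. The following is a minimal sketch of the same word count written against the newer API (a KeySelector plus an explicit SlidingProcessingTimeWindows assigner), assuming roughly Flink 1.11+; the class name WordCountNewApi and the Tuple2-based records are illustrative choices, not part of the original job.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCountNewApi {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // same socket source as above; host and port are placeholders
        DataStreamSource<String> text = env.socketTextStream("192.168.3.121", 9000, "\n");

        text.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // lambdas lose generic type information, so declare the output type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // KeySelector instead of the deprecated keyBy("field") / keyBy(index)
                .keyBy(t -> t.f0)
                // explicit window assigner instead of the deprecated timeWindow(size, slide)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
                .sum(1)
                .print()
                .setParallelism(1);

        env.execute("streaming word count (new API)");
    }
}

To feed either version, run nc -lk 9000 on the host passed to socketTextStream and type words into that terminal; each line is split on whitespace and counted.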

2. Aggregating transaction amounts

Suppose we have a data source that monitors the orders in a system: whenever a new order comes in, it emits a Tuple2 containing the product category and the transaction amount of that order. We then want to compute, in real time, the transaction total per category as well as the total across all categories.

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.HashMap;
import java.util.Random;

/**
* @author gaop
*/
public class GroupedProcessingTimeWindowSample {

    // Extends RichParallelSourceFunction to simulate a data source.
    // At runtime Flink calls run() on the source, which keeps emitting records and thereby forms the initial stream.
    // When Flink needs to cancel the source task, it calls cancel().
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {

        // volatile flag used to mark and control whether the source keeps running
        private volatile boolean isRunning = true;

        // randomly generate <category, amount> records and emit them via ctx.collect
        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
                String key = "类别" + (char) ('A' + random.nextInt(3));
                int value = random.nextInt(10) + 1;

                System.out.println(String.format("Emits\t(%s, %d)", key, value));
                ctx.collect(new Tuple2<>(key, value));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // create the source and obtain the initial <category, amount> stream
        DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());

        // keyBy on the first field of the Tuple (the product category) to group the input stream
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);

        // for each key, sum the second field (the transaction amount)
        keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                // route everything to a single dummy key so that one task sees all per-category totals
                return "";
            }
        })
                // Fold maintains the per-category totals inside the operator.
                // Fold takes an initial value; every time a record arrives, the FoldFunction updates that value and the updated value is emitted downstream.
                .fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
                    // A HashMap keeps the current total per category. Each incoming <category, amount> record already carries
                    // the running total (it comes from sum(1) upstream), so we simply overwrite the entry for that category.
                    @Override
                    public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
                        accumulator.put(value.f0, value.f1);
                        return accumulator;
                    }
                })
                .addSink(new SinkFunction<HashMap<String, Integer>>() {
                    @Override
                    public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
                        // total per product category
                        System.out.println(value);
                        // total across all categories
                        System.out.println(value.values().stream().mapToInt(v -> v).sum());
                    }
                });

        env.execute();
    }
}
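
KeyedStream#fold, used above, is deprecated. One common replacement is to keep the per-category totals in keyed MapState inside a KeyedProcessFunction. The sketch below is a rough alternative under that assumption; the class name TotalsFunction is made up for this illustration, and it would be applied in place of the fold(...) call, e.g. keyedStream.sum(1).keyBy(r -> "").process(new TotalsFunction()), with the existing sink unchanged.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keeps the latest running total per category in keyed MapState and emits a snapshot of the whole map for every incoming record.
public class TotalsFunction
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, HashMap<String, Integer>> {

    private transient MapState<String, Integer> totals;

    @Override
    public void open(Configuration parameters) {
        totals = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("totals", String.class, Integer.class));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value,
                               Context ctx,
                               Collector<HashMap<String, Integer>> out) throws Exception {
        // value.f1 is already the running total for value.f0 because it comes from keyedStream.sum(1)
        totals.put(value.f0, value.f1);

        // emit a copy of the current per-category totals, mirroring what the FoldFunction produced
        HashMap<String, Integer> snapshot = new HashMap<>();
        for (Map.Entry<String, Integer> entry : totals.entries()) {
            snapshot.put(entry.getKey(), entry.getValue());
        }
        out.collect(snapshot);
    }
}

As in the fold version, put() overwrites rather than adds, because the upstream sum(1) already emits the running total for each category.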