Flink 实战代码归档-双流 Join

在两条流上做 OLAP 分析时,会用到双流 join,常用方法如下:

  • join()
  • coGroup()
  • intervalJoin()

1. 准备测试数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 接入 点击流和订单流
DataStream<String> clickSourceStream = env .addSource(new FlinkKafkaConsumer011<>(
"ods_analytics_access_log",
new SimpleStringSchema(),
kafkaProps
).setStartFromLatest());

DataStream<String> orderSourceStream = env.addSource(new FlinkKafkaConsumer011<>(
"ods_ms_order_done",
new SimpleStringSchema(),
kafkaProps
).setStartFromLatest());

// 转换为 pojo
DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream.map(message -> JSON.parseObject(message,
AnalyticsAccessLogRecord.class));

DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream.map(message -> JSON.parseObject(message,
OrderDoneLogRecord.class));

2. inner join

join() 算子提供的语义为”Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。

以下示例以10秒滚动窗口,将两个流通过商品 ID 关联,取得订单流中的售价相关字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
clickRecordStream.join(orderRecordStream)
.where(record -> record.getMerchandiseId())
.equalTo(record -> record.getMerchandiseId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {

@Override
public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
return StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t');
}

})
.print().setParallelism(1);

3. left/right outer join

调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunctionJoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。

以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
clickRecordStream.coGroup(orderRecordStream)
.where(record -> record.getMerchandiseId())
.equalTo(record -> record.getMerchandiseId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {

@Override
public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords
,Iterable<OrderDoneLogRecord> orderRecords
,Collector<Tuple2<String, Long>> collector) throws Exception {
for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
boolean isMatched = false;
for (OrderDoneLogRecord orderRecord : orderRecords) {
// 右流中有对应的记录
collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
isMatched = true;
}
if (!isMatched) {
// 右流中没有对应的记录
collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
}
}
}

})
.print().setParallelism(1);

4. 非窗口关联

前两者都是基于窗口做关联的,但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易关联不上。所以 Flink 又提供了 Interval join 的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

intervalJoin() 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。

示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
clickRecordStream.keyBy(record -> record.getMerchandiseId())
.intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
.between(Time.seconds(-30), Time.seconds(30))
.process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {

@Override
public void processElement(AnalyticsAccessLogRecord accessRecord
,OrderDoneLogRecord orderRecord, Context context
,Collector<String> collector) throws Exception {
collector.collect(StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t'));
}

})
.print().setParallelism(1);