Flink Dual-Stream Joins
When running OLAP-style analysis over two streams, you need a dual-stream join. The common approaches are:

- join()
- coGroup()
- intervalJoin()
1. Preparing the test data
```java
DataStream<String> clickSourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        "ods_analytics_access_log",
        new SimpleStringSchema(),
        kafkaProps
    ).setStartFromLatest());

DataStream<String> orderSourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        "ods_ms_order_done",
        new SimpleStringSchema(),
        kafkaProps
    ).setStartFromLatest());

DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
    .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));

DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
    .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));
```
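The two record classes are not shown in the original post. For reference, here is a minimal sketch consistent with the getters used in the examples below; the field types (and the omitted setters fastjson would need) are assumptions:

```java
// Hypothetical POJOs inferred from the getters used in this post.
public class AnalyticsAccessLogRecord {
    private long merchandiseId;
    private String merchandiseName;
    // ... other access-log fields and setters omitted

    public long getMerchandiseId() { return merchandiseId; }
    public String getMerchandiseName() { return merchandiseName; }
}

public class OrderDoneLogRecord {
    private long merchandiseId;
    private Long price;        // Long to match the Tuple2<String, Long> output below
    private Long couponMoney;
    private Long rebateAmount;
    // ... setters omitted

    public long getMerchandiseId() { return merchandiseId; }
    public Long getPrice() { return price; }
    public Long getCouponMoney() { return couponMoney; }
    public Long getRebateAmount() { return rebateAmount; }
}
```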
2. inner join
The join() operator provides "window join" semantics: it inner-joins the two streams on a specified key within a (tumbling/sliding/session) window, and it supports both processing time and event time. The following example joins the two streams on merchandise ID over a 10-second tumbling window and pulls the price-related fields out of the order stream.
```java
clickRecordStream.join(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
            return StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()
            ), '\t');
        }
    })
    .print().setParallelism(1);
```
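The example uses a processing-time tumbling window, but as noted above, the other window assigners work too. As an illustrative sketch (these are standard Flink DataStream window assigners, not part of the original example), the .window(...) line could be swapped for any of:

```java
// Sliding processing-time window: 10 s wide, evaluated every 5 s.
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

// Processing-time session window with a 10 s inactivity gap.
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

// Event-time tumbling window; requires timestamps and watermarks on both streams.
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
```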
3. left/right outer join
coGroup() is invoked much like the join() operator and also requires a window, but CoGroupFunction is more flexible than JoinFunction: it can match records from the left and/or right stream according to user-defined logic and emit whatever it chooses. The following example implements a left join of the click stream against the order stream, using a straightforward nested loop join (two nested loops).
```java
clickRecordStream.coGroup(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
        @Override
        public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords,
                            Iterable<OrderDoneLogRecord> orderRecords,
                            Collector<Tuple2<String, Long>> collector) throws Exception {
            for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
                boolean isMatched = false;
                for (OrderDoneLogRecord orderRecord : orderRecords) {
                    // A matching order exists in the same window: emit the joined pair.
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
                    isMatched = true;
                }
                if (!isMatched) {
                    // No order matched this click: emit it with a null right side (left join).
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
                }
            }
        }
    })
    .print().setParallelism(1);
```
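A right outer join follows the same pattern with the roles swapped. A minimal sketch of the corresponding coGroup() body (same types as above; not in the original post):

```java
// Right outer join body: every order record is emitted at least once.
for (OrderDoneLogRecord orderRecord : orderRecords) {
    boolean isMatched = false;
    for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
        collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
        isMatched = true;
    }
    if (!isMatched) {
        // No click matched this order: null left side.
        collector.collect(new Tuple2<>(null, orderRecord.getPrice()));
    }
}
```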
4. Non-windowed join
Both approaches above join within windows, but in some scenarios the two streams do not move in lockstep. For example, an order record may be written long after the corresponding purchase click appears in the click stream; if a window is used to pair them, the match is easily missed. Flink therefore also provides interval join semantics: records are joined on a specified key whenever the right stream's timestamp falls within an interval defined relative to the left stream's timestamp, i.e.:

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
intervalJoin() is also an inner join. It needs no window, but the caller must specify the lower and upper bounds of the offset interval, and only event time is supported. With between(Time.seconds(-30), Time.seconds(30)) as in the example below, a click at timestamp t matches orders stamped within [t - 30 s, t + 30 s]. Note that before running, assignTimestampsAndWatermarks() must be applied to both streams to extract event timestamps and generate watermarks.
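The watermark assignment itself is not shown in the original; here is a minimal sketch using BoundedOutOfOrdernessTimestampExtractor with a 5-second out-of-orderness bound (the getTs() epoch-millisecond accessor is a hypothetical field name):

```java
// Assumption: both record types expose an epoch-millisecond event time via getTs().
clickRecordStream = clickRecordStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<AnalyticsAccessLogRecord>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(AnalyticsAccessLogRecord record) {
            return record.getTs();
        }
    });

orderRecordStream = orderRecordStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<OrderDoneLogRecord>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(OrderDoneLogRecord record) {
            return record.getTs();
        }
    });
```

With timestamps and watermarks in place, the interval join itself: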
```java
clickRecordStream.keyBy(record -> record.getMerchandiseId())
    .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
    .between(Time.seconds(-30), Time.seconds(30))
    .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public void processElement(AnalyticsAccessLogRecord accessRecord,
                                   OrderDoneLogRecord orderRecord,
                                   Context context,
                                   Collector<String> collector) throws Exception {
            collector.collect(StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()
            ), '\t'));
        }
    })
    .print().setParallelism(1);
```