Flink 基础-流处理过程(第二章)

1. 创建环境

1.1. getExecutionEnvironment

根据查询运行的方式返回相应的运行环境,并行度可以从flink-conf.yaml中获取,或者手动设置

1
2
3
4
5
// 批
ExecutionEnvironment env = new ExecutionEnvironment.getExecutionEnvironment();

// 流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

1.2. createLocalEnvironment()

返回本地运行环境,必须设置并行度

1
2
// 返回本地环境
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

1.3. createRemoteEnvironment()

返回集群运行环境,必须设置设置jobManager的 IP 和端口号、在集群中运行的 jar

1
2
// 返回远程环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOUPATH//WordCount.jar");

2. 建立 source

2.1. 集合 collection

1
2
env.fromCollection(List<...>);
env.fromElement(...data);

2.2. 文件 file

1
env.readTextFile("/usr/local/demo/test.txt");

2.3. addSource(SourceFunction<T>)

添加其他 source ,可以使用官方提供或者自己实现

2.3.1. kafka

  • FlinkKafkaConsumer011<String>()
  • FlinkKafkaConsumer09<String>()
  • FlinkKafkaConsumer010<String>()
  • FlinkKafkaConsumerBase<String>()

2.4. addSource(new MySourceFunction<OUT>)

需要实现接口 SourceFunction<OUT>

  • run(ctx):使用 ctx.collect() 将数据传送出去
  • cancel():停止传输的条件
1
2
3
4
5
6
7
8
9
10
11
12
public static class MySourceFunction implements SourceFunction<String> {

@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect();
}

@Override
public void cancel() {

}
}

3. 转换 Transform

常用算子

4. 建立 sink

4.1. addSink(SinkFunction<T>)

添加其他 sink ,可以使用官方提供或者自己实现

4.1.1. kafka

  • FlinkKafkaProducer011<String>()
  • FlinkKafkaProducer09<String>()
  • FlinkKafkaProducer010<String>()
  • FlinkKafkaProducerBase<String>()

4.1.2. redis

  • RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper)

4.1.3. Elasticsearch

  • ElasticsearchSinkBase<T, C entends AutoCloseable>

4.1.4. JDEBC

实现RichSinkFunction<T>,创建数据库连接,操作数据库

5. 装配并执行

1
2
3
DataStreamSource<String> source = env.addSource(kafka10);
source.addSink(sink);
env.execute();