1. 创建环境
1.1. getExecutionEnvironment
根据查询运行的方式返回相应的运行环境,并行度可以从flink-conf.yaml
中获取,或者手动设置
1 | // 批 |
1.2. createLocalEnvironment()
返回本地运行环境,必须设置并行度
1 | // 返回本地环境 |
1.3. createRemoteEnvironment()
返回集群运行环境,必须设置设置jobManager
的 IP 和端口号、在集群中运行的 jar
包
1 | // 返回远程环境 |
2. 建立 source
2.1. 集合 collection
1 | env.fromCollection(List<...>); |
2.2. 文件 file
1 | env.readTextFile("/usr/local/demo/test.txt"); |
2.3. addSource(SourceFunction<T>)
添加其他 source ,可以使用官方提供或者自己实现
2.3.1. kafka
FlinkKafkaConsumer011<String>()
FlinkKafkaConsumer09<String>()
FlinkKafkaConsumer010<String>()
FlinkKafkaConsumerBase<String>()
2.4. addSource(new MySourceFunction<OUT>)
需要实现接口 SourceFunction<OUT>
run(ctx)
:使用ctx.collect()
将数据传送出去cancel()
:停止传输的条件
1 | public static class MySourceFunction implements SourceFunction<String> { |
3. 转换 Transform
见 常用算子
4. 建立 sink
4.1. addSink(SinkFunction<T>)
添加其他 sink ,可以使用官方提供或者自己实现
4.1.1. kafka
FlinkKafkaProducer011<String>()
FlinkKafkaProducer09<String>()
FlinkKafkaProducer010<String>()
FlinkKafkaProducerBase<String>()
4.1.2. redis
RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper)
4.1.3. Elasticsearch
ElasticsearchSinkBase<T, C entends AutoCloseable>
4.1.4. JDEBC
实现RichSinkFunction<T>
,创建数据库连接,操作数据库
5. 装配并执行
1 | DataStreamSource<String> source = env.addSource(kafka10); |