杂记-Flink 实战

1. conf 目录下几个配置文件文件作用

How to use logging

  • log4j.properties : 默认日志配置,适用于 jm/tm
  • log4j-cli.properties : 只适用于命令行,如 ./flink run
  • logback.xml : 默认日志配置,适用于命令行和 jm/tm
  • log4j-console.properties/logback-console.xml : 使用 jm/tm 时,前台展示时适用
  • log4j-session.properties/logback-session.xml : 命令行客户端使用 Kubernetes/Yarn session 模式时适用

2. 日志按照每个作分开业打印

  1. 使用 per job 模式,每个作业占用一个 tm,可以保证一个节点只输出一个作业的日志
  2. 修改环境变量,查看 ./bin/flink脚本可知,其日志打印时会调用两个变量:
    • FLINK_LOG_DIR : 指定日志输出文件路径
    • FLINK_IDENT_STRING : 指定日志文件标识,默认为 $USER,当指定时,文件名为 flink-标识-client-$HOSTNAME.log
  3. (待考)根据 log4j.properties 配置,文件名是取值 ${sys:log.file} ,可在 log.file 中增加动态参数,将作业名传递进去,或者,在启动作业时,加载自定义 lig4j 配置文件,已知信息
    • 文件启用了每 30s 更新一次配置文件的配置 monitorInterval=30
    • 文件名配置 appender.main.fileName = ${sys:log.file}

3. 日志名

日志命名方式, 来自 flink-daemon.sh : ${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}

  • FLINK_LOG_DIR : 日志目录, 在配置文件中配置
  • FLINK_IDENT_STRING : 来自变量, $USER, 用户名
  • DAEMON : 来自传参, 启动集群默认为 standalonesession, 其他情况, 参考上述脚本的 32-57 行, case 分支
  • id : 根据 pid 文件中行数取值, 此举是为了启动多个 daemon 程序
  • HOSTNAME : 主机 hostname 名
  1. 按照 zeppelin 添加 jar 并重启 f+z
  • Set zeppelin.flink.enableHive to be true
  • Set zeppelin.flink.hive.version to be the hive version you are using.
  • Set HIVE_CONF_DIR to be the location where hive-site.xml is located. Make sure hive metastore is started and you have configured hive.metastore.uris in hive-site.xml
  • Copy the following dependencies to the lib folder of flink installation.
    • flink-connector-hive_2.11–*.jar
    • flink-hadoop-compatibility_2.11–*.jar
    • hive-exec-2.x.jar (for hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303–0.9.2.jar and - libthrift-0.9.2.jar)
  1. Caused by: java.lang.ClassNotFoundException: org.apache.commons.configuration.Configuration
    添加包 commons-configuration-1.9.jar 到 flink lib 并重启

  2. Caused by: java.lang.ClassNotFoundException: javax.servlet.Filter
    添加包 javax.servlet-api-4.0.1.jar 到 flink lib 并重启

  3. Caused by: java.lang.ClassNotFoundException: org.antlr.runtime.tree.CommonTree
    添加包 antlr-runtime-3.5.2.jar 到 flink lib 并重启

  4. Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.conf.Configuration
    安装 hadoop 客户端, 将 HADOOP_CLASSPAT 设置为 hadoop classpath

  5. 转换

    1
    2
    3
    4
    Caused by: java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetFileInfoRequestProto cannot be cast to com.google.protobuf.Message
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:226)
    at com.sun.proxy.$Proxy49.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752)

    ClientNamenodeProtocolProtos 来自 hadoop-common-x.x.x.jar
    ClientNamenodeProtocolTranslatorPB 来自 hadoop-hdfs-client-x.x.x.jar
    原因 hadoop-common 版本与其他不一致导致

  6. 可以查询数据, 但无法写入

  • 一定要开启 checkpoint 才能使文件从 inprogress 变为结束状态
  • 开启 checkpoint 需要在 zeppelin 配置

5. 指标

背压指标

  • outPoolUsage : 发送端 Buffer 的使用率
  • inPoolUsage : 接收端 Buffer 的使用率,inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage
  • floatingBuffersUsage : 接收端 Floating Buffer 的使用率
  • exclusiveBuffersUsage : 接收端 Exclusive Buffer 的使用率