Kafka 常用命令

1. 服务命令

  • 启动 zk : zkServer.sh start
  • 启动 kafka : ./bin/kafka-server-start.sh -daemon config/server.properties
  • 停止 kafka : ./bin/kafka-server-stop.sh

2. topic命令

旧版将 topic 存储在 zookeeper 中,所以使用 zookeeper 参数,新版全部存储在本身一个 topic 中,叫 __consumer_offsets,因此改用 bootstrap-server 参数

1. 查看

  • 查看所有 : ./bin/kafka-topics.sh --list --bootstrap-server 10.10.10.10:9092
  • 查看某个 : ./bin/kafka-topics.sh --describe --bootstrap-server 10.10.10.10:9092 --topic TPNAME

2. 创建

./bin/kafka-topics.sh --create --bootstrap-server 10.10.10.10:9092 --topic TPNAME --replication-factor 1 --partitions 1

参数

  • --topic : 定义 topic 名
  • --replication-factor : 定义副本数
  • --partition : 定义分区数

3. 删除

./bin/kafka-topics.sh --delete --bootstrap-server 10.10.10.10:9092 --topic TPNAME

4. 修改

  • 修改分区数 : bin/kafka-topics.sh --alter --bootstrap-server 10.10.10.10:9092 --topic TPNAME --partition 3

5. 查看版本

find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

3. 消费组命令

1. 查看

  • 配置文件方式 : 查看 consumer.properties 中的 group.id=test-consumer-group
  • 命令行方式 :
1
2
3
4
5
6
7
# 新版
./bin/kafka-consumer-groups.sh --bootstrap-server 172.17.0.3:9092 --list
./bin/kafka-consumer-groups.sh --bootstrap-server 172.17.0.3:9092 --group test-consumer-group --describe

# 旧版
./bin/kafka-consumer-groups.sh --zookeeper 172.17.0.2:2181 --list
./bin/kafka-consumer-groups.sh --zookeeper 172.17.0.2:2181 --group test-consumer-group --describe

4. offset 命令

1. 查看最小 offset,即最早的

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.1:9092 --topic mykafka --time -2

2. 查看最大 offset,即最新的

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.1:9092 --topic mykafka --time -1

5. broker 命令

1. 查看

一般利用 zookeeper 来查看

6. 测试生产者与消费者

1. 测试生产者启动

./bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic mykafka

可选参数

参数 值类型 说明 有效值
–broker-list String (必需)服务器地址 如:host1:prot1,host2:prot2
–topic String (必需)接收消息的主题名称
–batch-size Integer 单个批处理中发送的消息数 200(默认值)
–compression-codec String 压缩编解码器 none、gzip(默认值)snappy、lz4、zstd
–max-block-ms Long 在发送请求期间,生产者将阻止的最长时间 60000(默认值)
–max-memory-bytes Long 生产者用来缓冲等待发送到服务器的总内存 33554432(默认值)
–max-partition-memory-bytes Long 为分区分配的缓冲区大小 16384
–message-send-max-retries Integer 最大的重试发送次数 3
–metadata-expiry-ms Long 强制更新元数据的时间阈值(ms) 300000
–producer-property String 将自定义属性传递给生成器的机制 如:key=value
–producer.config String 生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径
–property String 自定义消息读取器
parse.key=true/false
key.separator=key.separator
ignore.error=true/false
--queue-enqueuetimeout-ms integer 事件入队超时时间 2147483647
--queue-size integer 异步模式时,最大批处理消息量 10000
–request-required-acks String 生产者请求的确认方式 0、1(默认值)、all
–request-timeout-ms Integer 生产者请求的确认超时时间 1500(默认值)
–retry-backoff-ms Integer 生产者重试前,刷新元数据的等待时间阈值 100(默认值)
–socket-buffer-size Integer TCP接收缓冲大小 102400(默认值)
–timeout Integer 消息排队异步等待处理的时间阈值 1000(默认值)
–sync 同步发送消息
–version 显示 Kafka 版本 不配合其他参数时,显示为本地Kafka版本
–help 打印帮助信息

2. 测试消费者启动

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.1:9092 --topic mykafka --from-beginning --max-messages 10

可选参数

参数 值类型 说明 有效值
--bootstrap-server string 必需(除非使用旧版本的消费者),要连接的服务器
--topic string 被消费的topic
--offset string 执行消费的起始offset位置
默认值:latest
latest
earliest
--from-beginning 从存在的最早消息开始,而不是从最新消息开始
--whitelist string 正则表达式,指定要包含以供使用的主题的白名单
--partition integer 指定分区,除非指定’–offset’,否则从分区结束(latest)开始消费
--property string 初始化消息格式化程序的属性
print.timestamp=true,false
print.key=true,false
print.value=true,false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
--consumer-property string 将用户定义的属性以key=value的形式传递给使用者
--consumer.config string 消费者配置属性文件
请注意,[consumer-property]优先于此配置
--formatter string 用于格式化kafka消息以供显示的类的名称
默认值:kafka.tools.DefaultMessageFormatter
--max-messages integer 消费的最大数据量,若不指定,则持续消费下去
--timeout-ms integer 在指定时间间隔内没有消息可用时退出
--skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
--key-deserializer string
--value-deserializer string
--enable-systest-events 除记录消费的消息外,还记录消费者的生命周期
(用于系统测试)
--isolation-level string
设置为 read_committed 以过滤掉未提交的事务性消息
设置为 read_uncommitted 以读取所有消息
默认值:read_uncommitted
--group string 指定消费者所属组的ID
--blacklist string 要从消费中排除的主题黑名单
--csv-reporter-enabled 如果设置,将启用csv metrics报告器
--delete-consumer-offsets 如果指定,则启动时删除zookeeper中的消费者信息
--metrics-dir string 输出csv度量值,需与’csv-reporter-enable’配合使用
--zookeeper string 必需(仅当使用旧的使用者时)连接zookeeper的字符串