1. 服务命令
- 启动
zk
:zkServer.sh start
- 启动
kafka
:./bin/kafka-server-start.sh -daemon config/server.properties
- 停止
kafka
:./bin/kafka-server-stop.sh
2. topic
命令
旧版将 topic 存储在 zookeeper 中,所以使用 zookeeper
参数,新版全部存储在本身一个 topic 中,叫 __consumer_offsets
,因此改用 bootstrap-server
参数
1. 查看
- 查看所有 :
./bin/kafka-topics.sh --list --bootstrap-server 10.10.10.10:9092
- 查看某个 :
./bin/kafka-topics.sh --describe --bootstrap-server 10.10.10.10:9092 --topic TPNAME
2. 创建
./bin/kafka-topics.sh --create --bootstrap-server 10.10.10.10:9092 --topic TPNAME --replication-factor 1 --partitions 1
参数
--topic
: 定义 topic 名--replication-factor
: 定义副本数--partition
: 定义分区数
3. 删除
./bin/kafka-topics.sh --delete --bootstrap-server 10.10.10.10:9092 --topic TPNAME
4. 修改
- 修改分区数 :
bin/kafka-topics.sh --alter --bootstrap-server 10.10.10.10:9092 --topic TPNAME --partition 3
5. 查看版本
find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
3. 消费组命令
1. 查看
- 配置文件方式 : 查看
consumer.properties
中的group.id=test-consumer-group
- 命令行方式 :
1 | 新版 |
4. offset
命令
1. 查看最小 offset
,即最早的
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.1:9092 --topic mykafka --time -2
2. 查看最大 offset
,即最新的
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.1:9092 --topic mykafka --time -1
5. broker
命令
1. 查看
一般利用 zookeeper
来查看
6. 测试生产者与消费者
1. 测试生产者启动
./bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic mykafka
可选参数
参数 | 值类型 | 说明 | 有效值 |
---|---|---|---|
–broker-list |
String | (必需)服务器地址 | 如:host1:prot1,host2:prot2 |
–topic |
String | (必需)接收消息的主题名称 | |
–batch-size |
Integer | 单个批处理中发送的消息数 | 200(默认值) |
–compression-codec |
String | 压缩编解码器 | none、gzip(默认值)snappy、lz4、zstd |
–max-block-ms |
Long | 在发送请求期间,生产者将阻止的最长时间 | 60000(默认值) |
–max-memory-bytes |
Long | 生产者用来缓冲等待发送到服务器的总内存 | 33554432(默认值) |
–max-partition-memory-bytes |
Long | 为分区分配的缓冲区大小 | 16384 |
–message-send-max-retries |
Integer | 最大的重试发送次数 | 3 |
–metadata-expiry-ms |
Long | 强制更新元数据的时间阈值(ms) | 300000 |
–producer-property |
String | 将自定义属性传递给生成器的机制 | 如:key=value |
–producer.config |
String | 生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径 | |
–property |
String | 自定义消息读取器 | parse.key=true/false key.separator=key.separator ignore.error=true/false |
--queue-enqueuetimeout-ms |
integer | 事件入队超时时间 | 2147483647 |
--queue-size |
integer | 异步模式时,最大批处理消息量 | 10000 |
–request-required-acks |
String | 生产者请求的确认方式 | 0、1(默认值)、all |
–request-timeout-ms |
Integer | 生产者请求的确认超时时间 | 1500(默认值) |
–retry-backoff-ms |
Integer | 生产者重试前,刷新元数据的等待时间阈值 | 100(默认值) |
–socket-buffer-size |
Integer | TCP接收缓冲大小 | 102400(默认值) |
–timeout |
Integer | 消息排队异步等待处理的时间阈值 | 1000(默认值) |
–sync |
同步发送消息 | ||
–version |
显示 Kafka 版本 | 不配合其他参数时,显示为本地Kafka版本 | |
–help |
打印帮助信息 |
2. 测试消费者启动
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.1:9092 --topic mykafka --from-beginning --max-messages 10
可选参数
参数 | 值类型 | 说明 | 有效值 |
---|---|---|---|
--bootstrap-server |
string | 必需(除非使用旧版本的消费者),要连接的服务器 | |
--topic |
string | 被消费的topic | |
--offset |
string | 执行消费的起始offset位置 默认值:latest |
latest earliest |
--from-beginning |
从存在的最早消息开始,而不是从最新消息开始 | ||
--whitelist |
string | 正则表达式,指定要包含以供使用的主题的白名单 | |
--partition |
integer | 指定分区,除非指定’–offset’,否则从分区结束(latest)开始消费 | |
--property |
string | 初始化消息格式化程序的属性 | print.timestamp=true,false print.key=true,false print.value=true,false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> |
--consumer-property |
string | 将用户定义的属性以key=value的形式传递给使用者 | |
--consumer.config |
string | 消费者配置属性文件 请注意,[consumer-property]优先于此配置 |
|
--formatter |
string | 用于格式化kafka消息以供显示的类的名称 默认值:kafka.tools.DefaultMessageFormatter |
|
--max-messages |
integer | 消费的最大数据量,若不指定,则持续消费下去 | |
--timeout-ms |
integer | 在指定时间间隔内没有消息可用时退出 | |
--skip-message-on-error |
如果处理消息时出错,请跳过它而不是暂停 | ||
--key-deserializer |
string | ||
--value-deserializer |
string | ||
--enable-systest-events |
除记录消费的消息外,还记录消费者的生命周期 (用于系统测试) |
||
--isolation-level |
string | 设置为 read_committed 以过滤掉未提交的事务性消息 设置为 read_uncommitted 以读取所有消息默认值:read_uncommitted |
|
--group |
string | 指定消费者所属组的ID | |
--blacklist |
string | 要从消费中排除的主题黑名单 | |
--csv-reporter-enabled |
如果设置,将启用csv metrics报告器 | ||
--delete-consumer-offsets |
如果指定,则启动时删除zookeeper中的消费者信息 | ||
--metrics-dir |
string | 输出csv度量值,需与’csv-reporter-enable’配合使用 | |
--zookeeper |
string | 必需(仅当使用旧的使用者时)连接zookeeper的字符串 |