Kafka的基本命令

发表时间:2017-11-22 14:35:30 浏览量( 17 ) 留言数( 0 )


客户

 /opt/software/kafka_2.11-0.10.0.0/bin/kafka-console-consumer.sh --zookeeper 112.74.85.154 2181 --topic    队列名称

生产端

 /opt/software/kafka_2.11-0.10.0.0/bin/kafka-console-producer.sh --broker-list  112.74.85.154:9092 --topic  队列名称


  • 列出所有topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --list

说明:其实就是去检查zk上节点的/brokers/topics子节点,打印出来

##查看topic分布情况kafka-list-topic.sh

bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)

bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分区情况)

  • 创建topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic order_ledger_check_complete_test --partitions 4 --replication-factor 3

说明:线上环境我们会将自动创建topic禁用掉,改为手动创建(auto.create.topics.enable=false),parttitions和replication-factor是两个必备选项,第一个参数是消费并行度的一个重要参数,第二个极大提高了topic的可用性.备份因子默认是1,相当于没有备份,注意其值不能大于broker个数,否则会报错。同时还可以指定topic级别的配置参数,这种特定的配置会覆盖掉默认配置,并且存储在zookeeper的/config/topics/[topic_name]节点数据里。--alter  --config  --deleteConfig

  • 删除topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic payment_completed --delete

说明:在0.8.2.1之前的版本一般不建议之行删除操作,因为有各种各样的bug存在,目前的版本稳定些,同时我们需要将配置参数打开(delete.topic.enable=true),删除操作其实是通过更改一个zk节点,由另外的删除线程异步做的topicdeletionmanager。

  • 增加partitions:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10

说明:只能增加,不能减少。如果原有分散策略是hash的方式,将会受影响。发送端(默认10分钟会刷新本地存储元信息)和消费端都无需重启即可生效。

  • 增减replica:

只需要编辑 --reassignment-json-file,添加或者减少broker id即可。其他操作可以依赖partition reassignment tool 的--execute --verify.

  • 消费消息:

watch --interval=2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic payment_completed_dev --from-beginning

  • 查询消费信息:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group group_order_ledger --topic payment_completed_dev

  • 写入消息性能测试:

bin/kafka-producer-perf-test.sh --broker-list kafka1.weibo.com:9092 --batch-size 1 --message-size 1024 --messages 10000 --sync --topics topic_test

kafka彻底删除topic

如果只是用kafka-topics.sh的delete命令删除topic,会有两种情况:

  1. 如果当前topic没有使用过即没有传输过信息:可以彻底删除

  2. 如果当前topic有使用过即有过传输过信息:并没有真正删除topic只是把这个topic标记为删除(marked for deletion)。

要彻底把情况2中的topic删除必须把kafka中与当前topic相关的数据目录和zookeeper与当前topic相关的路径一并删除。

相关组件的版本

zookeeper: 3.4.6

kafka: 0.9.0.0

 

这里假设要删除的topic是test,kafka的zookeeper root为/kafka

删除kafka相关的数据目录

数据目录请参考目标机器上的kafka配置:server.properties -> log.dirs=/var/kafka/log/tmp

su rm -r /var/kafka/log/tmp/test*

 

删除kafka topic

/home/kafka/bin/kafka-topics.sh --delete --zookeeper HadoopMaster:2181/kafka --topic test

 

删除zookeeper相关的路径

打开zookeeper client

/home/ZooKeeper/bin/zkCli.sh

执行下面的命令

把test替换成你要删除的topic

#删除topic test的consumer group,如果有消费记录的话

rmr /kafka/consumers/test-group

rmr /kafka/config/topics/test

rmr /kafka/brokers/topics/test

rmr /kafka/admin/delete_topics/test

 

完成

重启zookeeper和kafka可以用下面命令查看相关的topic还在不在:

/home/kafka/bin/kafka-topics.sh --list --zookeeper HadoopMaster:2181/kafka