KafKa的集群搭建

发表时间:2017-07-19 22:09:05 浏览量( 36 ) 留言数( 0 )

学习目标:

1、了解消息队列相关技术

2、了解消息队列的一些常见技术和对比


学习过程:

   这里我们不使用KafKa内置的zookeeper组件,因为在上一节我们已经安装了zookeeper的集群了。像kafka框架,在设计之初就已经对集群的支持,所以Kafka集群安装非常简单。

1、下载kafka并解压

     [root@run1 opt]# tar -zxvf zookeeper-3.4.9.tar.gz

2、修改权限为liubao:liubao

     chown -R liubao:liubao kafka

3、修改server.properties配置文件信息,三台不同的run,其id和host.name对应为其broker.id=2,broker.id=3;host.name=run2.com,host.name=run3.com

listeners=PLAINTEXT://run2.com:9092    之前没有这配置监听,导致出错,

broker.id=1

log.dirs=/opt/kafka/logs

host.name=run1.com

zookeeper.connect=run1.com:2181,run2.com:2181,run3.com:2181

备注:listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的(但是后来应该是修复了这个错误)

4、启动

kafka-server-start.sh /opt/kafka/config/server.properties >/dev/null 2>&1 &

切换到liubao

su -liubao

vim .bashrc

启动比较长,可以建立别名

alias kafkastart='kafka-server-start.sh /opt/kafka/config/server.properties >/dev/null 2>&1 &'

5、建立一个队列topic ,名字是hellotest

 kafka-topics.sh --create --zookeeper run1.com:2181 run2.com:2181 run3.com:2181  --replication-factor 3 --partitions 3 --topic hellotest

6、查看队列信息

[liubao@run1 ~]$ kafka-topics.sh --describe --zookeeper run1.com:2181 run2.com:2181 run3.com:2181  --topic hellotest

Topic:hellotest PartitionCount:3        ReplicationFactor:3     Configs:

        Topic: hellotest        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

        Topic: hellotest        Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2

        Topic: hellotest        Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

7、使用zookeeper客户端登录查看

zkCli.sh  -server run1.com:2181  --当然也可以登录其他的zookeeper

[zk: run2.com:2181(CONNECTED) 0] ls /

[controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, config]

[zk: run2.com:2181(CONNECTED) 1] ls /brokers/ids

[1, 2, 3]

[zk: run2.com:2181(CONNECTED) 2] ls /brokers

[ids, topics, seqid]

[zk: run2.com:2181(CONNECTED) 3] ls /brokers/topics

[hellotest]

[zk: run2.com:2181(CONNECTED) 4] ls /brokers/topics/hellotest

[partitions]

[zk: run2.com:2181(CONNECTED) 5] ls /brokers/topics/hellotest/partitions

[0, 1, 2]

[zk: run2.com:2181(CONNECTED) 6] ls /brokers/topics/hellotest/partitions/0

[state]

[zk: run2.com:2181(CONNECTED) 7] ls /brokers/topics/hellotest/partitions/0/state

[]

[zk: run2.com:2181(CONNECTED) 8]

8、修改防火墙,kafka的默认端口是9092

vim /etc/sysconfig/iptables

 -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

重启防火墙

/etc/init.d/iptables restart

查看防火墙状态

/etc/init.d/iptables status

9、使用两台客户端,一个是生产者,一个是消费者

生产者

[liubao@run1 ~]$ kafka-console-producer.sh -broker-list run1.com:9092,run2.com:9092,run3.com:9092 -topic hellotest

消费者

[liubao@run2 config]$ kafka-console-consumer.sh --zookeeper run1.com:2181,run2.com:2181,run3.com:2181 --from-beginning --topic hellotest

这个时候你可以使用生产者不断写东西,消费者会自动打印这些东西出来

启动消费者报错,根据提示:修改/etc/hosts 添加主机名的映射,再次启动消费者  三台服务器都必须如此

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 run2