Confluent/KRaft

来自Gea-Suan Lin's Wiki
Gslin讨论 | 贡献2023年10月3日 (二) 06:52的版本 →‎Broker
(差异) ←上一版本 | 最后版本 (差异) | 下一版本→ (差异)
跳到导航 跳到搜索

Confluent/KRaftApache Kafka在2.8之后推出的功能,由Confluent包装的版本,目的是为了取代Apache ZooKeeper的功能。

安装

软件的安装在controller或是broker都一样(比较简单,没跑起来的软件只是占个磁盘空间),参考Confluent条目的说明即可。

设定

Controller

这边是三controller机的设定。

先把/etc/kafka/kraft/controller.properties复制到/etc/kafka/server.properties下,然后把以下的设定值改成对应的值:

process.roles=controller
node.id=1
controller.quorum.voters=1@kafka-controller-1.example.com:19092,2@kafka-controller-2.example.com:19092,3@kafka-controller-3.example.com:19092
listeners=CONTROLLER://kafka-controller-1.example.com:19092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/opt/kraft-controller-logs

其中需要特别讲解的是:

  • node.id每台机器都不同。
  • controller.quorum.votersn@hostname:19092格式中,n就是node.id,而hostname为可以连到的位置(可以是IP),另外因为目前官方没有比较好的TCP port选择,社群蛮多人都是设定19092。
  • CONTROLLER是从其他会动的设定档里面抄来的。这个部分是沿袭Apache Kafka文件的传统,你看完一堆文件后不知道他到底是什么意思,文件自身也互相矛盾。

接着先产生后续kafka-storage需要用的值,首先是要产生出一组叫做“UUID”的字串(22 chars,对应到Base62或是Base64的entropy都够128bits,但格式不是常见的UUID字串表示法)。这边要注意,产生一次就可以,三台用的值都会是同一个:

kafka-storage random-uuid

接着先建立/opt/kraft-controller-logs与对应的权限:

sudo mkdir /opt/kraft-controller-logs
sudo chown cp-kafka:confluent /opt/kraft-controller-logs

再来透过kafka-storage/opt/kraft-controller-logs这个目录初始化,这边会用sudo -u cp-kafka要确保目录权限正确:

sudo -u cp-kafka kafka-storage format --config=/etc/kafka/server.properties --cluster-id=xxxxxxxxxxxxxxxxxxxxxx

设定完成后可以启动,然后看一下状态与log档案,目前的版本会看到每0.5秒打一次snapshot的讯息[1]

sudo service confluent-kafka start
sudo service confluent-kafka status
sudo tail -F /var/log/kafka/server.log


另外从TCP连线也可以看到连线的情况(通常只会剩下一个连到leader的连线):

netstat -an | grep 19092

都没问题后可以设定开机跑起来:

sudo systemctl enable confluent-kafka

另外这些log档案预设不会被清除,所以要修改/etc/kafka/log4j.properties,把server.log的部分换成[2]

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log                                                                 
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender.MaxFileSize=128MB
log4j.appender.kafkaAppender.MaxBackupIndex=10

Broker

这边是三broker机的设定。

先把/etc/kafka/kraft/broker.properties复制到/etc/kafka/server.properties下,然后把以下的设定值改成对应的值:

process.roles=broker
node.id=4
controller.quorum.voters=1@kafka-controller-1.example.com:19092,2@kafka-controller-2.example.com:19092,3@kafka-controller-3.example.com:19092
listeners=PLAINTEXT://kafka-broker-1.example.com:9092
advertised.listeners=PLAINTEXT://kafka-broker-1.example.com:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/opt/kraft-broker-logs
default.replication.factor=3

其中需要特别讲解的是:

  • node.id每台机器都不同,如果controller已经吃掉123,这边就要设定456
  • controller.quorum.voters会从controller那边的设定复制过来。
  • default.replication.factor预设值建议设定成3

接着先建立/opt/kraft-controller-logs与对应的权限:

sudo mkdir /opt/kraft-broker-logs
sudo chown cp-kafka:confluent /opt/kraft-broker-logs

再来透过kafka-storage/opt/kraft-broker-logs这个目录初始化,这边会用sudo -u cp-kafka要确保目录权限正确,另外cluster-id就是跟上面controller时设定的一样:

sudo -u cp-kafka kafka-storage format --config=/etc/kafka/server.properties --cluster-id=xxxxxxxxxxxxxxxxxxxxxx

设定完成后可以启动,然后看一下状态与log档案:

sudo service confluent-kafka start
sudo service confluent-kafka status
sudo tail -F /var/log/kafka/server.log

另外从TCP连线也可以看到连线的情况(通常会看到连到leader的19092与自己在听的9092):

netstat -an | grep 9092

都没问题后可以设定开机跑起来:

sudo systemctl enable confluent-kafka

接着可以建立broker的load balancer,提供给Kafka的用户端连线使用。

另外这些log档案预设不会被清除,所以要修改/etc/kafka/log4j.properties,把server.log的部分换成[2]

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log                                                                 
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender.MaxFileSize=128MB
log4j.appender.kafkaAppender.MaxBackupIndex=10

相关条目

参考资料

  1. KRaft - Kafka nodes continuously generating metadata deltas from snapshots #120. [2023-05-31] (English). 
  2. 2.0 2.1 How to rotate Kafka logs. 2021-12-10 [2023-08-18] (English). 

外部链接