Confluent/KRaft

出自Gea-Suan Lin's Wiki
跳至導覽 跳至搜尋

Confluent/KRaftApache Kafka在2.8之後推出的功能,由Confluent包裝的版本,目的是為了取代Apache ZooKeeper的功能。

安裝

軟體的安裝在controller或是broker都一樣(比較簡單,沒跑起來的軟體只是佔個磁碟空間),參考Confluent條目的說明即可。

設定

Controller

這邊是三controller機的設定。

先把/etc/kafka/kraft/controller.properties複製到/etc/kafka/server.properties下,然後把以下的設定值改成對應的值:

process.roles=controller
node.id=1
controller.quorum.voters=1@kafka-controller-1.example.com:19092,2@kafka-controller-2.example.com:19092,3@kafka-controller-3.example.com:19092
listeners=CONTROLLER://kafka-controller-1.example.com:19092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/opt/kraft-controller-logs

其中需要特別講解的是:

  • node.id每台機器都不同。
  • controller.quorum.votersn@hostname:19092格式中,n就是node.id,而hostname為可以連到的位置(可以是IP),另外因為目前官方沒有比較好的TCP port選擇,社群蠻多人都是設定19092。
  • CONTROLLER是從其他會動的設定檔裡面抄來的。這個部分是沿襲Apache Kafka文件的傳統,你看完一堆文件後不知道他到底是什麼意思,文件自身也互相矛盾。

接著先產生後續kafka-storage需要用的值,首先是要產生出一組叫做「UUID」的字串(22 chars,對應到Base62或是Base64的entropy都夠128bits,但格式不是常見的UUID字串表示法)。這邊要注意,產生一次就可以,三台用的值都會是同一個:

kafka-storage random-uuid

接著先建立/opt/kraft-controller-logs與對應的權限:

sudo mkdir /opt/kraft-controller-logs
sudo chown cp-kafka:confluent /opt/kraft-controller-logs

再來透過kafka-storage/opt/kraft-controller-logs這個目錄初始化,這邊會用sudo -u cp-kafka要確保目錄權限正確:

sudo -u cp-kafka kafka-storage format --config=/etc/kafka/server.properties --cluster-id=xxxxxxxxxxxxxxxxxxxxxx

設定完成後可以啟動,然後看一下狀態與log檔案,目前的版本會看到每0.5秒打一次snapshot的訊息[1]

sudo service confluent-kafka start
sudo service confluent-kafka status
sudo tail -F /var/log/kafka/server.log


另外從TCP連線也可以看到連線的情況(通常只會剩下一個連到leader的連線):

netstat -an | grep 19092

都沒問題後可以設定開機跑起來:

sudo systemctl enable confluent-kafka

另外這些log檔案預設不會被清除,所以要修改/etc/kafka/log4j.properties,把server.log的部分換成[2]

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log                                                                 
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender.MaxFileSize=128MB
log4j.appender.kafkaAppender.MaxBackupIndex=10

Broker

這邊是三broker機的設定。

先把/etc/kafka/kraft/broker.properties複製到/etc/kafka/server.properties下,然後把以下的設定值改成對應的值:

process.roles=broker
node.id=4
controller.quorum.voters=1@kafka-controller-1.example.com:19092,2@kafka-controller-2.example.com:19092,3@kafka-controller-3.example.com:19092
listeners=PLAINTEXT://kafka-broker-1.example.com:9092
advertised.listeners=PLAINTEXT://kafka-broker-1.example.com:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/opt/kraft-broker-logs
default.replication.factor=3

其中需要特別講解的是:

  • node.id每台機器都不同,如果controller已經吃掉123,這邊就要設定456
  • controller.quorum.voters會從controller那邊的設定複製過來。
  • default.replication.factor預設值建議設定成3

接著先建立/opt/kraft-controller-logs與對應的權限:

sudo mkdir /opt/kraft-broker-logs
sudo chown cp-kafka:confluent /opt/kraft-broker-logs

再來透過kafka-storage/opt/kraft-broker-logs這個目錄初始化,這邊會用sudo -u cp-kafka要確保目錄權限正確,另外cluster-id就是跟上面controller時設定的一樣:

sudo -u cp-kafka kafka-storage format --config=/etc/kafka/server.properties --cluster-id=xxxxxxxxxxxxxxxxxxxxxx

設定完成後可以啟動,然後看一下狀態與log檔案:

sudo service confluent-kafka start
sudo service confluent-kafka status
sudo tail -F /var/log/kafka/server.log

另外從TCP連線也可以看到連線的情況(通常會看到連到leader的19092與自己在聽的9092):

netstat -an | grep 9092

都沒問題後可以設定開機跑起來:

sudo systemctl enable confluent-kafka

接著可以建立broker的load balancer,提供給Kafka的用戶端連線使用。

另外這些log檔案預設不會被清除,所以要修改/etc/kafka/log4j.properties,把server.log的部分換成[2]

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log                                                                 
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender.MaxFileSize=128MB
log4j.appender.kafkaAppender.MaxBackupIndex=10

相關條目

參考資料

  1. KRaft - Kafka nodes continuously generating metadata deltas from snapshots #120. [2023-05-31] (English). 
  2. 2.0 2.1 How to rotate Kafka logs. 2021-12-10 [2023-08-18] (English). 

外部連結