Confluent/ZooKeeper


Confluent/ZooKeeper refers to the Apache ZooKeeper packaged by Confluent, which uses some paths and configuration-file conventions that differ from the official release. Current versions only need ZooKeeper installed when running in ZooKeeper mode; in KRaft mode no ZooKeeper is required.

Requirements

In a test environment, ZooKeeper can run on just a t3.nano (512 MB of RAM), with an additional 512 MB of swap added manually.
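
A minimal sketch for adding that swap manually (the /swapfile path is an assumption, not from the original setup):

# Create and enable a 512 MB swap file
sudo fallocate -l 512M /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile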

In production, Confluent's official recommendation is 4 GB of RAM and a 64 GB or larger SSD[1].

Firewall

The ZooKeeper hosts need these ports open between one another:

  • 2888/tcp
  • 3888/tcp

The ZooKeeper hosts also need these ports open to the outside (a ufw sketch covering both lists follows below):

  • 2181/tcp
  • 32181/tcp(JMX)
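
As a sketch, assuming ufw is the firewall and using the example peer addresses from the configuration below (run the equivalent on each host, with its own peers):

# Hypothetical ufw rules for the host at 10.1.1.1
sudo ufw allow proto tcp from 10.2.2.2 to any port 2888,3888
sudo ufw allow proto tcp from 10.3.3.3 to any port 2888,3888
# Client and JMX ports
sudo ufw allow 2181/tcp
sudo ufw allow 32181/tcp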

Configuration

Edit /etc/kafka/zookeeper.properties, replacing the IP addresses with the ones for your hosts:

#
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=10.1.1.1:2888:3888
server.2=10.2.2.2:2888:3888
server.3=10.3.3.3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
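
For reference, the timing settings multiply together: with tickTime=2000 (2 seconds per tick), initLimit=5 gives followers 10 seconds to connect to and sync with the leader, and syncLimit=2 lets a follower fall at most 4 seconds behind before it is dropped. The autopurge settings keep the 3 newest snapshots and purge older ones every 24 hours.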

Create /var/lib/zookeeper/myid; each host must contain a different value, matching its server.N index above (1, 2, or 3). For example, on the first host the file contains just:

1
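
A hypothetical one-liner for the host configured as server.1 (use 2 or 3 on the others):

echo 1 | sudo tee /var/lib/zookeeper/myid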

Then change the file's owner:

sudo chown cp-kafka:confluent /var/lib/zookeeper/myid

The current ZooKeeper (the one bundled in the Confluent 2.11 packages) defaults to 512 MB of heap, but the host has 7.5 GB of RAM, so we want to let ZooKeeper use 7 GB of it, which means overriding ZooKeeper's JVM parameters.

The other change is enabling JMX monitoring, on port 32181.

Both go into a new /lib/systemd/system/confluent-zookeeper.service.d/30-options.conf (the directory may need to be created first):

[Service]
Environment=JMX_PORT=32181
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dzookeeper.4lw.commands.whitelist=stat,ruok,conf,isro"

The corresponding commands:

sudo mkdir -p /lib/systemd/system/confluent-zookeeper.service.d
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dzookeeper.4lw.commands.whitelist=stat,ruok,conf,isro"' | sudo tee /lib/systemd/system/confluent-zookeeper.service.d/30-options.conf
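
To confirm systemd picked up the drop-in, reload it and print the merged unit definition:

sudo systemctl daemon-reload
systemctl cat confluent-zookeeper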

Finally, edit /etc/kafka/log4j.properties so the logs don't eat up the disk[2]. The appenders here are RollingFileAppenders with size caps, bounding each log at roughly 500 MB × 6 files = 3 GB:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Unspecified loggers and loggers with additivity=true output to server.log and stdout
# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
log4j.rootLogger=INFO, stdout, kafkaAppender

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender.MaxFileSize=500MB
log4j.appender.kafkaAppender.MaxBackupIndex=5

log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender.MaxFileSize=500MB
log4j.appender.stateChangeAppender.MaxBackupIndex=5

log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender.MaxFileSize=500MB
log4j.appender.requestAppender.MaxBackupIndex=5

log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender.MaxFileSize=500MB
log4j.appender.cleanerAppender.MaxBackupIndex=5

log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender.MaxFileSize=500MB
log4j.appender.controllerAppender.MaxBackupIndex=5

log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.authorizerAppender.MaxFileSize=500MB
log4j.appender.authorizerAppender.MaxBackupIndex=5

# Change the two lines below to adjust ZK client logging
log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
log4j.logger.org.apache.zookeeper=INFO

# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

# Change to DEBUG or TRACE to enable request logging
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false

# Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
# related to the handling of requests
#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

With that, it should be possible to start the service:

sudo systemctl daemon-reload
sudo systemctl enable confluent-zookeeper
sudo systemctl start confluent-zookeeper
sudo systemctl status confluent-zookeeper

The output can be used to gauge the system's state; the Mode and Node count fields give a quick read on the cluster's health:

echo stat | nc 127.0.0.1 2181
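
Since ruok is also in the 4lw whitelist set up earlier, there is an even simpler liveness check; a healthy server answers imok:

echo ruok | nc 127.0.0.1 2181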

Or test by interacting with it directly:

zookeeper-shell 127.0.0.1:2181
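
Inside the shell, listing the root znodes is a quick smoke test (once Kafka is running against this cluster, znodes such as /brokers will show up here):

ls /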

Next, a TCP load balancer (for example ELB) can be put in front of TCP port 2181.

Authentication

Setting up this section is not really recommended, because it is far too easy to shoot yourself in the foot.

If ZooKeeper needs authentication enabled, first generate the digest string for the account and password (admin and password in this example):

java -cp "$(echo /usr/share/java/kafka/* | sed 's/ /:/g')" org.apache.zookeeper.server.auth.DigestAuthenticationProvider admin:password

This produces output like the following, where the value at the end is the important part:

admin:password->admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg=

The string after -> is the hashed password; put it straight into the configuration:

KAFKA_OPTS=-Dzookeeper.DigestAuthenticationProvider.superDigest=admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg=
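
One way to wire this in, mirroring the drop-in pattern used above (the file name is an assumption):

# Hypothetical drop-in: /lib/systemd/system/confluent-zookeeper.service.d/40-auth.conf
[Service]
Environment=KAFKA_OPTS="-Dzookeeper.DigestAuthenticationProvider.superDigest=admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg="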

You can then authenticate inside zookeeper-shell:

addauth digest admin:password
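
After authenticating, the session has super-user rights, so for example the ACLs on any znode can be inspected:

getAcl /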

References
