Confluent/ZooKeeper

来自Gea-Suan Lin's Wiki
跳到导航 跳到搜索

Confluent/ZooKeeper指的是Confluent所包装的Apache ZooKeeper,里面有些路径与设定档的设法与官方版本不同。

防火墙

ZooKeeper的主机之间需要对开这些Port:

  • 2888/tcp
  • 3888/tcp

ZooKeeper的主机对外需要开这些Port:

  • 2181/tcp
  • 32181/tcp(JMX)

设定

修改/etc/kafka/zookeeper.properties,其中IP address需要填写对应的位置:

#
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=10.1.1.1:2888:3888
server.2=10.2.2.2:2888:3888
server.3=10.3.3.3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24

新增/var/lib/zookeeper/myid,每一台都需要不同,123

1

然后修改档案拥有人:

sudo chown cp-kafka:confluent /var/lib/zookeeper/myid

目前的ZooKeeper(Confluent 2.11版内的ZooKeeper)预设值是使用512 MB的内存,但主机有7.5 GB的内存,所以会想要让ZooKeeper可以用7 GB,因此需要修改ZooKeeper的JVM参数。

另外是增加JMX的监控机制,使用Port 32181。

这两个需要新增/lib/systemd/system/confluent-zookeeper.service.d/30-options.conf(目录可能需要自己建立):

[Service]
Environment=JMX_PORT=32181
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dzookeeper.4lw.commands.whitelist=stat,ruok,conf,isro"

对应的指令:

sudo mkdir /lib/systemd/system/confluent-zookeeper.service.d
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dzookeeper.4lw.commands.whitelist=stat,ruok,conf,isro"' | sudo tee /lib/systemd/system/confluent-zookeeper.service.d/30-options.conf

最后修改/etc/kafka/log4j.properties,让log不要吃爆硬盘空间[1]

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Unspecified loggers and loggers with additivity=true output to server.log and stdout
# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
log4j.rootLogger=INFO, stdout, kafkaAppender

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender.MaxFileSize=500MB
log4j.appender.kafkaAppender.MaxBackupIndex=5

log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender.MaxFileSize=500MB
log4j.appender.stateChangeAppender.MaxBackupIndex=5

log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender.MaxFileSize=500MB
log4j.appender.requestAppender.MaxBackupIndex=5

log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender.MaxFileSize=500MB
log4j.appender.cleanerAppender.MaxBackupIndex=5

log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender.MaxFileSize=500MB
log4j.appender.controllerAppender.MaxBackupIndex=5

log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.authorizerAppender.MaxFileSize=500MB
log4j.appender.authorizerAppender.MaxBackupIndex=5

# Change the two lines below to adjust ZK client logging
log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
log4j.logger.org.apache.zookeeper=INFO

# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

# Change to DEBUG or TRACE to enable request logging
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false

# Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
# related to the handling of requests
#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

理论上就可以启动了:

sudo systemctl daemon-reload
sudo systemctl enable confluent-zookeeper
sudo service confluent-zookeeper start
sudo service confluent-zookeeper status

可以看输出的资讯判断系统状态,可以看ModeNode count资讯简单确认cluster的情况:

echo stat | nc 127.0.0.1 2181

或是直接透过指令操作测试:

zookeeper-shell 127.0.0.1:2181

接下来可以将TCP Port 2181建立对应的TCP Load Balancer(像是用ELB)。

认证

这个章节不太建议设定,因为太容易搞到自己。

如果需要让ZooKeeper启用认证,需要先建立对应的账号与密码字串(这个例子里面是adminpassword):

java -cp "$(echo /usr/share/java/kafka/* | sed 's/ /:/g')" org.apache.zookeeper.server.auth.DigestAuthenticationProvider admin:password

会产生像是这样的输出,其中后面的那串值是重点:

admin:password->admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg=

其中->后面是将密码处理过的字串,直接放进设定内:

KAKFA_OPTS=-Dzookeeper.DigestAuthenticationProvider.superDigest=admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg=

这样就可以在zookeeper-shell里面认证:

addauth digest admin:password

相关连结

参考文献

外部链接