Confluent
Confluent是一家公司,同时也是该公司Apache Kafka产品线的产品名。
简介
Confluent是Apache Kafka发明人出来开的公司,也是目前最知名的Kafka商业支援服务。同时Confluent也是该公司推出的软件品牌,提供了众多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天试用)。
硬件
在PoC时我使用了三台t3.small
(2 GB RAM,另外手动设定加上512 MB Swap),三台都安装完整的套件并且跑起来,一开始不会有问题,但跑一阵子后会因为内存不足而异常。
建议在正式环境下,ZooKeeper与Kafka分开跑,都使用m5a.large
(8 GB RAM)或是更好的机器执行。
安装
当然要先装Java,然后安装社群版本,这边因为官方还没有提供jammy
版本(Ubuntu 22.04),这边手动写成focal
:
wget -qO - https://packages.confluent.io/deb/7.4/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null; echo -e "deb [arch=amd64 signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.4 stable main\ndeb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/clients/deb focal main" | sudo tee /etc/apt/sources.list.d/confluent.list; sudo apt update; sudo apt install -y confluent-community-2.13 default-jre; sudo apt clean
ZooKeeper模式或是KRaft模式
现在这个时间点比较尴尬,得看需求背景取舍。很粗略的来说,如果你是既有的应用要使用Kafka,用ZooKeeper模式会比较保险;如果你是自己开发的软件要使用Kafka,用KRaft模式有机会减少将来的技术债。
ZooKeeper
ZooKeeper模式比较成熟,软件的问题会比较少,但官方已经宣布有计划要淘汰掉ZooKeeper模式了。
KRaft
KRaft模式是官方要取代ZooKeeper模式所提出的方案,但目前还有很多限制与问题。
基本操作
依照模式的不同会有不同的参数设定:
- ZooKeeper模式的操作会有指定任何一台ZooKeeper服务,像是
--zookeeper lb-zookeeper.example.com
这样的参数。 - KRaft模式的操作则是指定任何一台Broker服务,像是
--bootstrap-server lb-broker.example.com
这样的参数。
topic
topic的操作都是透过kafka-topics
处理,使用--zookeeper
指定ZooKeeper位置(可省略Port资讯,预设使用2181)。
建立是--create
:
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 2
观看是透过--list
(简易)或是--describe
(详细):
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
删除topic则是透过--delete
:
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
预设值只会标记(MarkedForDeletion
),而非实际删除:
Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
讯息
把目前的日期资讯传到test
里:
date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test
接收讯息,并且从头开始收(--from-beginning
):
kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning
新版的会使用Broker的点而非ZooKeeper的点,这时候会使用--bootstrap_server
加上Broker的位置,而非--zookeeper
加上ZooKeeper的位置,像是这样:
kafka-console-consumer --bootstrap_server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning
额外设定Schema Registry
修改/etc/schema-registry/schema-registry.properties
:
kafkastore.bootstrap.servers=PLAINTEXT://kafka-broker.srv.example.net:9092
metadata.encoder.secret=x
host.name=kafka-mishmash-1.private.example.net
启动Schema Registry:
sudo systemctl enable --now confluent-schema-registry
sudo service confluent-schema-registry status
额外设定Kafka Connect
系统预设会跑distributed mode,所以修改/etc/kafka/connect-distributed.properties
:
bootstrap.servers=kafka-broker.srv.example.net:9092
自己建立三个需要的topic,因为自动建立的会有问题(因为需要--config cleanup.policy=compact
),这边的--replication-factor
与--partitions
都是用Kafka Connect的预设值:
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-configs --create --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-offsets --create --replication-factor 3 --partitions 25 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-status --create --replication-factor 3 --partitions 5 --config cleanup.policy=compact
另外Kafka Connect因为预设值的关系,log只能在/var/log/syslog
里面看到,所以另外修改/etc/kafka/connect-log4j.properties
让他输出到/var/log/kafka/connect.log
:
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka/connect.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true
但还要修改目录权限让Kafka Connect可以写进去:
sudo chmod g+w /var/log/kafka
启动Kafka Connect:
sudo systemctl enable --now confluent-kafka-connect
sudo service confluent-kafka-connect status
理论上就可以看到有安装的plugin资讯了:
curl -s -v http://10.1.2.3:8083/connector-plugins | jq .
可以把plugin安装到预设的/usr/share/java
下后重新启动,像是MongoDB的plugin[1]:
cd /usr/share/java; sudo wget -c https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.0/mongo-kafka-connect-1.11.0-all.jar; sudo service confluent-kafka-connect restart
然后建立一个JSON档案:
{
"name": "source-mongoatlas-dev",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
"connection.uri": "mongodb+srv://readonly:x@foo.bar.mongodb.net",
"topic.prefix": "mongoatlas_dev"
}
}
接着就对着任何一台Kafka Connect的API打:
curl -X POST -H "Content-Type: application/json" --data @a.json http://10.1.2.3:8083/connector
后续就可以看到:
curl -s -v http://10.1.2.3:8083/connector | jq .
另外如果有MongoDB有动静,就会同步到Kafka上面,会自动建立topic(以mongoatlas_dev
开头):
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list
设定其他套件
启动Kafka REST Proxy:
sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status
启动KSQL:
sudo service confluent-ksql start
sudo service confluent-ksql status
相关连结
参考资料
- ↑ Install the MongoDB Kafka Connector. [2023-09-20] (English).