Confluent
Confluent是一家公司,同时也是该公司Apache Kafka产品线的产品名。
简介
Confluent是Apache Kafka发明人出来开的公司,也是目前最知名的Kafka商业支援服务。同时Confluent也是该公司推出的软体品牌,提供了众多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天试用)。
硬体
在PoC时我使用了三台t3.small
(2 GB RAM,另外手动设定加上512 MB Swap),三台都安装完整的套件并且跑起来,一开始不会有问题,但跑一阵子后会因为记忆体不足而异常。
建议在正式环境下,ZooKeeper与Kafka分开跑,都使用m5a.large
(8 GB RAM)或是更好的机器执行。
安装
当然要先装Java,然后安装社群版本,这边因为官方还没有提供jammy
版本(Ubuntu 22.04),这边手动写成focal
:
sudo apt install -y default-jre; wget -qO - https://packages.confluent.io/deb/7.4/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null; echo -e "deb [arch=amd64 signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.4 stable main\ndeb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/clients/deb focal main" | sudo tee /etc/apt/sources.list.d/confluent.list; sudo apt update; sudo apt install -y confluent-community-2.13; sudo apt clean
设定ZooKeeper或是KRaft
现在的版本可以跑ZooKeeper模式或是KRaft模式,二择一。
设定Kafka
基本操作
topic
topic的操作都是透过kafka-topics
处理,使用--zookeeper
指定ZooKeeper位置(可省略Port资讯,预设使用2181)。
建立是--create
:
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 2
观看是透过--list
(简易)或是--describe
(详细):
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
删除topic则是透过--delete
:
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
预设值只会标记(MarkedForDeletion
),而非实际删除:
Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
讯息
把目前的日期资讯传到test
里:
date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test
接收讯息,并且从头开始收(--from-beginning
):
kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning
新版的会使用Broker的点而非ZooKeeper的点,这时候会使用--bootstrap_server
加上Broker的位置,而非--zookeeper
加上ZooKeeper的位置,像是这样:
kafka-console-consumer --bootstrap_server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning
设定其他套件
上面提到的是Kafka最低运作的设定,通常会安装其他的套件提供服务。要注意其他的套件会需要额外的CPU与记忆体资源。
启动Schema Registry:
sudo service confluent-schema-registry start
sudo service confluent-schema-registry status
启动Kafka Connect:
sudo service confluent-kafka-connect start
sudo service confluent-kafka-connect status
启动Kafka REST Proxy:
sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status
启动KSQL:
sudo service confluent-ksql start
sudo service confluent-ksql status