Confluent

出自Gea-Suan Lin's Wiki
跳至導覽 跳至搜尋
可列印版不再被支援且可能有渲染錯誤。請更新您的瀏覽器書籤並改用瀏覽器預設的列印功能。

Confluent是一家公司,同時也是該公司Apache Kafka產品線的產品名。

簡介

Confluent是Apache Kafka發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了眾多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天試用)。

硬體

在PoC時我使用了三台t3.small(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。

建議在正式環境下,ZooKeeper與Kafka分開跑,都使用m5a.large(8 GB RAM)或是更好的機器執行。

安裝

當然要先裝Java,然後安裝社群版本,這邊因為官方還沒有提供jammy版本(Ubuntu 22.04),這邊手動寫成focal

wget -qO - https://packages.confluent.io/deb/7.4/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null; echo -e "deb [arch=amd64 signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.4 stable main\ndeb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/clients/deb focal main" | sudo tee /etc/apt/sources.list.d/confluent.list; sudo apt update; sudo apt install -y confluent-community-2.13 default-jre; sudo apt clean

ZooKeeper模式或是KRaft模式

現在這個時間點比較尷尬,得看需求背景取捨。很粗略的來說,如果你是既有的應用要使用Kafka,用ZooKeeper模式會比較保險;如果你是自己開發的軟體要使用Kafka,用KRaft模式有機會減少將來的技術債。

KRaft

KRaft模式是官方要取代ZooKeeper模式所提出的方案。

ZooKeeper

ZooKeeper模式比較成熟,但官方已經宣布有計畫要淘汰掉ZooKeeper模式了。

基本操作

依照模式的不同會有不同的參數設定:

  • ZooKeeper模式的操作會有指定任何一台ZooKeeper服務,像是--zookeeper lb-zookeeper.example.com這樣的參數。
  • KRaft模式的操作則是指定任何一台Broker服務,像是--bootstrap-server lb-broker.example.com這樣的參數。

topic

topic的操作都是透過kafka-topics處理,使用--zookeeper指定ZooKeeper位置(可省略Port資訊,預設使用2181)。

建立是--create

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 2

觀看是透過--list(簡易)或是--describe(詳細):

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe

刪除topic則是透過--delete

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test

預設值只會標記(MarkedForDeletion),而非實際刪除:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

訊息

把目前的日期資訊傳到test裡:

date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test

接收訊息,並且從頭開始收(--from-beginning):

kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning --property print.timestamp=true

新版的會使用Broker的點而非ZooKeeper的點,這時候會使用--bootstrap_server加上Broker的位置,而非--zookeeper加上ZooKeeper的位置,像是這樣:

kafka-console-consumer --bootstrap_server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning --property print.timestamp=true

額外設定Schema Registry

修改/etc/schema-registry/schema-registry.properties

kafkastore.bootstrap.servers=PLAINTEXT://kafka-broker.srv.example.net:9092
metadata.encoder.secret=x
host.name=kafka-mishmash-1.private.example.net

啟動Schema Registry:

sudo systemctl enable --now confluent-schema-registry
sudo service confluent-schema-registry status

額外設定Kafka Connect

系統預設會跑distributed mode,所以修改/etc/kafka/connect-distributed.properties

bootstrap.servers=kafka-broker.srv.example.net:9092

自己建立三個需要的topic,因為自動建立的會有問題(因為需要--config cleanup.policy=compact),這邊的--replication-factor--partitions都是用Kafka Connect的預設值:

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-configs --create --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-offsets --create --replication-factor 3 --partitions 25 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-status --create --replication-factor 3 --partitions 5 --config cleanup.policy=compact

另外Kafka Connect因為預設值的關係,log只能在/var/log/syslog裡面看到,所以另外修改/etc/kafka/connect-log4j.properties讓他輸出到/var/log/kafka/connect.log

log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka/connect.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

但還要修改目錄權限讓Kafka Connect可以寫進去:

sudo chmod g+w /var/log/kafka

啟動Kafka Connect:

sudo systemctl enable --now confluent-kafka-connect
sudo service confluent-kafka-connect status

理論上就可以看到有安裝的plugin資訊了:

curl -s -v http://10.1.2.3:8083/connector-plugins | jq .

可以把plugin安裝到預設的/usr/share/java下後重新啟動,像是MongoDB的plugin[1]

cd /usr/share/java; sudo wget -c https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.0/mongo-kafka-connect-1.11.0-all.jar; sudo service confluent-kafka-connect restart

然後建立一個JSON檔案:

{
    "name": "source-mongoatlas-dev",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max": "1",
        "connection.uri": "mongodb+srv://readonly:x@foo.bar.mongodb.net",
        "topic.prefix": "mongoatlas_dev"
    }
}

這邊使用的mongodb+srv://可以先驗證:

dig _mongodb._tcp.foo.bar.mongodb.net srv

接著就對著任何一台Kafka Connect的API打:

curl -X POST -H "Content-Type: application/json" --data @a.json http://10.1.2.3:8083/connectors

後續就可以看到:

curl -s -v http://10.1.2.3:8083/connectors | jq .

另外如果有MongoDB有動靜,就會同步到Kafka上面,會自動建立topic(以mongoatlas_dev開頭):

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list

設定其他套件

啟動Kafka REST Proxy:

sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status

啟動KSQL:

sudo service confluent-ksql start
sudo service confluent-ksql status

測試

新版幾乎都是使用--bootstrap-server指定一台活著的broker位置當作起點,像是簡單列出topics:

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list

或是詳細列出:

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --describe

相關連結

參考資料

  1. Install the MongoDB Kafka Connector. [2023-09-20] (English). 

外部連結