「Confluent」:修訂間差異
(未顯示同一使用者於中間所作的 89 次修訂) | |||
第1行: | 第1行: | ||
'''Confluent'''是一家公司,也是該公司 | '''Confluent'''是一家公司,同時也是該公司[[Apache Kafka]]產品線的產品名。 | ||
== 簡介 == | == 簡介 == | ||
Confluent是[[Apache Kafka]]發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了 | Confluent是[[Apache Kafka]]發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了眾多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天試用)。 | ||
== | == 硬體 == | ||
在PoC時我使用了三台<code>t3.small</code>(2 GB RAM加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題, | 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。 | ||
建議在正式環境下,ZooKeeper與Kafka分開跑,都使用<code>m5a.large</code>(8 GB RAM)或是更好的機器執行。 | |||
== 安裝 == | == 安裝 == | ||
當然要先裝[[Java]]: | 當然要先裝[[Java]],然後安裝社群版本,這邊因為官方還沒有提供<code>jammy</code>版本(Ubuntu 22.04),這邊手動寫成<code>focal</code>: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
sudo apt install -y default-jre | wget -qO - https://packages.confluent.io/deb/7.4/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null; echo -e "deb [arch=amd64 signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.4 stable main\ndeb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/clients/deb focal main" | sudo tee /etc/apt/sources.list.d/confluent.list; sudo apt update; sudo apt install -y confluent-community-2.13 default-jre; sudo apt clean | ||
</syntaxhighlight> | </syntaxhighlight> | ||
這 | == ZooKeeper模式或是KRaft模式 == | ||
現在這個時間點比較尷尬,得看需求背景取捨。很粗略的來說,如果你是既有的應用要使用[[Kafka]],用ZooKeeper模式會比較保險;如果你是自己開發的軟體要使用Kafka,用KRaft模式有機會減少將來的技術債。 | |||
=== KRaft === | |||
{{Main|Confluent/KRaft}} | |||
KRaft模式是官方要取代ZooKeeper模式所提出的方案。 | |||
=== ZooKeeper === | |||
{{Main|Confluent/ZooKeeper}} | |||
{{Main|Confluent/Kafka}} | |||
ZooKeeper模式比較成熟,但官方已經宣布有計畫要淘汰掉ZooKeeper模式了。 | |||
== 基本操作 == | |||
依照模式的不同會有不同的參數設定: | |||
* ZooKeeper模式的操作會有指定任何一台ZooKeeper服務,像是<code>--zookeeper lb-zookeeper.example.com</code>這樣的參數。 | |||
* KRaft模式的操作則是指定任何一台Broker服務,像是<code>--bootstrap-server lb-broker.example.com</code>這樣的參數。 | |||
=== topic === | |||
topic的操作都是透過<code>kafka-topics</code>處理,使用<code>--zookeeper</code>指定ZooKeeper位置(可省略Port資訊,預設使用2181)。 | |||
建立是<code>--create</code>: | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 2 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
觀看是透過<code>--list</code>(簡易)或是<code>--describe</code>(詳細): | |||
<syntaxhighlight lang="bash"> | |||
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list | |||
</syntaxhighlight> | |||
<syntaxhighlight lang=" | <syntaxhighlight lang="bash"> | ||
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe | |||
</syntaxhighlight> | </syntaxhighlight> | ||
刪除topic則是透過<code>--delete</code>: | |||
<syntaxhighlight lang=" | <syntaxhighlight lang="bash"> | ||
1 | kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test | ||
</syntaxhighlight> | </syntaxhighlight> | ||
預設值只會標記(<code>MarkedForDeletion</code>),而非實際刪除: | |||
<pre> | |||
Topic test is marked for deletion. | |||
Note: This will have no impact if delete.topic.enable is not set to true. | |||
</pre> | |||
=== 訊息 === | |||
把目前的日期資訊傳到<code>test</code>裡: | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test | |||
</syntaxhighlight> | </syntaxhighlight> | ||
接收訊息,並且從頭開始收(<code>--from-beginning</code>): | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning --property print.timestamp=true | |||
</syntaxhighlight> | </syntaxhighlight> | ||
新版的會使用Broker的點而非ZooKeeper的點,這時候會使用<code>--bootstrap_server</code>加上Broker的位置,而非<code>--zookeeper</code>加上ZooKeeper的位置,像是這樣: | |||
<syntaxhighlight lang="bash"> | |||
kafka-console-consumer --bootstrap_server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning --property print.timestamp=true | |||
</syntaxhighlight> | |||
== 設 | == 額外設定Schema Registry == | ||
修改<code>/etc/schema-registry/schema-registry.properties</code>: | |||
<syntaxhighlight lang="ini"> | <syntaxhighlight lang="ini"> | ||
kafkastore.bootstrap.servers=PLAINTEXT://kafka-broker.srv.example.net:9092 | |||
metadata.encoder.secret=x | |||
host.name=kafka-mishmash-1.private.example.net | |||
</syntaxhighlight> | </syntaxhighlight> | ||
啟動Schema Registry: | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
sudo systemctl enable --now confluent-schema-registry | |||
broker. | sudo service confluent-schema-registry status | ||
</syntaxhighlight> | |||
== 額外設定Kafka Connect == | |||
系統預設會跑distributed mode,所以修改<code>/etc/kafka/connect-distributed.properties</code>: | |||
<syntaxhighlight lang="ini"> | |||
bootstrap.servers=kafka-broker.srv.example.net:9092 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
自己建立三個需要的topic,因為自動建立的會有問題(因為需要<code>--config cleanup.policy=compact</code>),這邊的<code>--replication-factor</code>與<code>--partitions</code>都是用Kafka Connect的預設值: | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-configs --create --replication-factor 3 --partitions 1 --config cleanup.policy=compact | |||
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-offsets --create --replication-factor 3 --partitions 25 --config cleanup.policy=compact | |||
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-status --create --replication-factor 3 --partitions 5 --config cleanup.policy=compact | |||
</syntaxhighlight> | </syntaxhighlight> | ||
另外Kafka Connect因為預設值的關係,log只能在<code>/var/log/syslog</code>裡面看到,所以另外修改<code>/etc/kafka/connect-log4j.properties</code>讓他輸出到<code>/var/log/kafka/connect.log</code>: | |||
<syntaxhighlight lang="ini"> | |||
log4j.rootLogger=INFO, file | |||
log4j.appender.file=org.apache.log4j.RollingFileAppender | |||
log4j.appender.file.File=/var/log/kafka/connect.log | |||
log4j.appender.file.layout=org.apache.log4j.PatternLayout | |||
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n | |||
log4j.appender.file.MaxFileSize=10MB | |||
log4j.appender.file.MaxBackupIndex=5 | |||
log4j.appender.file.append=true | |||
</syntaxhighlight> | |||
但還要修改目錄權限讓Kafka Connect可以寫進去: | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
sudo | sudo chmod g+w /var/log/kafka | ||
</syntaxhighlight> | </syntaxhighlight> | ||
第99行: | 第151行: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
sudo | sudo systemctl enable --now confluent-kafka-connect | ||
sudo service confluent-kafka-connect status | sudo service confluent-kafka-connect status | ||
</syntaxhighlight> | </syntaxhighlight> | ||
理論上就可以看到有安裝的plugin資訊了: | |||
<syntaxhighlight lang="bash"> | |||
curl -s -v http://10.1.2.3:8083/connector-plugins | jq . | |||
</syntaxhighlight> | |||
可以把plugin安裝到預設的<code>/usr/share/java</code>下後重新啟動,像是MongoDB的plugin<ref>{{Cite web |url=https://www.mongodb.com/docs/kafka-connector/current/introduction/install/ |title=Install the MongoDB Kafka Connector |language=en |accessdate=2023-09-20}}</ref>: | |||
<syntaxhighlight lang="bash"> | |||
cd /usr/share/java; sudo wget -c https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.0/mongo-kafka-connect-1.11.0-all.jar; sudo service confluent-kafka-connect restart | |||
</syntaxhighlight> | |||
然後建立一個JSON檔案: | |||
<syntaxhighlight lang="json"> | |||
{ | |||
"name": "source-mongoatlas-dev", | |||
"config": { | |||
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", | |||
"tasks.max": "1", | |||
"connection.uri": "mongodb+srv://readonly:x@foo.bar.mongodb.net", | |||
"topic.prefix": "mongoatlas_dev" | |||
} | |||
} | |||
</syntaxhighlight> | |||
這邊使用的<code>mongodb+srv://</code>可以先驗證: | |||
<syntaxhighlight lang="bash"> | |||
dig _mongodb._tcp.foo.bar.mongodb.net srv | |||
</syntaxhighlight> | |||
接著就對著任何一台Kafka Connect的API打: | |||
<syntaxhighlight lang="bash"> | |||
curl -X POST -H "Content-Type: application/json" --data @a.json http://10.1.2.3:8083/connectors | |||
</syntaxhighlight> | |||
後續就可以看到: | |||
<syntaxhighlight lang="bash"> | |||
curl -s -v http://10.1.2.3:8083/connectors | jq . | |||
</syntaxhighlight> | |||
另外如果有MongoDB有動靜,就會同步到Kafka上面,會自動建立topic(以<code>mongoatlas_dev</code>開頭): | |||
<syntaxhighlight lang="bash"> | |||
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list | |||
</syntaxhighlight> | |||
== 設定其他套件 == | |||
啟動Kafka REST Proxy: | 啟動Kafka REST Proxy: | ||
第115行: | 第219行: | ||
sudo service confluent-ksql start | sudo service confluent-ksql start | ||
sudo service confluent-ksql status | sudo service confluent-ksql status | ||
</syntaxhighlight> | |||
== 測試 == | |||
新版幾乎都是使用<code>--bootstrap-server</code>指定一台活著的broker位置當作起點,像是簡單列出topics: | |||
<syntaxhighlight lang="bash"> | |||
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list | |||
</syntaxhighlight> | |||
或是詳細列出: | |||
<syntaxhighlight lang="bash"> | |||
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --describe | |||
</syntaxhighlight> | </syntaxhighlight> | ||
第121行: | 第239行: | ||
* [[Apache Kafka]] | * [[Apache Kafka]] | ||
* [[Apache ZooKeeper]] | * [[Apache ZooKeeper]] | ||
== 參考資料 == | |||
{{Reflist|2}} | |||
== 外部連結 == | == 外部連結 == | ||
* {{Official|https://www.confluent.io/}} {{en}} | * {{Official|https://www.confluent.io/}} {{en}} | ||
* [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}} | * [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}} | ||
* [https://cwiki.apache.org/confluence/display/ZOOKEEPER/Client-Server+mutual+authentication Client-Server mutual authentication] {{en}} | |||
* [https://medium.com/@johny.urgiles/overcoming-zookeeper-acls-1b205cfdc301 Overcoming Zookeeper ACLs] {{en}} | |||
[[Category:軟體]] | [[Category:軟體]] |
於 2023年10月23日 (一) 15:48 的最新修訂
Confluent是一家公司,同時也是該公司Apache Kafka產品線的產品名。
簡介
Confluent是Apache Kafka發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了眾多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天試用)。
硬體
在PoC時我使用了三台t3.small
(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。
建議在正式環境下,ZooKeeper與Kafka分開跑,都使用m5a.large
(8 GB RAM)或是更好的機器執行。
安裝
當然要先裝Java,然後安裝社群版本,這邊因為官方還沒有提供jammy
版本(Ubuntu 22.04),這邊手動寫成focal
:
wget -qO - https://packages.confluent.io/deb/7.4/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null; echo -e "deb [arch=amd64 signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.4 stable main\ndeb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/clients/deb focal main" | sudo tee /etc/apt/sources.list.d/confluent.list; sudo apt update; sudo apt install -y confluent-community-2.13 default-jre; sudo apt clean
ZooKeeper模式或是KRaft模式
現在這個時間點比較尷尬,得看需求背景取捨。很粗略的來說,如果你是既有的應用要使用Kafka,用ZooKeeper模式會比較保險;如果你是自己開發的軟體要使用Kafka,用KRaft模式有機會減少將來的技術債。
KRaft
KRaft模式是官方要取代ZooKeeper模式所提出的方案。
ZooKeeper
ZooKeeper模式比較成熟,但官方已經宣布有計畫要淘汰掉ZooKeeper模式了。
基本操作
依照模式的不同會有不同的參數設定:
- ZooKeeper模式的操作會有指定任何一台ZooKeeper服務,像是
--zookeeper lb-zookeeper.example.com
這樣的參數。 - KRaft模式的操作則是指定任何一台Broker服務,像是
--bootstrap-server lb-broker.example.com
這樣的參數。
topic
topic的操作都是透過kafka-topics
處理,使用--zookeeper
指定ZooKeeper位置(可省略Port資訊,預設使用2181)。
建立是--create
:
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 2
觀看是透過--list
(簡易)或是--describe
(詳細):
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
刪除topic則是透過--delete
:
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
預設值只會標記(MarkedForDeletion
),而非實際刪除:
Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
訊息
把目前的日期資訊傳到test
裡:
date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test
接收訊息,並且從頭開始收(--from-beginning
):
kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning --property print.timestamp=true
新版的會使用Broker的點而非ZooKeeper的點,這時候會使用--bootstrap_server
加上Broker的位置,而非--zookeeper
加上ZooKeeper的位置,像是這樣:
kafka-console-consumer --bootstrap_server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning --property print.timestamp=true
額外設定Schema Registry
修改/etc/schema-registry/schema-registry.properties
:
kafkastore.bootstrap.servers=PLAINTEXT://kafka-broker.srv.example.net:9092
metadata.encoder.secret=x
host.name=kafka-mishmash-1.private.example.net
啟動Schema Registry:
sudo systemctl enable --now confluent-schema-registry
sudo service confluent-schema-registry status
額外設定Kafka Connect
系統預設會跑distributed mode,所以修改/etc/kafka/connect-distributed.properties
:
bootstrap.servers=kafka-broker.srv.example.net:9092
自己建立三個需要的topic,因為自動建立的會有問題(因為需要--config cleanup.policy=compact
),這邊的--replication-factor
與--partitions
都是用Kafka Connect的預設值:
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-configs --create --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-offsets --create --replication-factor 3 --partitions 25 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-status --create --replication-factor 3 --partitions 5 --config cleanup.policy=compact
另外Kafka Connect因為預設值的關係,log只能在/var/log/syslog
裡面看到,所以另外修改/etc/kafka/connect-log4j.properties
讓他輸出到/var/log/kafka/connect.log
:
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka/connect.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true
但還要修改目錄權限讓Kafka Connect可以寫進去:
sudo chmod g+w /var/log/kafka
啟動Kafka Connect:
sudo systemctl enable --now confluent-kafka-connect
sudo service confluent-kafka-connect status
理論上就可以看到有安裝的plugin資訊了:
curl -s -v http://10.1.2.3:8083/connector-plugins | jq .
可以把plugin安裝到預設的/usr/share/java
下後重新啟動,像是MongoDB的plugin[1]:
cd /usr/share/java; sudo wget -c https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.0/mongo-kafka-connect-1.11.0-all.jar; sudo service confluent-kafka-connect restart
然後建立一個JSON檔案:
{
"name": "source-mongoatlas-dev",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
"connection.uri": "mongodb+srv://readonly:x@foo.bar.mongodb.net",
"topic.prefix": "mongoatlas_dev"
}
}
這邊使用的mongodb+srv://
可以先驗證:
dig _mongodb._tcp.foo.bar.mongodb.net srv
接著就對著任何一台Kafka Connect的API打:
curl -X POST -H "Content-Type: application/json" --data @a.json http://10.1.2.3:8083/connectors
後續就可以看到:
curl -s -v http://10.1.2.3:8083/connectors | jq .
另外如果有MongoDB有動靜,就會同步到Kafka上面,會自動建立topic(以mongoatlas_dev
開頭):
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list
設定其他套件
啟動Kafka REST Proxy:
sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status
啟動KSQL:
sudo service confluent-ksql start
sudo service confluent-ksql status
測試
新版幾乎都是使用--bootstrap-server
指定一台活著的broker位置當作起點,像是簡單列出topics:
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list
或是詳細列出:
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --describe
相關連結
參考資料
- ↑ Install the MongoDB Kafka Connector. [2023-09-20] (English).