Confluent:修订间差异

来自Gea-Suan Lin's Wiki
跳到导航 跳到搜索
此页面具有访问限制。如果您看见此消息,则说明您没有权限访问此页面。
 
(未显示同一用户的79个中间版本)
第1行: 第1行:
'''Confluent'''是一家公司,也是該公司 [[Apache Kafka]]產品名。
'''Confluent'''是一家公司, 同時 也是該公司[[Apache Kafka]] 產品線的 產品名。


== 簡介 ==
== 簡介 ==
第7行: 第7行:
== 硬體 ==
== 硬體 ==


 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。建議在正式環境下 建議 使用<code>m5.large</code>(8 GB RAM)或是更好的機器執行。
 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。
 
建議在正式環境下 ,ZooKeeper與Kafka分開跑,都 使用<code>m5a.large</code>(8 GB RAM)或是更好的機器執行。


== 安裝 ==
== 安裝 ==


 當然要先裝[[Java]]:
 當然要先裝[[Java]] ,然後安裝社群版本,這邊因為官方還沒有提供<code>jammy</code>版本(Ubuntu 22.04),這邊手動寫成<code>focal</code>


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/7.4/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null; echo -e "deb [arch=amd64 signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.4 stable main\ndeb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/clients/deb focal main" | sudo tee /etc/apt/sources.list.d/confluent.list; sudo apt update; sudo apt install -y confluent-community-2.13 default-jre; sudo apt clean
</syntaxhighlight>
</syntaxhighlight>


 這 邊只安裝社群版 本:
== ZooKeeper模式或是KRaft模式 ==
 
  現在 個時間點比較尷尬,得看需求背景取捨。很粗略的來說,如果你是既有的應用要使用[[Kafka]],用ZooKeeper模式會比較保險;如果你是自己開發的軟體要使用Kafka,用KRaft模式有機會減少將來的技術債。
 
=== KRaft ===
 
{{Main|Confluent/KRaft}}
 
KRaft模式是官方要取代ZooKeeper模式所提出的方案。
 
=== ZooKeeper ===
 
{{Main|Confluent/ZooKeeper}}
{{Main|Confluent/Kafka}}
 
ZooKeeper模式比較成熟,但官方已經宣布有計畫要淘汰掉ZooKeeper模式了。
 
== 基 操作 ==
 
依照模式的不同會有不同的參數設定:
 
* ZooKeeper模式的操作會有指定任何一台ZooKeeper服務,像是<code>--zookeeper lb-zookeeper.example.com</code>這樣的參數。
* KRaft模式的操作則是指定任何一台Broker服務,像是<code>--bootstrap-server lb-broker.example.com</code>這樣的參數。
 
=== topic ===
 
topic的操作都是透過<code>kafka-topics</code>處理,使用<code>--zookeeper</code>指定ZooKeeper位置(可省略Port資訊,預設使用2181)。
 
建立是<code>--create</code>


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 2
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"
sudo apt install -y confluent-community-2.11
</syntaxhighlight>
</syntaxhighlight>


== 設定ZooKeeper ==
  觀看是透過<code>--list</code>(簡易)或是<code>--describe</code>(詳細):


修改<code>/etc/kafka/zookeeper.properties</code> ,其中IP address需要填寫對應的位置:
<syntaxhighlight lang="bash">
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
</syntaxhighlight>


<syntaxhighlight lang="ini">
<syntaxhighlight lang="bash">
#
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
tickTime=2000
</syntaxhighlight>
dataDir=/var/lib/zookeeper/
 
clientPort=2181
刪除topic則是透過<code>--delete</code>:
initLimit=5
 
syncLimit=2
<syntaxhighlight lang="bash">
server.1=1.2.3.4:2888:3888
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
server.2=5.6.7.8:2888:3888
</syntaxhighlight>
server.3=9.10.11.12:2888:3888
 
autopurge.snapRetainCount=3
預設值只會標記(<code>MarkedForDeletion</code>),而非實際刪除:
autopurge.purgeInterval=24
 
<pre>
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
</pre>
 
=== 訊息 ===
 
把目前的日期資訊傳到<code>test</code>裡:
 
<syntaxhighlight lang="bash">
date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test
</syntaxhighlight>
 
接收訊息,並且從頭開始收(<code>--from-beginning</code>):
 
<syntaxhighlight lang="bash">
kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning --property print.timestamp=true
</syntaxhighlight>
 
新版的會使用Broker的點而非ZooKeeper的點,這時候會使用<code>--bootstrap_server</code>加上Broker的位置,而非<code>--zookeeper</code>加上ZooKeeper的位置,像是這樣:
 
<syntaxhighlight lang="bash">
kafka-console-consumer --bootstrap_server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning --property print.timestamp=true
</syntaxhighlight>
</syntaxhighlight>


  新增<code>/var/lib/zookeeper/myid</code>,每一台都需要不同,<code>1</code>或<code>2</code>或<code>3</code>:
== 額外設定Schema Registry ==
 
修改<code>/etc/schema-registry/schema-registry.properties</code>:


<syntaxhighlight lang="ini">
<syntaxhighlight lang="ini">
1
kafkastore.bootstrap.servers=PLAINTEXT://kafka-broker.srv.example.net:9092
metadata.encoder.secret=x
host.name=kafka-mishmash-1.private.example.net
</syntaxhighlight>
</syntaxhighlight>


  然後修改檔案擁有人
  啟動Schema Registry


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo chown cp-kafka:confluent /var/lib/zookeeper/myid
sudo systemctl enable --now confluent-schema-registry
sudo service confluent-schema-registry status
</syntaxhighlight>
</syntaxhighlight>


  目前的ZooKeeper(Confluent 2.11版內的ZooKeeper) 預設 值是使用512 MB的記憶體,但主機有7.5 GB的記憶體 ,所以 會想要讓ZooKeeper可以用7 GB,因此需要 改ZooKeeper的JVM參數。這邊需要新增<code>/lib/systemd/system/confluent-zookeeper.service.d/30-options.conf</code> (目錄可能需要自己建立)
== 額外設定Kafka Connect ==
 
系統 預設 會跑distributed mode ,所以修 <code>/etc/kafka/connect-distributed.properties</code>:


<syntaxhighlight lang="ini">
<syntaxhighlight lang="ini">
[Service]
bootstrap.servers=kafka-broker.srv.example.net:9092
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
</syntaxhighlight>
</syntaxhighlight>


  理論上就可以啟
  自己建立三個需要的topic,因為自 建立的會有問題(因為需要<code>--config cleanup.policy=compact</code>),這邊的<code>--replication-factor</code>與<code>--partitions</code>都是用Kafka Connect的預設值


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo systemctl daemon-reload
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-configs --create --replication-factor 3 --partitions 1 --config cleanup.policy=compact
sudo service confluent-zookeeper start
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-offsets --create --replication-factor 3 --partitions 25 --config cleanup.policy=compact
sudo service confluent-zookeeper status
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-status --create --replication-factor 3 --partitions 5 --config cleanup.policy=compact
</syntaxhighlight>
</syntaxhighlight>


  輸出 的資訊判斷系統狀態
  另外Kafka Connect因為預設值的關係,log只能在<code>/var/log/syslog</code>裡面看到,所 另外修改<code>/etc/kafka/connect-log4j.properties</code>讓他 輸出 到<code>/var/log/kafka/connect.log</code>


<syntaxhighlight lang="bash">
<syntaxhighlight lang="ini">
echo stat | nc 127.0.0.1 2181
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka/connect.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true
</syntaxhighlight>
</syntaxhighlight>


  或是直接透過指令操作測試
  但還要修改目錄權限讓Kafka Connect可以寫進去


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
zookeeper-shell 127.0.0.1:2181
sudo chmod g+w /var/log/kafka
</syntaxhighlight>
</syntaxhighlight>


  接下來可以將TCP Port 2181建立對應的TCP Load Balancer(像是用ELB)。
  啟動Kafka Connect:


=== 認證 ===
<syntaxhighlight lang="bash">
sudo systemctl enable --now confluent-kafka-connect
sudo service confluent-kafka-connect status
</syntaxhighlight>


  如果需要讓ZooKeeper啟用認證,需要先建立對應的帳號與密碼字串(這個例子裡面是<code>admin</code>與<code>password</code>)
  理論上就可以看到有安裝的plugin資訊了


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
java -cp "$(echo /usr/share/java/kafka/* | sed 's/ /:/g')" org.apache.zookeeper.server.auth.DigestAuthenticationProvider admin:password
curl -s -v http://10.1.2.3:8083/connector-plugins | jq .
</syntaxhighlight>
</syntaxhighlight>


  會產生像是這樣 輸出,其中 面的那串值是
  可以把plugin安裝到預設 <code>/usr/share/java</code>下 後重 新啟動,像是MongoDB的plugin<ref>{{Cite web |url=https://www.mongodb.com/docs/kafka-connector/current/introduction/install/ |title=Install the MongoDB Kafka Connector |language=en |accessdate=2023-09-20}}</ref>


<syntaxhighlight lang="text">
<syntaxhighlight lang="bash">
admin:password->admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg=
cd /usr/share/java; sudo wget -c https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.0/mongo-kafka-connect-1.11.0-all.jar; sudo service confluent-kafka-connect restart
</syntaxhighlight>
</syntaxhighlight>


== 設定Kafka ==
  然後建立一個JSON檔案
 
如果[[ZooKeeper]]與[[Kafka]]不同伺服器,需要修改<code>/etc/kafka/server.properties</code>的<code>zookeeper.connect</code>的值,像是這樣


<syntaxhighlight lang="ini">
<syntaxhighlight lang="json">
#zookeeper.connect=localhost:2181
{
zookeeper.connect=internal-test-gslin-confluent-xxxxxxxxx.us-east-1.elb.amazonaws.com:2181
   "name": "source-mongoatlas-dev",
   "config": {
     "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
     "tasks.max": "1",
     "connection.uri": "mongodb+srv://readonly:x@foo.bar.mongodb.net",
     "topic.prefix": "mongoatlas_dev"
   }
}
</syntaxhighlight>
</syntaxhighlight>


  修改<code>/etc/kafka/server.properties</code>的<code>broker.id</code> 設定,讓他自動產生而不需要自己指定
  這邊使用的<code>mongodb+srv://</code> 可以先驗證


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
#broker.id=0
dig _mongodb._tcp.foo.bar.mongodb.net srv
broker.id.generation.enable=true
</syntaxhighlight>
</syntaxhighlight>


  理論上 可以啟動了
  接著 對著任何一台Kafka Connect的API打


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo service confluent-confluent start
curl -X POST -H "Content-Type: application/json" --data @a.json http://10.1.2.3:8083/connectors
sudo service confluent-confluent status
</syntaxhighlight>
</syntaxhighlight>


== 設定其他套件 ==
  後續就可以看到
 
啟動Schema Registry


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo service confluent-schema-registry start
curl -s -v http://10.1.2.3:8083/connectors | jq .
sudo service confluent-schema-registry status
</syntaxhighlight>
</syntaxhighlight>


  啟動Kafka Connect
  另外如果有MongoDB有動靜,就會同步到Kafka上面,會自動建立topic(以<code>mongoatlas_dev</code>開頭)


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo service confluent-kafka-connect start
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list
sudo service confluent-kafka-connect status
</syntaxhighlight>
</syntaxhighlight>
== 設定其他套件 ==


 啟動Kafka REST Proxy:
 啟動Kafka REST Proxy:
第149行: 第219行:
sudo service confluent-ksql start
sudo service confluent-ksql start
sudo service confluent-ksql status
sudo service confluent-ksql status
</syntaxhighlight>
== 測試 ==
新版幾乎都是使用<code>--bootstrap-server</code>指定一台活著的broker位置當作起點,像是簡單列出topics:
<syntaxhighlight lang="bash">
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list
</syntaxhighlight>
或是詳細列出:
<syntaxhighlight lang="bash">
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --describe
</syntaxhighlight>
</syntaxhighlight>


第155行: 第239行:
* [[Apache Kafka]]
* [[Apache Kafka]]
* [[Apache ZooKeeper]]
* [[Apache ZooKeeper]]
== 參考資料 ==
{{Reflist|2}}


== 外部連結 ==
== 外部連結 ==
* {{Official|https://www.confluent.io/}} {{en}}
* {{Official|https://www.confluent.io/}} {{en}}
* [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}}
* [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}}
* [https://cwiki.apache.org/confluence/display/ZOOKEEPER/Client-Server+mutual+authentication Client-Server mutual authentication] {{en}}
* [https://medium.com/@johny.urgiles/overcoming-zookeeper-acls-1b205cfdc301 Overcoming Zookeeper ACLs] {{en}}
* [https://medium.com/@johny.urgiles/overcoming-zookeeper-acls-1b205cfdc301 Overcoming Zookeeper ACLs] {{en}}


[[Category:軟體]]
[[Category:軟體]]

2023年10月23日 (一) 15:48的最新版本

Confluent是一家公司,同时也是该公司Apache Kafka产品线的产品名。

简介

Confluent是Apache Kafka发明人出来开的公司,也是目前最知名的Kafka商业支援服务。同时Confluent也是该公司推出的软件品牌,提供了众多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天试用)。

硬件

在PoC时我使用了三台t3.small(2 GB RAM,另外手动设定加上512 MB Swap),三台都安装完整的套件并且跑起来,一开始不会有问题,但跑一阵子后会因为内存不足而异常。

建议在正式环境下,ZooKeeper与Kafka分开跑,都使用m5a.large(8 GB RAM)或是更好的机器执行。

安装

当然要先装Java,然后安装社群版本,这边因为官方还没有提供jammy版本(Ubuntu 22.04),这边手动写成focal

wget -qO - https://packages.confluent.io/deb/7.4/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null; echo -e "deb [arch=amd64 signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.4 stable main\ndeb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/clients/deb focal main" | sudo tee /etc/apt/sources.list.d/confluent.list; sudo apt update; sudo apt install -y confluent-community-2.13 default-jre; sudo apt clean

ZooKeeper模式或是KRaft模式

现在这个时间点比较尴尬,得看需求背景取舍。很粗略的来说,如果你是既有的应用要使用Kafka,用ZooKeeper模式会比较保险;如果你是自己开发的软件要使用Kafka,用KRaft模式有机会减少将来的技术债。

KRaft

KRaft模式是官方要取代ZooKeeper模式所提出的方案。

ZooKeeper

ZooKeeper模式比较成熟,但官方已经宣布有计划要淘汰掉ZooKeeper模式了。

基本操作

依照模式的不同会有不同的参数设定:

  • ZooKeeper模式的操作会有指定任何一台ZooKeeper服务,像是--zookeeper lb-zookeeper.example.com这样的参数。
  • KRaft模式的操作则是指定任何一台Broker服务,像是--bootstrap-server lb-broker.example.com这样的参数。

topic

topic的操作都是透过kafka-topics处理,使用--zookeeper指定ZooKeeper位置(可省略Port资讯,预设使用2181)。

建立是--create

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 2

观看是透过--list(简易)或是--describe(详细):

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe

删除topic则是透过--delete

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test

预设值只会标记(MarkedForDeletion),而非实际删除:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

讯息

把目前的日期资讯传到test里:

date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test

接收讯息,并且从头开始收(--from-beginning):

kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning --property print.timestamp=true

新版的会使用Broker的点而非ZooKeeper的点,这时候会使用--bootstrap_server加上Broker的位置,而非--zookeeper加上ZooKeeper的位置,像是这样:

kafka-console-consumer --bootstrap_server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning --property print.timestamp=true

额外设定Schema Registry

修改/etc/schema-registry/schema-registry.properties

kafkastore.bootstrap.servers=PLAINTEXT://kafka-broker.srv.example.net:9092
metadata.encoder.secret=x
host.name=kafka-mishmash-1.private.example.net

启动Schema Registry:

sudo systemctl enable --now confluent-schema-registry
sudo service confluent-schema-registry status

额外设定Kafka Connect

系统预设会跑distributed mode,所以修改/etc/kafka/connect-distributed.properties

bootstrap.servers=kafka-broker.srv.example.net:9092

自己建立三个需要的topic,因为自动建立的会有问题(因为需要--config cleanup.policy=compact),这边的--replication-factor--partitions都是用Kafka Connect的预设值:

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-configs --create --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-offsets --create --replication-factor 3 --partitions 25 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --topic connect-status --create --replication-factor 3 --partitions 5 --config cleanup.policy=compact

另外Kafka Connect因为预设值的关系,log只能在/var/log/syslog里面看到,所以另外修改/etc/kafka/connect-log4j.properties让他输出到/var/log/kafka/connect.log

log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka/connect.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

但还要修改目录权限让Kafka Connect可以写进去:

sudo chmod g+w /var/log/kafka

启动Kafka Connect:

sudo systemctl enable --now confluent-kafka-connect
sudo service confluent-kafka-connect status

理论上就可以看到有安装的plugin资讯了:

curl -s -v http://10.1.2.3:8083/connector-plugins | jq .

可以把plugin安装到预设的/usr/share/java下后重新启动,像是MongoDB的plugin[1]

cd /usr/share/java; sudo wget -c https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.0/mongo-kafka-connect-1.11.0-all.jar; sudo service confluent-kafka-connect restart

然后建立一个JSON档案:

{
    "name": "source-mongoatlas-dev",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max": "1",
        "connection.uri": "mongodb+srv://readonly:x@foo.bar.mongodb.net",
        "topic.prefix": "mongoatlas_dev"
    }
}

这边使用的mongodb+srv://可以先验证:

dig _mongodb._tcp.foo.bar.mongodb.net srv

接着就对着任何一台Kafka Connect的API打:

curl -X POST -H "Content-Type: application/json" --data @a.json http://10.1.2.3:8083/connectors

后续就可以看到:

curl -s -v http://10.1.2.3:8083/connectors | jq .

另外如果有MongoDB有动静,就会同步到Kafka上面,会自动建立topic(以mongoatlas_dev开头):

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list

设定其他套件

启动Kafka REST Proxy:

sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status

启动KSQL:

sudo service confluent-ksql start
sudo service confluent-ksql status

测试

新版几乎都是使用--bootstrap-server指定一台活着的broker位置当作起点,像是简单列出topics:

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --list

或是详细列出:

kafka-topics --bootstrap-server kafka-broker.srv.example.net:9092 --describe

相关连结

参考资料

  1. Install the MongoDB Kafka Connector. [2023-09-20] (English). 

外部链接