「Confluent」:修訂間差異
(未顯示同一使用者於中間所作的 38 次修訂) | |||
行 1: | 行 1: | ||
'''Confluent'''是一家公司,也是該公司的[[Apache Kafka]]產品名。 | '''Confluent'''是一家公司,也是該公司的[[Apache Kafka]] 產品線的 產品名。 | ||
== 簡介 == | == 簡介 == | ||
Confluent是[[Apache Kafka]]發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌。 | Confluent是[[Apache Kafka]]發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌 ,提供了眾多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天試用) 。 | ||
== 硬體 == | |||
在PoC時我使用 了 三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的 套件 並且跑起來,一開始不會有問題 ,但 跑一陣子後會因為記憶體 不 足而異常。 | |||
建議在正式環境下ZooKeeper部分建議使 用<code>m5.large</code>(8 GB RAM ) 或是更好的機器執行 。 | |||
== 安裝 == | == 安裝 == | ||
當然要先裝[[Java]]: | 當然要先裝[[Java]] ,然後安裝社群版本 : | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
sudo apt install -y default-jre | sudo apt install -y default-jre | ||
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== 設定ZooKeeper == | |||
{{See also|Confluent/ZooKeeper}} | |||
< | == 設定Kafka == | ||
修改<code>/etc/kafka/server.properties</code>的<code>broker.id</code>設定,讓他自動產生而不需要自己指定: | |||
<syntaxhighlight lang="ini"> | |||
#broker.id=0 | |||
broker.id.generation.enable=true | |||
</syntaxhighlight> | </syntaxhighlight> | ||
修改<code>listeners</code>設定,雖然預 設 值會聽正確的介面,但因為這個資訊會被註冊到ZooKeeper裡面,所以需要註冊正確的介面資訊: | |||
<syntaxhighlight lang="ini"> | |||
#listeners=PLAINTEXT://:9092 | |||
listeners=PLAINTEXT://10.1.1.1:9092 | |||
</syntaxhighlight> | |||
修改<code>/etc/kafka/zookeeper. | 如果[[ZooKeeper]]與[[Kafka]]不同伺服器,需要 修改<code>/etc/kafka/server.properties</code>的<code>zookeeper.connect</code> 的值 , 像是這樣 : | ||
<syntaxhighlight lang="ini"> | <syntaxhighlight lang="ini"> | ||
# | #zookeeper.connect=localhost:2181 | ||
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
另外可以設定<code>/lib/systemd/system/confluent-kafka.service.d/30-options.conf</code> (目錄可能會 需要 自己建立),讓Kafka吃滿記憶體(這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用),以及開啟JMX監控機制 , 跑在Port 32181 : | |||
<syntaxhighlight lang="ini"> | <syntaxhighlight lang="ini"> | ||
[Service] | |||
Environment=JMX_PORT=32181 | |||
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g" | |||
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" | |||
</syntaxhighlight> | |||
對應的指令: | |||
<syntaxhighlight lang="bash"> | |||
sudo mkdir /lib/systemd/system/confluent-kafka.service.d/ | |||
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf | |||
</syntaxhighlight> | </syntaxhighlight> | ||
設完 後 就可以讓systemd重讀設定後啟動 : | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
sudo | sudo systemctl daemon-reload | ||
sudo service confluent-kafka start | |||
sudo service confluent-kafka status | |||
</syntaxhighlight> | </syntaxhighlight> | ||
理 | == 基本操作 == | ||
=== topic === | |||
topic的操作都是透過<code>kafka-topics</code>處 理 ,其中在較新的版本是使用<code>--bootstrap-server</code>指定Broker位置當作接口( 可 省略Port資訊,預設使用Port 9092),在較舊版本則是使用<code>--zookeeper</code>指定ZooKeeper位置當作接口(可省略Port資訊,預設使用9092)。 | |||
建立是<code>--create</code> : | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1 | |||
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
觀看是透過<code>--list</code> ( 簡易)或 是<code>--describe</code>(詳細 ) : | |||
== | <syntaxhighlight lang="bash"> | ||
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list | |||
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list | |||
</syntaxhighlight> | |||
<syntaxhighlight lang="bash"> | |||
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe | |||
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe | |||
</syntaxhighlight> | |||
刪除topic則是透過<code>--delete</code>: | |||
<syntaxhighlight lang=" | <syntaxhighlight lang="bash"> | ||
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test | |||
zookeeper | kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test | ||
</syntaxhighlight> | </syntaxhighlight> | ||
預設值只會標記(<code>MarkedForDeletion</code>),而非實際刪除: | |||
<pre> | |||
Topic test is marked for deletion. | |||
Note: This will have no impact if delete.topic.enable is not set to true. | |||
</pre> | |||
=== 訊息 === | |||
把目前 的 日期資訊傳到<code>test</code> 裡 : | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test | |||
</syntaxhighlight> | </syntaxhighlight> | ||
接收訊息,並且從頭開始收(<code>--from-beginning</code>) : | |||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== 設定其他套件 == | == 設定其他套件 == | ||
上面提到的是Kafka最低運作的設定,通常會安裝其他的套件提供服務。要注意其他的套件會需要額外的CPU與記憶體資源。 | |||
啟動Schema Registry: | 啟動Schema Registry: | ||
行 123: | 行 164: | ||
* {{Official|https://www.confluent.io/}} {{en}} | * {{Official|https://www.confluent.io/}} {{en}} | ||
* [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}} | * [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}} | ||
* [https://cwiki.apache.org/confluence/display/ZOOKEEPER/Client-Server+mutual+authentication Client-Server mutual authentication] {{en}} | |||
* [https://medium.com/@johny.urgiles/overcoming-zookeeper-acls-1b205cfdc301 Overcoming Zookeeper ACLs] {{en}} | |||
[[Category:軟體]] | [[Category:軟體]] |
於 2019年5月2日 (四) 07:42 的修訂
Confluent是一家公司,也是該公司的Apache Kafka產品線的產品名。
簡介
Confluent是Apache Kafka發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了眾多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天試用)。
硬體
在PoC時我使用了三台t3.small
(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。
建議在正式環境下ZooKeeper部分建議使用m5.large
(8 GB RAM)或是更好的機器執行。
安裝
當然要先裝Java,然後安裝社群版本:
sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11
設定ZooKeeper
設定Kafka
修改/etc/kafka/server.properties
的broker.id
設定,讓他自動產生而不需要自己指定:
#broker.id=0
broker.id.generation.enable=true
修改listeners
設定,雖然預設值會聽正確的介面,但因為這個資訊會被註冊到ZooKeeper裡面,所以需要註冊正確的介面資訊:
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://10.1.1.1:9092
如果ZooKeeper與Kafka不同伺服器,需要修改/etc/kafka/server.properties
的zookeeper.connect
的值,像是這樣:
#zookeeper.connect=localhost:2181
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181
另外可以設定/lib/systemd/system/confluent-kafka.service.d/30-options.conf
(目錄可能會需要自己建立),讓Kafka吃滿記憶體(這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用),以及開啟JMX監控機制,跑在Port 32181:
[Service]
Environment=JMX_PORT=32181
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
對應的指令:
sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf
設完後就可以讓systemd重讀設定後啟動:
sudo systemctl daemon-reload
sudo service confluent-kafka start
sudo service confluent-kafka status
基本操作
topic
topic的操作都是透過kafka-topics
處理,其中在較新的版本是使用--bootstrap-server
指定Broker位置當作接口(可省略Port資訊,預設使用Port 9092),在較舊版本則是使用--zookeeper
指定ZooKeeper位置當作接口(可省略Port資訊,預設使用9092)。
建立是--create
:
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
觀看是透過--list
(簡易)或是--describe
(詳細):
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
刪除topic則是透過--delete
:
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
預設值只會標記(MarkedForDeletion
),而非實際刪除:
Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
訊息
把目前的日期資訊傳到test
裡:
date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test
接收訊息,並且從頭開始收(--from-beginning
):
date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning
設定其他套件
上面提到的是Kafka最低運作的設定,通常會安裝其他的套件提供服務。要注意其他的套件會需要額外的CPU與記憶體資源。
啟動Schema Registry:
sudo service confluent-schema-registry start
sudo service confluent-schema-registry status
啟動Kafka Connect:
sudo service confluent-kafka-connect start
sudo service confluent-kafka-connect status
啟動Kafka REST Proxy:
sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status
啟動KSQL:
sudo service confluent-ksql start
sudo service confluent-ksql status