「Confluent」:修訂間差異

出自Gea-Suan Lin's Wiki
跳至導覽 跳至搜尋
本頁面具有訪問限制。如果您看見此訊息,這代表您沒有訪問本頁面的權限。
(未顯示同一使用者於中間所作的 24 次修訂)
行 1: 行 1:
'''Confluent'''是一家公司,也是該公司的[[Apache Kafka]]產品名。
'''Confluent'''是一家公司,也是該公司的[[Apache Kafka]] 產品線的 產品名。


== 簡介 ==
== 簡介 ==
行 7: 行 7:
== 硬體 ==
== 硬體 ==


 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。建議在正式環境 建議使用<code>m5.large</code>(8 GB RAM)或是更好的機器執行。
 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。
 
建議在正式環境 下ZooKeeper部分 建議使用<code>m5.large</code>(8 GB RAM)或是更好的機器執行。


== 安裝 ==
== 安裝 ==


 當然要先裝[[Java]]:
 當然要先裝[[Java]] ,然後安裝社群版本


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo apt install -y default-jre
sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11
</syntaxhighlight>
</syntaxhighlight>


  這邊只安裝社群版本:
== 設定ZooKeeper ==


<syntaxhighlight lang="bash">
{{See also|Confluent/ZooKeeper}}
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"
sudo apt install -y confluent-community-2.11
</syntaxhighlight>


== 設 定ZooKeeper ==
== 設 定Kafka ==


 修改<code>/etc/kafka/zookeeper.properties</code>, 其中IP address 需要 填寫對應的位置
 修改<code>/etc/kafka/server.properties</code> 的<code>broker.id</code>設定 讓他自動產生而不 需要 自己指定


<syntaxhighlight lang="ini">
<syntaxhighlight lang="ini">
#
#broker.id=0
tickTime=2000
broker.id.generation.enable=true
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=1.2.3.4:2888:3888
server.2=5.6.7.8:2888:3888
server.3=9.10.11.12:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
</syntaxhighlight>
</syntaxhighlight>


  新增<code>/var/lib/zookeeper/myid</code>, 每一台都 需要 不同,<code>1</code>或<code>2</code>或<code>3</code>
  修改<code>listeners</code> 設定 雖然預設值會聽正確的介面,但因為這個資訊會被註冊到ZooKeeper裡面,所以 需要 註冊正確的介面資訊


<syntaxhighlight lang="ini">
<syntaxhighlight lang="ini">
1
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://10.1.1.1:9092
</syntaxhighlight>
</syntaxhighlight>


  然後 修改 檔案擁有人
  如果[[ZooKeeper]]與[[Kafka]]不同伺服器,需要 修改<code>/etc/kafka/server.properties</code>的<code>zookeeper.connect</code>的值,像是這樣


<syntaxhighlight lang="bash">
<syntaxhighlight lang="ini">
sudo chown cp-kafka:confluent /var/lib/zookeeper/myid
#zookeeper.connect=localhost:2181
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181
</syntaxhighlight>
</syntaxhighlight>


  目前的ZooKeeper(Confluent 2.11版內的ZooKeeper)預設值是使用512 MB的記憶體,但主機有7.5 GB的記憶體,所以會想要讓ZooKeeper 可以 用7 GB,因此需要修改ZooKeeper的JVM參數。這邊需要新增<code>/lib/systemd/system/confluent-zookeeper.service.d/30-options.conf</code>(目錄可能需要自己建立):
  另外 可以 設定<code>/lib/systemd/system/confluent-kafka.service.d/30-options.conf</code>(目錄可能 需要自己建立) ,讓Kafka吃滿記憶體(這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用),以及開啟JMX監控機制,跑在Port 32181


<syntaxhighlight lang="ini">
<syntaxhighlight lang="ini">
[Service]
[Service]
Environment=JMX_PORT=32181
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
</syntaxhighlight>
</syntaxhighlight>


  理論上就可以啟動了
  對應的指令


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo systemctl daemon-reload
sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
sudo service confluent-zookeeper start
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf
sudo service confluent-zookeeper status
</syntaxhighlight>
</syntaxhighlight>


 可以 看輸出的資訊判斷系統狀態
  設完後就 可以 讓systemd重讀設定後啟動


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
echo stat | nc 127.0.0.1 2181
sudo systemctl daemon-reload
sudo service confluent-kafka start
sudo service confluent-kafka status
</syntaxhighlight>
</syntaxhighlight>


  或是直接透過指令 操作 測試:
== 基本 操作 ==


<syntaxhighlight lang="bash">
=== topic ===
zookeeper-shell 127.0.0.1:2181
</syntaxhighlight>
 
接下來可以將TCP Port 2181建立對應的TCP Load Balancer(像是用ELB)。


=== 認證 ===
topic的操作都是透過<code>kafka-topics</code>處理,其中在較新的版本是使用<code>--bootstrap-server</code>指定Broker位置當作接口(可省略Port資訊,預設使用Port 9092),在較舊版本則是使用<code>--zookeeper</code>指定ZooKeeper位置當作接口(可省略Port資訊,預設使用9092)。


  如果需要讓ZooKeeper啟用認證,需要先 建立 對應的帳號與密碼字串(這個例子裡面 是<code>admin</code> 與<code>password</code>)
 建立是<code>--create</code>:


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
java -cp "$(echo /usr/share/java/kafka/* | sed 's/ /:/g')" org.apache.zookeeper.server.auth.DigestAuthenticationProvider admin:password
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
</syntaxhighlight>
</syntaxhighlight>


  會產生像 這樣的輸出,其中後面的那串值 重點
  觀看 透過<code>--list</code>(簡易)或 <code>--describe</code>(詳細)


<syntaxhighlight lang="text">
<syntaxhighlight lang="bash">
admin:password->admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg=
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
</syntaxhighlight>
</syntaxhighlight>
然後直接放進設定內:


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
KAKFA_OPTS=-Dzookeeper.DigestAuthenticationProvider.superDigest=admin:bjkZ9W+M82HUZ9xb8/Oy4cmJGfg=
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
</syntaxhighlight>
</syntaxhighlight>


  這樣就可以在<code>zookeeper-shell</code> 裡面認證
  刪除topic則是透過<code>--delete</code>:


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
addauth digest admin:password
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
</syntaxhighlight>
</syntaxhighlight>


== 定Kafka ==
  值只會標記(<code>MarkedForDeletion</code>),而非實際刪除:


如果[[ZooKeeper]]與[[Kafka]]不同伺服器,需要修改<code>/etc/kafka/server.properties</code>的<code>zookeeper.connect</code> 的值,像是這樣:
<pre>
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
</pre>


<syntaxhighlight lang="ini">
=== 訊息 ===
#zookeeper.connect=localhost:2181
zookeeper.connect=internal-test-gslin-confluent-xxxxxxxxx.us-east-1.elb.amazonaws.com:2181
</syntaxhighlight>


  修改<code>/etc/kafka/server.properties</code> 的<code>broker.id</code> 設定,讓他自動產生而不需要自己指定
  把目前 日期資訊傳到<code>test</code>


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
#broker.id=0
date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test
broker.id.generation.enable=true
</syntaxhighlight>
</syntaxhighlight>


  理論上就可以啟動了
  接收訊息,並且從頭開始收(<code>--from-beginning</code>)


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo service confluent-confluent start
date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning
sudo service confluent-confluent status
</syntaxhighlight>
</syntaxhighlight>


== 設定其他套件 ==
== 設定其他套件 ==
上面提到的是Kafka最低運作的設定,通常會安裝其他的套件提供服務。要注意其他的套件會需要額外的CPU與記憶體資源。


 啟動Schema Registry:
 啟動Schema Registry:

於 2019年5月2日 (四) 07:42 的修訂

Confluent是一家公司,也是該公司的Apache Kafka產品線的產品名。

簡介

Confluent是Apache Kafka發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了眾多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天試用)。

硬體

在PoC時我使用了三台t3.small(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。

建議在正式環境下ZooKeeper部分建議使用m5.large(8 GB RAM)或是更好的機器執行。

安裝

當然要先裝Java,然後安裝社群版本:

sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11

設定ZooKeeper

設定Kafka

修改/etc/kafka/server.propertiesbroker.id設定,讓他自動產生而不需要自己指定:

#broker.id=0
broker.id.generation.enable=true

修改listeners設定,雖然預設值會聽正確的介面,但因為這個資訊會被註冊到ZooKeeper裡面,所以需要註冊正確的介面資訊:

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://10.1.1.1:9092

如果ZooKeeperKafka不同伺服器,需要修改/etc/kafka/server.propertieszookeeper.connect的值,像是這樣:

#zookeeper.connect=localhost:2181
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181

另外可以設定/lib/systemd/system/confluent-kafka.service.d/30-options.conf(目錄可能會需要自己建立),讓Kafka吃滿記憶體(這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用),以及開啟JMX監控機制,跑在Port 32181:

[Service]
Environment=JMX_PORT=32181
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

對應的指令:

sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf

設完後就可以讓systemd重讀設定後啟動:

sudo systemctl daemon-reload
sudo service confluent-kafka start
sudo service confluent-kafka status

基本操作

topic

topic的操作都是透過kafka-topics處理,其中在較新的版本是使用--bootstrap-server指定Broker位置當作接口(可省略Port資訊,預設使用Port 9092),在較舊版本則是使用--zookeeper指定ZooKeeper位置當作接口(可省略Port資訊,預設使用9092)。

建立是--create

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1

觀看是透過--list(簡易)或是--describe(詳細):

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe

刪除topic則是透過--delete

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test

預設值只會標記(MarkedForDeletion),而非實際刪除:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

訊息

把目前的日期資訊傳到test裡:

date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test

接收訊息,並且從頭開始收(--from-beginning):

date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning

設定其他套件

上面提到的是Kafka最低運作的設定,通常會安裝其他的套件提供服務。要注意其他的套件會需要額外的CPU與記憶體資源。

啟動Schema Registry:

sudo service confluent-schema-registry start
sudo service confluent-schema-registry status

啟動Kafka Connect:

sudo service confluent-kafka-connect start
sudo service confluent-kafka-connect status

啟動Kafka REST Proxy:

sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status

啟動KSQL:

sudo service confluent-ksql start
sudo service confluent-ksql status

相關連結

外部連結