“Confluent”的版本间的差异

来自Gea-Suan Lin's Wiki
跳到导航 跳到搜索
硬體
設定Kafka
 
(未显示同一用户的34个中间版本)
第1行: 第1行:
'''Confluent'''是一家公司,也是該公司的[[Apache Kafka]]產品名。
+
'''Confluent'''是一家公司,也是該公司的[[Apache Kafka]] 產品線的 產品名。
  
 
== 簡介 ==
 
== 簡介 ==
  
Confluent是[[Apache Kafka]]發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了 很多Open Source套件管理Kafka Cluster(即社群版本), 但不 包括Control Center( 提供三十天試用)。
+
Confluent是[[Apache Kafka]]發明人出來開的公司,也是目前最知名的Kafka商業支援服務。同時Confluent也是該公司推出的軟體品牌,提供了 眾多的Open Source套件 用以 管理Kafka Cluster(即社群版本 元件 ), 另外提供商用版本, 包括Control Center(提供三十天試用)。
  
 
== 硬體 ==
 
== 硬體 ==
  
 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題, 但跑一陣子後會因為記憶體不足而 開始 異常。建議在正式環境 建議使用<code>m5.large</code>(8 GB RAM)或是更好的機器執行。
+
 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。
 +
 
 +
建議在正式環境 下ZooKeeper部分 建議使用<code>m5.large</code>(8 GB RAM)或是更好的機器執行。
  
 
== 安裝 ==
 
== 安裝 ==
  
 當然要先裝[[Java]]:
+
 當然要先裝[[Java]] ,然後安裝社群版本
  
 
<syntaxhighlight lang="bash">
 
<syntaxhighlight lang="bash">
 
sudo apt install -y default-jre
 
sudo apt install -y default-jre
 +
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11
 
</syntaxhighlight>
 
</syntaxhighlight>
  
  這邊只安裝社群版本:
+
== 設定ZooKeeper ==
 +
 
 +
{{See also|Confluent/ZooKeeper}}
 +
 
 +
== 設定Kafka ==
  
<syntaxhighlight lang="bash">
+
修改<code>/etc/kafka/server.properties</code>的<code>broker.id</code>設定,讓他自動產生而不需要自己指定:
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -
+
 
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"
+
<syntaxhighlight lang="ini">
sudo apt install -y confluent-community-2.11
+
#broker.id=0
 +
broker.id.generation.enable=true
 
</syntaxhighlight>
 
</syntaxhighlight>
  
== 定ZooKeeper ==
+
  修改<code>listeners</code>設定,雖然預 值會聽正確的介面,但因為這個資訊會被註冊到ZooKeeper裡面,所以需要註冊正確的介面資訊:
 +
 
 +
<syntaxhighlight lang="ini">
 +
#listeners=PLAINTEXT://:9092
 +
listeners=PLAINTEXT://10.1.1.1:9092
 +
</syntaxhighlight>
  
 修改<code>/etc/kafka/zookeeper.properties</code>, 其中IP address需要填寫對應的位置
+
  如果[[ZooKeeper]]與[[Kafka]]不同伺服器,需要 修改<code>/etc/kafka/server.properties</code>的<code>zookeeper.connect</code> 的值 像是這樣
  
 
<syntaxhighlight lang="ini">
 
<syntaxhighlight lang="ini">
#
+
#zookeeper.connect=localhost:2181
tickTime=2000
+
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181
dataDir=/var/lib/zookeeper/
 
clientPort=2181
 
initLimit=5
 
syncLimit=2
 
server.1=1.2.3.4:2888:3888
 
server.2=5.6.7.8:2888:3888
 
server.3=9.10.11.12:2888:3888
 
autopurge.snapRetainCount=3
 
autopurge.purgeInterval=24
 
 
</syntaxhighlight>
 
</syntaxhighlight>
  
  新增<code>/var/lib/zookeeper/myid</code> ,每一台都 需要 不同 <code>1</code>或<code>2</code>或<code>3</code>
+
  另外可以設定<code>/lib/systemd/system/confluent-kafka.service.d/30-options.conf</code> (目錄可能會 需要 自己建立),讓Kafka吃滿記憶體(這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用),以及開啟JMX監控機制 跑在Port 32181
  
 
<syntaxhighlight lang="ini">
 
<syntaxhighlight lang="ini">
1
+
[Service]
 +
Environment=JMX_PORT=32181
 +
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
 +
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
 
</syntaxhighlight>
 
</syntaxhighlight>
  
  然後修改檔案擁有人
+
  對應的指令
  
 
<syntaxhighlight lang="bash">
 
<syntaxhighlight lang="bash">
sudo chown cp-kafka:confluent /var/lib/zookeeper/myid
+
sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
 +
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf
 
</syntaxhighlight>
 
</syntaxhighlight>
  
  理論上 就可以啟動
+
  設完後 就可以 讓systemd重讀設定後 啟動:
  
 
<syntaxhighlight lang="bash">
 
<syntaxhighlight lang="bash">
sudo service confluent-zookeeper start
+
sudo systemctl daemon-reload
sudo service confluent-zookeeper status
+
sudo service confluent-kafka start
 +
sudo service confluent-kafka status
 
</syntaxhighlight>
 
</syntaxhighlight>
  
  接下來可以將TCP Port 2181建立對應的TCP Load Balancer(像是用ELB)。
+
== 基本操作 ==
  
==  設定Kafka ==
+
=== topic ===
 +
 
 +
topic的操作都是透過<code>kafka-topics</code>處理,其中在較新的版本是使用<code>--bootstrap-server</code>指定Broker位置當作接口(可省略Port資訊,預設使用Port 9092),在較舊版本則是使用<code>--zookeeper</code>指定ZooKeeper位置當作接口(可省略Port資訊,預設使用9092)。
 +
 
 +
建立是<code>--create</code>:
 +
 
 +
<syntaxhighlight lang="bash">
 +
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
 +
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
 +
</syntaxhighlight>
 +
 
 +
  觀看是透過<code>--list</code>(簡易)或是<code>--describe</code>(詳細):
 +
 
 +
<syntaxhighlight lang="bash">
 +
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
 +
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
 +
</syntaxhighlight>
 +
 
 +
<syntaxhighlight lang="bash">
 +
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
 +
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
 +
</syntaxhighlight>
  
  如果[[ZooKeeper]]與[[Kafka]]不同伺服器,需要修改<code>/etc/kafka/server.properties</code>的<code>zookeeper.connect</code> 的值,像是這樣
+
  刪除topic則是透過<code>--delete</code>:
  
<syntaxhighlight lang="ini">
+
<syntaxhighlight lang="bash">
#zookeeper.connect=localhost:2181
+
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
zookeeper.connect=internal-test-gslin-confluent-xxxxxxxxx.us-east-1.elb.amazonaws.com:2181
+
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
 
</syntaxhighlight>
 
</syntaxhighlight>
  
  修改<code>/etc/kafka/server.properties</code>的<code>broker.id</code> 設定,讓他自動產生而不需要自己指定
+
  預設值只會標記(<code>MarkedForDeletion</code>),而非實際刪除:
 +
 
 +
<pre>
 +
Topic test is marked for deletion.
 +
Note: This will have no impact if delete.topic.enable is not set to true.
 +
</pre>
 +
 
 +
=== 訊息 ===
 +
 
 +
把目前 日期資訊傳到<code>test</code>
  
 
<syntaxhighlight lang="bash">
 
<syntaxhighlight lang="bash">
#broker.id=0
+
date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test
broker.id.generation.enable=true
 
 
</syntaxhighlight>
 
</syntaxhighlight>
  
  理論上就可以啟動了
+
  接收訊息,並且從頭開始收(<code>--from-beginning</code>)
  
 
<syntaxhighlight lang="bash">
 
<syntaxhighlight lang="bash">
sudo service confluent-confluent start
+
date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning
sudo service confluent-confluent status
 
 
</syntaxhighlight>
 
</syntaxhighlight>
  
 
== 設定其他套件 ==
 
== 設定其他套件 ==
 +
 +
上面提到的是Kafka最低運作的設定,通常會安裝其他的套件提供服務。要注意其他的套件會需要額外的CPU與記憶體資源。
  
 
 啟動Schema Registry:
 
 啟動Schema Registry:
第125行: 第164行:
 
* {{Official|https://www.confluent.io/}} {{en}}
 
* {{Official|https://www.confluent.io/}} {{en}}
 
* [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}}
 
* [https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.htmlManual Install using Systemd on Ubuntu and Debian] {{en}}
 +
* [https://cwiki.apache.org/confluence/display/ZOOKEEPER/Client-Server+mutual+authentication Client-Server mutual authentication] {{en}}
 +
* [https://medium.com/@johny.urgiles/overcoming-zookeeper-acls-1b205cfdc301 Overcoming Zookeeper ACLs] {{en}}
  
 
[[Category:軟體]]
 
[[Category:軟體]]

2019年5月2日 (四) 07:42的最新版本

Confluent是一家公司,也是该公司的Apache Kafka产品线的产品名。

简介

Confluent是Apache Kafka发明人出来开的公司,也是目前最知名的Kafka商业支援服务。同时Confluent也是该公司推出的软体品牌,提供了众多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天试用)。

硬体

在PoC时我使用了三台t3.small(2 GB RAM,另外手动设定加上512 MB Swap),三台都安装完整的套件并且跑起来,一开始不会有问题,但跑一阵子后会因为记忆体不足而异常。

建议在正式环境下ZooKeeper部分建议使用m5.large(8 GB RAM)或是更好的机器执行。

安装

当然要先装Java,然后安装社群版本:

sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11

设定ZooKeeper

设定Kafka

修改/etc/kafka/server.propertiesbroker.id设定,让他自动产生而不需要自己指定:

#broker.id=0
broker.id.generation.enable=true

修改listeners设定,虽然预设值会听正确的介面,但因为这个资讯会被注册到ZooKeeper里面,所以需要注册正确的介面资讯:

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://10.1.1.1:9092

如果ZooKeeperKafka不同伺服器,需要修改/etc/kafka/server.propertieszookeeper.connect的值,像是这样:

#zookeeper.connect=localhost:2181
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181

另外可以设定/lib/systemd/system/confluent-kafka.service.d/30-options.conf(目录可能会需要自己建立),让Kafka吃满记忆体(这边假设是8 GB的记忆体,保留1 GB给系统与其他情境使用),以及开启JMX监控机制,跑在Port 32181:

[Service]
Environment=JMX_PORT=32181
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

对应的指令:

sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf

设完后就可以让systemd重读设定后启动:

sudo systemctl daemon-reload
sudo service confluent-kafka start
sudo service confluent-kafka status

基本操作

topic

topic的操作都是透过kafka-topics处理,其中在较新的版本是使用--bootstrap-server指定Broker位置当作接口(可省略Port资讯,预设使用Port 9092),在较旧版本则是使用--zookeeper指定ZooKeeper位置当作接口(可省略Port资讯,预设使用9092)。

建立是--create

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1

观看是透过--list(简易)或是--describe(详细):

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe

删除topic则是透过--delete

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test

预设值只会标记(MarkedForDeletion),而非实际删除:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

讯息

把目前的日期资讯传到test里:

date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test

接收讯息,并且从头开始收(--from-beginning):

date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning

设定其他套件

上面提到的是Kafka最低运作的设定,通常会安装其他的套件提供服务。要注意其他的套件会需要额外的CPU与记忆体资源。

启动Schema Registry:

sudo service confluent-schema-registry start
sudo service confluent-schema-registry status

启动Kafka Connect:

sudo service confluent-kafka-connect start
sudo service confluent-kafka-connect status

启动Kafka REST Proxy:

sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status

启动KSQL:

sudo service confluent-ksql start
sudo service confluent-ksql status

相关连结

外部连结