Confluent:修订间差异

来自Gea-Suan Lin's Wiki
跳到导航 跳到搜索
此页面具有访问限制。如果您看见此消息,则说明您没有权限访问此页面。
(未显示同一用户的21个中间版本)
第9行: 第9行:
 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。
 在PoC時我使用了三台<code>t3.small</code>(2 GB RAM,另外手動設定加上512 MB Swap),三台都安裝完整的套件並且跑起來,一開始不會有問題,但跑一陣子後會因為記憶體不足而異常。


 建議在正式環境 下ZooKeeper部 建議 使用<code>m5.large</code>(8 GB RAM)或是更好的機器執行。
 建議在正式環境 下,ZooKeeper與Kafka 開跑,都 使用<code>m5.large</code>(8 GB RAM)或是更好的機器執行。


== 安裝 ==
== 安裝 ==
第17行: 第17行:
<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo apt install -y default-jre
sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11
wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.3 stable main"; sudo apt install -y confluent-community-2.12
</syntaxhighlight>
</syntaxhighlight>


== 設定ZooKeeper ==
== 設定ZooKeeper ==


{{See also|Confluent/ZooKeeper}}
{{Main|Confluent/ZooKeeper}}


== 設定Kafka ==
== 設定Kafka ==


如果[[ZooKeeper]]與[[Kafka]]不同伺服器,需要修改<code>/etc/kafka/server.properties</code>的<code>zookeeper.connect</code>的值,像是這樣:
{{Main|Confluent/Kafka}}


<syntaxhighlight lang="ini">
== 基本操作 ==
#zookeeper.connect=localhost:2181
 
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181
=== topic ===
</syntaxhighlight>
 
topic的操作都是透過<code>kafka-topics</code>處理,使用<code>--zookeeper</code> 指定ZooKeeper位置(可省略Port資訊,預設使用2181)。


  修改<code>/etc/kafka/server.properties</code>的<code>broker.id</code> 設定,讓他自動產生而不需要自己指定
  建立是<code>--create</code>:


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
#broker.id=0
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
broker.id.generation.enable=true
</syntaxhighlight>
</syntaxhighlight>


  另外可以設定<code>/lib/systemd/system/confluent-kafka.service.d/30-options.conf</code>( 目錄可能會需要自己建立 ,讓Kafka吃滿記憶體 這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用 ):
  觀看是透過<code>--list</code>( 簡易 或是<code>--describe</code> 詳細 ):


<syntaxhighlight lang="ini">
<syntaxhighlight lang="bash">
[Service]
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
</syntaxhighlight>
</syntaxhighlight>
對應的指令:


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
echo -e '[Service]\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf
</syntaxhighlight>
</syntaxhighlight>


  設完後就可以讓systemd重讀設定後啟動
  刪除topic則是透過<code>--delete</code>


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
sudo systemctl daemon-reload
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
sudo service confluent-kafka start
sudo service confluent-kafka status
</syntaxhighlight>
</syntaxhighlight>


==  基本操作 ==
預設值只會標記(<code>MarkedForDeletion</code>),而非實際刪除:
 
<pre>
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
</pre>
 
===  訊息 ===
 
{{更新|time=2019-12-31T10:36:45+00:00}}


  建立topic可以用<code>kafka-topics</code>操作,在較新 版本是使用<code>--bootstrap-server</code>指定Broker位置,在較舊版本則是使用<code>--zookeeper</code> 指定ZooKeeper位置
  把目前 日期資訊傳到<code>test</code>


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
</syntaxhighlight>
</syntaxhighlight>


  觀看有哪些topic也可以用<code>kafka-topics</code> 操作,跟前面有一樣的版本問題
  接收訊息,並且從頭開始收(<code>--from-beginning</code>


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
</syntaxhighlight>
</syntaxhighlight>


== 設定其他套件 ==
== 設定其他套件 ==
上面提到的是Kafka最低運作的設定,通常會安裝其他的套件提供服務。要注意其他的套件會需要額外的CPU與記憶體資源。


 啟動Schema Registry:
 啟動Schema Registry:

2019年12月31日 (二) 10:36的版本

Confluent是一家公司,也是该公司的Apache Kafka产品线的产品名。

简介

Confluent是Apache Kafka发明人出来开的公司,也是目前最知名的Kafka商业支援服务。同时Confluent也是该公司推出的软体品牌,提供了众多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天试用)。

硬体

在PoC时我使用了三台t3.small(2 GB RAM,另外手动设定加上512 MB Swap),三台都安装完整的套件并且跑起来,一开始不会有问题,但跑一阵子后会因为记忆体不足而异常。

建议在正式环境下,ZooKeeper与Kafka分开跑,都使用m5.large(8 GB RAM)或是更好的机器执行。

安装

当然要先装Java,然后安装社群版本:

sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.3 stable main"; sudo apt install -y confluent-community-2.12

设定ZooKeeper

设定Kafka

基本操作

topic

topic的操作都是透过kafka-topics处理,使用--zookeeper指定ZooKeeper位置(可省略Port资讯,预设使用2181)。

建立是--create

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1

观看是透过--list(简易)或是--describe(详细):

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe

删除topic则是透过--delete

kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test

预设值只会标记(MarkedForDeletion),而非实际删除:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

讯息

把目前的日期资讯传到test里:

date | kafka-console-producer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test

接收讯息,并且从头开始收(--from-beginning):

kafka-console-consumer --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --topic test --from-beginning

设定其他套件

上面提到的是Kafka最低运作的设定,通常会安装其他的套件提供服务。要注意其他的套件会需要额外的CPU与记忆体资源。

启动Schema Registry:

sudo service confluent-schema-registry start
sudo service confluent-schema-registry status

启动Kafka Connect:

sudo service confluent-kafka-connect start
sudo service confluent-kafka-connect status

启动Kafka REST Proxy:

sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status

启动KSQL:

sudo service confluent-ksql start
sudo service confluent-ksql status

相关连结

外部连结