Confluent:修订间差异
(→基本操作) |
|||
(未显示同一用户的5个中间版本) | |||
第33行: | 第33行: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
修改<code>listeners</code>設定: | 修改<code>listeners</code>設定 ,雖然預設值會聽正確的介面,但因為這個資訊會被註冊到ZooKeeper裡面,所以需要註冊正確的介面資訊 : | ||
<syntaxhighlight lang="ini"> | <syntaxhighlight lang="ini"> | ||
#listeners=PLAINTEXT://:9092 | #listeners=PLAINTEXT://:9092 | ||
listeners=PLAINTEXT:// | listeners=PLAINTEXT://10.1.1.1:9092 | ||
</syntaxhighlight> | </syntaxhighlight> | ||
第47行: | 第47行: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
另外可以設定<code>/lib/systemd/system/confluent-kafka.service.d/30-options.conf</code>(目錄可能會需要自己建立),讓Kafka吃滿記憶體(這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用): | 另外可以設定<code>/lib/systemd/system/confluent-kafka.service.d/30-options.conf</code>(目錄可能會需要自己建立),讓Kafka吃滿記憶體(這邊假設是8 GB的記憶體,保留1 GB給系統與其他情境使用) ,以及開啟JMX監控機制,跑在Port 32181 : | ||
<syntaxhighlight lang="ini"> | <syntaxhighlight lang="ini"> | ||
[Service] | [Service] | ||
Environment=JMX_PORT=32181 | |||
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g" | Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g" | ||
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" | |||
</syntaxhighlight> | </syntaxhighlight> | ||
第58行: | 第60行: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
sudo mkdir /lib/systemd/system/confluent-kafka.service.d/ | sudo mkdir /lib/systemd/system/confluent-kafka.service.d/ | ||
echo -e '[Service]\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf | echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf | ||
</syntaxhighlight> | </syntaxhighlight> | ||
第73行: | 第75行: | ||
=== topic === | === topic === | ||
topic的操作都是透過<code>kafka-topics</code>處理,其中在較新的版本是使用<code>--bootstrap-server</code>指定Broker位置當作接口,在較舊版本則是使用<code>--zookeeper</code>指定ZooKeeper位置當作接口。 | topic的操作都是透過<code>kafka-topics</code>處理,其中在較新的版本是使用<code>--bootstrap-server</code>指定Broker位置當作接口 (可省略Port資訊,預設使用Port 9092) ,在較舊版本則是使用<code>--zookeeper</code>指定ZooKeeper位置當作接口 (可省略Port資訊,預設使用9092) 。 | ||
建立是<code>--create</code>: | 建立是<code>--create</code>: | ||
第114行: | 第116行: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test | date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test | ||
</syntaxhighlight> | |||
接收訊息,並且從頭開始收(<code>--from-beginning</code>): | |||
<syntaxhighlight lang="bash"> | |||
date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== 設定其他套件 == | == 設定其他套件 == | ||
上面提到的是Kafka最低運作的設定,通常會安裝其他的套件提供服務。要注意其他的套件會需要額外的CPU與記憶體資源。 | |||
啟動Schema Registry: | 啟動Schema Registry: |
2019年5月2日 (四) 07:42的版本
Confluent是一家公司,也是该公司的Apache Kafka产品线的产品名。
简介
Confluent是Apache Kafka发明人出来开的公司,也是目前最知名的Kafka商业支援服务。同时Confluent也是该公司推出的软体品牌,提供了众多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天试用)。
硬体
在PoC时我使用了三台t3.small
(2 GB RAM,另外手动设定加上512 MB Swap),三台都安装完整的套件并且跑起来,一开始不会有问题,但跑一阵子后会因为记忆体不足而异常。
建议在正式环境下ZooKeeper部分建议使用m5.large
(8 GB RAM)或是更好的机器执行。
安装
当然要先装Java,然后安装社群版本:
sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11
设定ZooKeeper
设定Kafka
修改/etc/kafka/server.properties
的broker.id
设定,让他自动产生而不需要自己指定:
#broker.id=0
broker.id.generation.enable=true
修改listeners
设定,虽然预设值会听正确的介面,但因为这个资讯会被注册到ZooKeeper里面,所以需要注册正确的介面资讯:
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://10.1.1.1:9092
如果ZooKeeper与Kafka不同伺服器,需要修改/etc/kafka/server.properties
的zookeeper.connect
的值,像是这样:
#zookeeper.connect=localhost:2181
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181
另外可以设定/lib/systemd/system/confluent-kafka.service.d/30-options.conf
(目录可能会需要自己建立),让Kafka吃满记忆体(这边假设是8 GB的记忆体,保留1 GB给系统与其他情境使用),以及开启JMX监控机制,跑在Port 32181:
[Service]
Environment=JMX_PORT=32181
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"
Environment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
对应的指令:
sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
echo -e '[Service]\nEnvironment=JMX_PORT=32181\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"\nEnvironment=KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${hostip} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=32181 -Dcom.sun.management.jmxremote.port=32181 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf
设完后就可以让systemd重读设定后启动:
sudo systemctl daemon-reload
sudo service confluent-kafka start
sudo service confluent-kafka status
基本操作
topic
topic的操作都是透过kafka-topics
处理,其中在较新的版本是使用--bootstrap-server
指定Broker位置当作接口(可省略Port资讯,预设使用Port 9092),在较旧版本则是使用--zookeeper
指定ZooKeeper位置当作接口(可省略Port资讯,预设使用9092)。
建立是--create
:
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
观看是透过--list
(简易)或是--describe
(详细):
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
删除topic则是透过--delete
:
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
预设值只会标记(MarkedForDeletion
),而非实际删除:
Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
讯息
把目前的日期资讯传到test
里:
date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test
接收讯息,并且从头开始收(--from-beginning
):
date | kafka-console-consumer --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test --from-beginning
设定其他套件
上面提到的是Kafka最低运作的设定,通常会安装其他的套件提供服务。要注意其他的套件会需要额外的CPU与记忆体资源。
启动Schema Registry:
sudo service confluent-schema-registry start
sudo service confluent-schema-registry status
启动Kafka Connect:
sudo service confluent-kafka-connect start
sudo service confluent-kafka-connect status
启动Kafka REST Proxy:
sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status
启动KSQL:
sudo service confluent-ksql start
sudo service confluent-ksql status