Confluent:修订间差异

来自Gea-Suan Lin's Wiki
跳到导航 跳到搜索
此页面具有访问限制。如果您看见此消息,则说明您没有权限访问此页面。
第73行: 第73行:
=== topic ===
=== topic ===


topic的操作都是透過<code>kafka-topics</code>處理,其中在較新的版本是使用<code>--bootstrap-server</code>指定Broker位置當作接口,在較舊版本則是使用<code>--zookeeper</code>指定ZooKeeper位置當作接口。
topic的操作都是透過<code>kafka-topics</code>處理,其中在較新的版本是使用<code>--bootstrap-server</code>指定Broker位置當作接口 (可省略Port資訊,預設使用Port 9092) ,在較舊版本則是使用<code>--zookeeper</code>指定ZooKeeper位置當作接口 (可省略Port資訊,預設使用9092)


 建立是<code>--create</code>:
 建立是<code>--create</code>:

2019年4月23日 (二) 10:20的版本

Confluent是一家公司,也是该公司的Apache Kafka产品线的产品名。

简介

Confluent是Apache Kafka发明人出来开的公司,也是目前最知名的Kafka商业支援服务。同时Confluent也是该公司推出的软体品牌,提供了众多的Open Source套件用以管理Kafka Cluster(即社群版本元件),另外提供商用版本,包括Control Center(提供三十天试用)。

硬体

在PoC时我使用了三台t3.small(2 GB RAM,另外手动设定加上512 MB Swap),三台都安装完整的套件并且跑起来,一开始不会有问题,但跑一阵子后会因为记忆体不足而异常。

建议在正式环境下ZooKeeper部分建议使用m5.large(8 GB RAM)或是更好的机器执行。

安装

当然要先装Java,然后安装社群版本:

sudo apt install -y default-jre
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -; sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"; sudo apt install -y confluent-community-2.11

设定ZooKeeper

设定Kafka

修改/etc/kafka/server.propertiesbroker.id设定,让他自动产生而不需要自己指定:

#broker.id=0
broker.id.generation.enable=true

修改listeners设定:

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://172.30.3.232:9092

如果ZooKeeperKafka不同伺服器,需要修改/etc/kafka/server.propertieszookeeper.connect的值,像是这样:

#zookeeper.connect=localhost:2181
zookeeper.connect=internal-test-kafka-zookeeper-123456789.us-east-1.elb.amazonaws.com:2181

另外可以设定/lib/systemd/system/confluent-kafka.service.d/30-options.conf(目录可能会需要自己建立),让Kafka吃满记忆体(这边假设是8 GB的记忆体,保留1 GB给系统与其他情境使用):

[Service]
Environment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"

对应的指令:

sudo mkdir /lib/systemd/system/confluent-kafka.service.d/
echo -e '[Service]\nEnvironment=KAFKA_HEAP_OPTS="-Xmx7g -Xms7g"' | sudo tee /lib/systemd/system/confluent-kafka.service.d/30-options.conf

设完后就可以让systemd重读设定后启动:

sudo systemctl daemon-reload
sudo service confluent-kafka start
sudo service confluent-kafka status

基本操作

topic

topic的操作都是透过kafka-topics处理,其中在较新的版本是使用--bootstrap-server指定Broker位置当作接口(可省略Port资讯,预设使用Port 9092),在较旧版本则是使用--zookeeper指定ZooKeeper位置当作接口(可省略Port资讯,预设使用9092)。

建立是--create

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --create --topic test --replication-factor 3 --partitions 1

观看是透过--list(简易)或是--describe(详细):

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --list
kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --describe

删除topic则是透过--delete

kafka-topics --bootstrap-server internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test
kafka-topics --zookeeper internal-test-kafka-zookeeper-lb-123456789.ap-southeast-1.elb.amazonaws.com --delete --topic test

预设值只会标记(MarkedForDeletion),而非实际删除:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

讯息

把目前的日期资讯传到test里:

date | kafka-console-producer --broker-list internal-test-kafka-broker-lb-123456789.ap-southeast-1.elb.amazonaws.com:9092 --topic test

设定其他套件

启动Schema Registry:

sudo service confluent-schema-registry start
sudo service confluent-schema-registry status

启动Kafka Connect:

sudo service confluent-kafka-connect start
sudo service confluent-kafka-connect status

启动Kafka REST Proxy:

sudo service confluent-kafka-rest start
sudo service confluent-kafka-rest status

启动KSQL:

sudo service confluent-ksql start
sudo service confluent-ksql status

相关连结

外部连结