Kafka Study Notes: Quick Start
I need this for work, and I have recently been working through the 尚硅谷 tutorials. In the spirit of "a good memory is no match for a worn pen," this series of articles was born.
Keywords: Kafka
Installation and deployment
Docker installation
Docker is used for installation here; this is the image used: https://hub.docker.com/r/bitnami/kafka/
Installing via Docker is very simple; here is the docker-compose.yml file used:
```yaml
version: "3"
```
A few points to note:
- KAFKA_BROKER_ID: sets broker.id; every broker must use a different value, otherwise startup fails.
- KAFKA_CFG_LISTENERS: the network address the Kafka server listens on; here it is the default, 0.0.0.0.
- KAFKA_CFG_ADVERTISED_LISTENERS: the network address of the Kafka instance advertised to the outside; usually the host machine's LAN or public address.
- KAFKA_CFG_ZOOKEEPER_CONNECT: the ZooKeeper address; it is best to append the /kafka suffix here, otherwise ZooKeeper's top-level directory ends up cluttered.
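Putting the notes above together, a minimal docker-compose.yml might look like the following. This is a sketch, not a verified deployment: the image tags, the 192.168.1.100 advertised address, and the single-node ZooKeeper are placeholder assumptions to adapt to your environment.

```yaml
version: "3"
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      # Allow unauthenticated connections (convenient for local testing only)
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      # Must be unique per broker
      - KAFKA_BROKER_ID=1
      # Listen on all interfaces inside the container
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092
      # Address advertised to clients: use the host's LAN or public address
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:9092
      # The /kafka chroot keeps ZooKeeper's top-level directory tidy
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181/kafka
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
```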
Configuration file
A few key options can be set quickly via environment variables; other options can be modified by mounting a configuration file:
- Setting environment variables
The configuration can easily be set up with the Bitnami Apache Kafka Docker image using the following environment variables:
- ALLOW_PLAINTEXT_LISTENER: Allow use of the PLAINTEXT listener. Default: no.
- KAFKA_INTER_BROKER_USER: Apache Kafka inter-broker communication user. Default: user.
- KAFKA_INTER_BROKER_PASSWORD: Apache Kafka inter-broker communication password. Default: bitnami.
- KAFKA_CERTIFICATE_PASSWORD: Password for certificates. No defaults.
- KAFKA_HEAP_OPTS: Apache Kafka's Java heap size. Default: -Xmx1024m -Xms1024m.
- KAFKA_ZOOKEEPER_PROTOCOL: Authentication protocol for Zookeeper connections. Allowed protocols: PLAINTEXT, SASL, SSL, and SASL_SSL. Default: PLAINTEXT.
- KAFKA_ZOOKEEPER_USER: Apache Kafka Zookeeper user for SASL authentication. No defaults.
- KAFKA_ZOOKEEPER_PASSWORD: Apache Kafka Zookeeper user password for SASL authentication. No defaults.
- KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD: Apache Kafka Zookeeper keystore file password and key password. No defaults.
- KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD: Apache Kafka Zookeeper truststore file password. No defaults.
- KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME: Verify Zookeeper hostname on TLS certificates. Default: true.
- KAFKA_ZOOKEEPER_TLS_TYPE: Choose the TLS certificate format to use. Allowed values: JKS, PEM. Default: JKS.
- KAFKA_CFG_SASL_ENABLED_MECHANISMS: Allowed mechanisms when using SASL, either for client, inter-broker, or Zookeeper communications. Allowed values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or a comma-separated combination of those values. Default: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512.
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SASL mechanism to use for inter-broker communications. No defaults.
- KAFKA_TLS_CLIENT_AUTH: Configures Kafka brokers to request client authentication. Allowed values: required, requested, none. Default: required.
- KAFKA_TLS_TYPE: Choose the TLS certificate format to use. Allowed values: JKS, PEM. Default: JKS.
- KAFKA_CLIENT_USERS: Users that will be created in Zookeeper when using SASL for client communications. Separated by commas. Default: user.
- KAFKA_CLIENT_PASSWORDS: Passwords for the users specified in KAFKA_CLIENT_USERS. Separated by commas. Default: bitnami.
- KAFKA_CFG_MAX_PARTITION_FETCH_BYTES: The maximum amount of data per partition the server will return. Default: 1048576.
- KAFKA_CFG_MAX_REQUEST_SIZE: The maximum size of a request in bytes. Default: 1048576.

Additionally, any environment variable beginning with KAFKA_CFG_ will be mapped to its corresponding Apache Kafka key. For example, use KAFKA_CFG_BACKGROUND_THREADS in order to set background.threads, or KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE in order to configure auto.create.topics.enable.
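The KAFKA_CFG_ mapping rule is simple to express in code. The helper below is a hypothetical illustration, not part of the Bitnami image: it strips the prefix, lower-cases the rest, and turns underscores into dots.

```python
def env_to_kafka_key(env_name: str) -> str:
    """Map a KAFKA_CFG_* environment variable name to its Kafka property key.

    Rule used by the Bitnami image: drop the KAFKA_CFG_ prefix,
    lower-case the remainder, and replace underscores with dots.
    """
    prefix = "KAFKA_CFG_"
    if not env_name.startswith(prefix):
        raise ValueError(f"not a KAFKA_CFG_ variable: {env_name}")
    return env_name[len(prefix):].lower().replace("_", ".")

print(env_to_kafka_key("KAFKA_CFG_BACKGROUND_THREADS"))        # background.threads
print(env_to_kafka_key("KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE")) # auto.create.topics.enable
```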
- Mounting a configuration file
```yaml
...
```
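For example, to override the image's server.properties with your own copy, a volumes entry like the following could be added to the compose file. This is a sketch: the host path and the /bitnami/kafka/config mount point are assumptions to verify against the image's documentation.

```yaml
services:
  kafka:
    image: bitnami/kafka:latest
    volumes:
      # Replace the image's server.properties with a local file
      - ./server.properties:/bitnami/kafka/config/server.properties
```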
Some recommended configurations
```properties
# Globally unique broker ID; must not repeat, and must be numeric.
```
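Beyond broker.id, a sketch of other commonly tuned server.properties entries; the values, the log directory, and the hadoop102-104 hostnames are illustrative assumptions, not prescriptions.

```properties
# Globally unique broker ID; must not repeat, and must be numeric
broker.id=0
# Threads handling network requests and disk I/O
num.network.threads=3
num.io.threads=8
# Comma-separated list of directories where message data is stored
log.dirs=/opt/module/kafka/datas
# How long to retain log segments, and the maximum size of one segment
log.retention.hours=168
log.segment.bytes=1073741824
# ZooKeeper connection string, with the /kafka chroot
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
```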
Kafka command-line operations

Topic command-line parameters
```shell
bin/kafka-topics.sh
```
| Parameter | Description |
|---|---|
| --bootstrap-server <String: server to connect to> | Host and port of the Kafka broker to connect to |
| --topic <String: topic> | Name of the topic to operate on |
| --create | Create a topic |
| --alter | Alter a topic |
| --delete | Delete a topic |
| --list | List all topics |
| --describe | Show a detailed description of a topic |
| --partitions <Integer: # of partitions> | Set the number of partitions |
| --replication-factor <Integer: replication factor> | Set the partition replication factor |
| --config <String: name=value> | Override a system default configuration |
Topic command-line operations
```shell
# List all topics on the current server
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
```
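Combining the parameters from the table, a few typical topic operations. The hadoop102:9092 address follows the examples later in this article, and the topic name first is an example; these commands require a running cluster.

```shell
# Create a topic named "first" with 1 partition and 3 replicas
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 1 --replication-factor 3

# Show the topic's details (partitions, leader, replicas, ISR)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe

# Increase the partition count (partitions can only be increased, never decreased)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 3

# Delete the topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --delete
```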
Producer command-line operations
Producer command-line parameters
| Parameter | Description |
|---|---|
| --bootstrap-server <String: server to connect to> | Host and port of the Kafka broker to connect to |
| --topic <String: topic> | Name of the topic to operate on |
Sending messages
```shell
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
```
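Once started, the console producer reads one message per line from standard input and shows a > prompt; a sample session (the messages here are arbitrary examples):

```shell
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello world
>hello kafka
```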
Consumer command-line operations
Consumer command-line parameters
| Parameter | Description |
|---|---|
| --bootstrap-server <String: server to connect to> | Host and port of the Kafka broker to connect to |
| --topic | Name of the topic to operate on |
| --from-beginning | Consume from the beginning of the topic |
| --group <String: consumer group id> | Specify the consumer group name |
Consuming messages
```shell
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
```
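The --from-beginning and --group parameters from the table combine with the base command; for example (the group name test-group is an arbitrary example):

```shell
# Re-read all messages in the topic, including historical data
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning

# Consume as part of a named consumer group
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test-group
```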