Apache Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开发,后来成为 Apache 项目。Kafka 主要用于高吞吐量、低延迟的实时数据流处理,常用于日志收集、实时分析、消息传递等场景。以下是关于 Kafka 的详细讲解:
Kafka 是一个分布式的消息队列系统,通常用于处理大规模的实时数据流。它具有以下几个核心组件:
Producer(生产者):生产者负责将消息发布到 Kafka 集群中的一个或多个主题(Topic)。它可以是任何能够生成数据的应用程序。
Consumer(消费者):消费者从 Kafka 中订阅主题并消费消息。消费者可以是一个或多个应用程序,它们处理从 Kafka 接收到的消息。
Broker(代理):Kafka 集群由多个 Broker 组成。每个 Broker 负责存储和管理消息,并处理来自生产者和消费者的请求。
Topic(主题):Kafka 中的消息按照主题进行分类。每个消息都有一个指定的主题,消费者可以基于主题进行订阅。
Partition(分区):每个主题可以有多个分区,Kafka 会将消息分散到多个分区中,以实现负载均衡和高可用性。分区的数量在创建主题时指定,数据按照分区顺序进行存储。
Replica(副本):Kafka 通过副本机制保证数据的高可用性。每个分区可以有多个副本,分布在不同的 Broker 上。这样可以避免单点故障,提高容错性。
Kafka 使用发布-订阅模式来传递消息。消息的传递过程包括以下几个步骤:
Producer 发布消息:生产者将消息发送到 Kafka 中指定的主题。消息可以按照特定的策略(如随机、轮询等)分配到不同的分区中。
Broker 存储消息:消息会被存储在相应的分区中。每个分区的消息是有序的,并且可以被消费者按顺序消费。
Consumer 消费消息:消费者订阅一个或多个主题,并消费其中的消息。消费者会根据分区的位移(offset)来追踪已经消费的消息,确保不重复消费。
消息确认与可靠性:消费者在成功消费消息后会发送确认给 Kafka,表示消息已经被成功处理。Kafka 会根据配置(如 acks
设置)决定消息的确认机制。如果消息在某个阶段发生丢失,Kafka 会利用副本机制进行恢复。
Kafka 中的 主题(Topic) 是消息的基本分类方式,用于组织和区分消息流。可以把主题看作是 Kafka 中的一种 消息类别,消费者和生产者通过主题来进行消息的发送和接收。
生产者向主题发送消息:Kafka 的生产者将消息发送到指定的主题。每个主题都有一个唯一的名字。
消费者订阅主题:消费者根据主题名来订阅消息。一个消费者可以订阅多个主题,获取来自不同主题的消息。
多个生产者和消费者可以订阅同一个主题:一个主题可以被多个生产者写入,同时也可以被多个消费者读取。消费者组的管理确保每条消息在一个组内只能被一个消费者消费。
Kafka 中的 分区(Partition) 是主题的子部分,它是 Kafka 中并行处理的基本单元。每个主题可以有多个分区。分区的作用是将主题的消息分散到多个服务器(Kafka Broker)上存储,从而提高吞吐量和可伸缩性。
消息的顺序性:每个分区内的消息是有顺序的。Kafka 保证同一分区中的消息按照发送顺序排列。
并行消费:通过将一个主题划分为多个分区,Kafka 可以支持并行处理。当多个消费者组中的消费者订阅一个主题时,Kafka 会将分区分配给不同的消费者,实现消息的并行消费。
每个分区中的消息都有一个唯一的编号,称为 偏移量(Offset)。偏移量是 Kafka 用来跟踪消息消费进度的标识符。
消费进度管理:消费者通过偏移量来跟踪自己消费到哪条消息。例如,消费者读取消息时会记录最后消费的消息偏移量,下次启动时会从该偏移量处继续消费。
消费者组的偏移量:每个消费者组有独立的偏移量。不同的消费者组会有不同的消费进度。
Kafka 中的 生产者(Producer) 是负责将消息发送到 Kafka 主题的客户端。
Kafka 中的 消费者(Consumer) 是从 Kafka 中读取消息的客户端。
Kafka 中的 消费者组(Consumer Group) 是多个消费者的集合,共同消费一个或多个主题中的消息。消费者组的核心目的是 并行消费,确保消息在组内每个分区只被一个消费者消费。
Kafka 的存储机制基于日志文件,每个主题的分区都会有一个独立的日志文件。Kafka 的日志机制具有以下特点:
顺序写入:Kafka 使用顺序写入的方式,所有的消息都会追加到日志文件的末尾,这使得磁盘的 I/O 操作非常高效。
不可修改的日志:消息一旦被写入日志文件,就不能修改。这种设计简化了分布式系统中的数据一致性问题,同时也为后期的消息回溯和重放提供了便利。
日志保留策略:Kafka 会根据配置的日志保留策略来决定日志文件的保存时间或大小。可以设置基于时间(如保留 7 天内的日志)或基于大小(如日志文件达到一定大小后删除旧的日志)。
Kafka 具有高度的容错性,主要通过以下几种机制实现:
副本机制:每个分区的消息会有多个副本,存储在不同的 Broker 上。一个分区的领导者(leader)负责处理所有的读写请求,而副本(follower)则跟随领导者同步数据。如果领导者故障,Kafka 会自动选举新的领导者,保证数据的可用性。
数据复制:Kafka 会将消息同步到副本中,以防止单点故障。在副本数配置较高的情况下,即使有多个 Broker 故障,Kafka 也能保持数据的高可用性。
消息确认(ACKs):Kafka 提供三种不同的确认策略来保证消息的可靠性:
acks=0
:生产者不等待确认,消息可能会丢失。acks=1
:生产者等待至少一个副本确认,消息如果被写入一个副本就会认为成功。acks=all
(或 acks=-1
):生产者等待所有副本确认,提供最高的可靠性。假设我本地 D:kafka
下有下载好的 kafka 和 zookeeper。(kafka默认自带了一个 zookeeper, 这里我下载了一个zookeeper)
Kafka 使用 Zookeeper 进行集群协调,因此 Zookeeper 是启动 Kafka 的前提。你可以使用 Kafka 自带的 Zookeeper 启动脚本启动一个本地 Zookeeper 实例。(非window系统的话不用进到window目录, 只需要bin目录)
打开一个新的 PowerShell 窗口,进入到 Kafka 解压目录中的 binwindows
目录:
cd D:kafkakafka_2.13-3.9.0binwindows
启动 Zookeeper:
.zookeeper-server-start.bat ....configzookeeper.properties
这个命令会启动一个本地的 Zookeeper 实例,并默认监听在 localhost:2181
。出现 Zookeeper 启动日志,表明 Zookeeper 服务正在运行:
[2024-11-08 10:00:00,123] INFO binding to port 2181 (org.apache.zookeeper.server.ZooKeeperServer)
打开另一个新的 PowerShell 窗口,进入 Kafka 解压目录中的 binwindows
目录:
cd D:kafkakafka_2.13-3.9.0binwindows
启动 Kafka 服务器:
.kafka-server-start.bat ....configserver.properties
默认情况下,Kafka 会监听 localhost:9092
,并连接到本地的 Zookeeper 实例(localhost:2181
)。出现 Kafka 启动日志,表明 Kafka 服务正在运行:
[2024-11-08 10:05:00,456] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
现在,你已经成功启动了 Kafka 服务。你可以通过创建一个 Kafka 主题并发送/接收消息来验证 Kafka 是否工作正常。
创建 Kafka 主题(例如,创建一个名为 test
的主题):
.kafka-topics.bat --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
如果命令执行成功,表示 Kafka 已经启动并且主题 test
被创建。
列出所有 Kafka 主题:
.kafka-topics.bat --list --bootstrap-server localhost:9092
如果显示 test
主题,说明 Kafka 启动正常。
启动生产者(发送消息到 test
主题):
.kafka-console-producer.bat --topic test --bootstrap-server localhost:9092
输入一些消息并按回车(例如:Hello Kafka
),这会将消息发送到 Kafka 服务器的 test
主题。
启动消费者(消费 test
主题的消息):
.kafka-console-consumer.bat --topic test --bootstrap-server localhost:9092 --from-beginning
如果消费者显示出你输入的消息,说明 Kafka 正常工作。
Zookeeper 是一个分布式协调服务,它通过一个 集群(ensemble) 来提供高可用性。Zookeeper 集群通常由多个节点组成(一般是 3、5 或 7 个节点),以保证在单个节点失效时,集群仍能正常工作。
Zookeeper 使用 Leader-Follower 模式,集群中的一个节点会被选举为 Leader,负责处理写请求,其他节点作为 Follower,负责处理读请求。通过这种方式,Zookeeper 保证了分布式系统的一致性和高可用性。
确保已经准备好多个服务器,或者在本地启动多个 Zookeeper 实例来模拟集群。(这里我们采用本地启动多个实例来演示)
id
,这对于 Zookeeper 的 Leader 选举非常重要。myid
文件来标识每个节点。我们以三个节点的 Zookeeper 集群为例,演示如何配置集群。
zoo.cfg
每个 Zookeeper 节点都需要配置一个 zoo.cfg
配置文件。你可以在每个节点的 conf
目录下找到这个文件(默认是zoo_sample.cfg
, 这是官方给的例子, 我们可以复制一份, 弄成 zoo.cfg
)。以下是一些重要的配置项:
X
是每个节点的唯一标识符。假设你有 3 个 Zookeeper 节点,分别是 localhost:2181
、localhost:2182
和 localhost:2183
,以下是每个节点的 zoo.cfg
配置文件的示例。(这里我们是在本地起了三个zookeeper实例, 把不同的实例运行在不同的端口,正常三台服务器的话应该都运行在默认端口2181
, 然后 localhost 换成服务器的ip地址)
下面给出三个服务器的 zoo.cfg
配置文件。(这里我们还是本地起的三个实例)
localhost:2181
):# 数据存储目录
dataDir=D:/zookeeper/data1 # 服务器的话可以都写成 /zookeeper/data
# 客户端连接端口
clientPort=2181
# 启动服务器的数量
initLimit=5
syncLimit=2
# 集群成员
server.1=localhost:2888:3888 # 三台服务器的话应该是 localhost 换成三个服务器的ip,三个都是2888.3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
localhost:2182
):dataDir=D:/zookeeper/data2
clientPort=2182
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
localhost:2183
):dataDir=D:/zookeeper/data3
clientPort=2183
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
myid
文件每个 Zookeeper 节点都需要一个 myid
文件,文件中存储一个唯一的 ID,用于标识该节点。这是 Zookeeper 集群中的一个重要配置项。
在每个节点的 dataDir
目录下创建一个名为 myid
的文件。
文件内容为节点的 ID。例如:
D:/zookeeper/data1
目录下创建一个文件 myid
,内容为 1
。D:/zookeeper/data2
目录下创建一个文件 myid
,内容为 2
。D:/zookeeper/data3
目录下创建一个文件 myid
,内容为 3
。启动每个 Zookeeper 节点:
使用以下命令分别在不同的终端(或者在不同的 PowerShell 窗口)中启动每个 Zookeeper 节点。进入到 Zookeeper 的 binwindows
目录,然后执行以下命令启动 Zookeeper。
cd D:zookeeperzookeeper-3.8.4binwindows
.zkServer.cmd
cd D:zookeeperzookeeper-3.8.4binwindows
.zkServer.cmd
cd D:zookeeperzookeeper-3.8.4binwindows
.zkServer.cmd
验证集群状态:
在启动 Zookeeper 节点之后,可以通过以下命令验证集群的状态:
cd D:zookeeperzookeeper-3.8.4binwindows
.zkCli.cmd
进入 Zookeeper CLI 后,输入以下命令查看集群状态:
status
如果一切正常,你应该会看到以下类似的输出,表示 Zookeeper 集群已经成功启动:
Mode: follower
说明你当前所在的节点是 Follower,而集群中可能有一个节点是 Leader。
客户端可以通过任何一个 Zookeeper 节点的 clientPort
来连接到集群。
例如,使用以下命令连接到 localhost:2181
:
.zkCli.cmd -server localhost:2181
这样就可以连接到 Zookeeper 集群的某个节点,执行相关的操作。
在分布式环境中,通常会启动多个 Kafka 实例,形成一个 Kafka 集群。Kafka 集群中的所有节点都依赖 Zookeeper 来进行协调(如领导者选举、分区分配等)。以下是如何通过 Zookeeper 启动一个简单的 Kafka 集群。
一个 Kafka 集群通常由多个 Kafka broker 组成,每个 broker 都会连接到 Zookeeper。假设有 3 个 Kafka broker,它们的配置文件分别是 server-1.properties
、server-2.properties
和 server-3.properties
。(这里是我们在一个服务器上运行三个kafka)
每个 server.properties
配置文件需要设置 唯一的 broker.id
,Zookeeper 地址,以及其他相关配置。
复制 server.properties
文件,创建 3 个配置文件,分别命名为 server-1.properties
、server-2.properties
和 server-3.properties
,并修改以下参数:
broker.id
。例如,server-1.properties
文件配置如下:
broker.id=1
listeners=PLAINTEXT://localhost:9091
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181,localhost:2182,localhost2183 # 因为我们上面启动的三个zookeeper是同一主机的这三个端口
server-2.properties
和 server-3.properties
文件内容相似,只需修改 broker.id
和 listeners
配置:
server-2.properties
中设置 broker.id=2
,listeners=PLAINTEXT://localhost:9092
。server-3.properties
中设置 broker.id=3
,listeners=PLAINTEXT://localhost:9093
。打开多个 PowerShell 窗口,分别进入 Kafka 的 binwindows
目录。
启动每个 Kafka broker(分别对应 server-1.properties
、server-2.properties
和 server-3.properties
配置文件):
.kafka-server-start.bat ....configserver-1.properties
.kafka-server-start.bat ....configserver-2.properties
.kafka-server-start.bat ....configserver-3.properties
你可以通过以下命令验证 Kafka 集群是否正常启动:
列出所有主题:
.kafka-topics.bat --list --bootstrap-server localhost:9091
你应该能看到 Kafka 集群中的所有主题。
创建一个主题并验证其分区:
.kafka-topics.bat --create --topic test --bootstrap-server localhost:9091 --partitions 3 --replication-factor 3
该命令会创建一个名为 test
的主题,分配 3 个分区,并为每个分区分配 3 个副本。
查看 Kafka 集群的状态:
你可以使用以下命令来检查 Kafka 集群的健康状态和分区分布。
.kafka-topics.bat --describe --topic test --bootstrap-server localhost:9091
这里我们讲的是spring在kafka原生的api上进行封装后的用法
首先,你需要在 Spring Boot 项目的 pom.xml
中添加 Kafka 相关的依赖:
org.springframework.kafka
spring-kafka
org.springframework.kafka
spring-kafka-streams
简单的配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test1Group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Kafka 消息生产者是向 Kafka 主题发送消息的组件。可以使用 KafkaTemplate
来发送消息。
KafkaTemplate
用来发送消息。send()
方法接收两个参数:主题名和消息内容。
@Autowired
private KafkaTemplate kafkaTemplate; // 消息是 key value的形式发送
// 对于指定key的,kafka会根据key的哈希值把他们放到一个partition中
// kafkaTemplate.send("test-topic", "kafkaKey1" "Kafka value");
// 没有指定key的,Kafka 会将消息发送到一个随机或轮询的分区,具体行为取决于我们使用的 Kafka 生产者配置
kafkaTemplate.send("test-topic", "KafkaValue");
Kafka 消息消费者是从 Kafka 主题接收消息的组件。可以使用 @KafkaListener
注解来创建消费者,
消费者会自动监听指定的主题
消费者类示例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
// @KafkaListener(topics = "test-topic", groupId = "test-group")
// public void listen(ConsumerRecord record) {
// System.out.println(record.key());
// System.out.println(record.value());
// }
// 使用 @KafkaListener 注解标注消费的主题和消费者组
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
在 @KafkaListener
中,topics
指定消费的主题,groupId
指定消费者组。
@KafkaListener
@KafkaListener
是 Spring Kafka 中最常用的注解,用于定义 Kafka 消费者。它标记一个方法为 Kafka 消费者,并指示该方法监听一个或多个 Kafka 主题。
topics
:指定要监听的 Kafka 主题(可以指定多个主题)。groupId
:指定消费者组 ID(如果未指定,将使用默认值)。containerFactory
:指定自定义的 Kafka 消费者容器工厂,通常用于处理消息的序列化和反序列化。示例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
@KafkaListener
注解标记的方法会监听指定的 Kafka 主题(test-topic
)并接收来自该主题的消息。groupId
定义了消费者组 ID,多个消费者可以属于同一个组,从而实现消息的负载均衡。@KafkaHandler
@KafkaHandler
用于处理复杂消息类型时,将不同的消息类型分配到不同的方法。
@KafkaHandler
用于区分消息处理方法,通常在处理不同类型的消息时使用。@KafkaHandler
标记不同的方法来处理每种类型。示例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
@KafkaListener(topics = "test-topic")
public class KafkaConsumer {
@KafkaHandler
public void handleMessage(String message) {
System.out.println("Received string message: " + message);
}
@KafkaHandler
public void handleOtherMessage(Integer message) {
System.out.println("Received integer message: " + message);
}
}
@KafkaListener
注解指定了监听 test-topic
主题,而 @KafkaHandler
注解则将不同类型的消息分配到不同的方法。String
,handleMessage
会处理;如果消息类型是 Integer
,handleOtherMessage
会处理。@KafkaListeners
@KafkaListeners
是一个容器注解,它允许你在一个类上定义多个 @KafkaListener
注解。
@KafkaListeners
用于在一个类上同时声明多个 @KafkaListener
注解,这样就能监听多个主题或使用不同的消费者组。@KafkaListeners
来批量定义。示例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.stereotype.Service;
@Service
@KafkaListeners({
@KafkaListener(topics = "topic1", groupId = "group1"),
@KafkaListener(topics = "topic2", groupId = "group2")
})
public class KafkaConsumer {
@KafkaListener(topics = "topic1", groupId = "group1")
public void listenTopic1(String message) {
System.out.println("Received message from topic1: " + message);
}
@KafkaListener(topics = "topic2", groupId = "group2")
public void listenTopic2(String message) {
System.out.println("Received message from topic2: " + message);
}
}
@KafkaListeners
允许你在一个类上声明多个 @KafkaListener
注解,分别监听不同的主题,并在方法中处理不同的消息。在 Spring Boot 中使用 Kafka,主要涉及配置 Kafka 的生产者(Producer)和消费者(Consumer)。
生产者用于将消息发送到 Kafka 主题。常用的生产者配置项如下:
application.properties
):# Kafka 生产者配置
spring.kafka.producer.bootstrap-servers=localhost:9092 # Kafka 集群地址 多个地址用 , 隔开
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer # 默认的键的序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 默认的值的序列化器
spring.kafka.producer.acks=all # 确认方式
spring.kafka.producer.retries=3 # 生产者发送失败时的重试次数
spring.kafka.producer.batch-size=16384 # 批量发送的最大消息数(单位:字节)16k
spring.kafka.producer.linger-ms=1 # 生产者等待更多消息的时间(单位:毫秒)
spring.kafka.producer.buffer-memory=33554432 # 内存缓冲区的大小(单位:字节)
spring.kafka.producer.bootstrap-servers
:
localhost:9092
。spring.kafka.producer.key-serializer
:
StringSerializer
、IntegerSerializer
等。org.apache.kafka.common.serialization.StringSerializer
。spring.kafka.producer.value-serializer
:
key-serializer
类似,value-serializer
用于指定消息体(值)的序列化器。org.apache.kafka.common.serialization.StringSerializer
。spring.kafka.producer.acks
:
0
:不等待确认,即消息发送后立即返回。1
:等待 Leader 节点确认。all
:等待所有副本确认(保证消息可靠性,推荐使用)。all
。spring.kafka.producer.retries
:
0
,如果为 -1
,则会一直重试,直到成功。3
。spring.kafka.producer.batch-size
:
16384
字节(16 KB)。spring.kafka.producer.linger-ms
:
0
,即立即发送。如果设置为正数,生产者会等到这个时间,尝试批量发送更多的消息,提高吞吐量。1
毫秒。spring.kafka.producer.buffer-memory
:
33554432
字节(32 MB)。消费者用于从 Kafka 中消费消息。常用的消费者配置项如下:
application.properties
):# Kafka 消费者配置
spring.kafka.consumer.bootstrap-servers=localhost:9092 # Kafka 集群地址
spring.kafka.consumer.group-id=my-group1 # 默认的消费者组,如果@KafkaListener注解没有写消费者组就会使用这个消费者组
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 默认的键的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 默认的值的反序列化器
spring.kafka.consumer.auto-offset-reset=earliest # 消费者如何处理未提交的消息
spring.kafka.consumer.enable-auto-commit=true # 是否自动提交消费偏移量
spring.kafka.consumer.fetch-min-bytes=50000 # 从 Kafka 拉取消息的最小字节数
spring.kafka.consumer.max-poll-records=500 # 每次拉取的最大消息数量
spring.kafka.consumer.session-timeout=15000 # 会话超时时间(单位:毫秒)
spring.kafka.consumer.bootstrap-servers
:
localhost:9092
。spring.kafka.consumer.group-id
:
my-group
。spring.kafka.consumer.key-deserializer
:
key-serializer
配置相对,消费者需要对消息的键进行反序列化。常用的反序列化器有 StringDeserializer
、IntegerDeserializer
等。org.apache.kafka.common.serialization.StringDeserializer
。spring.kafka.consumer.value-deserializer
:
value-serializer
配置相对,消费者需要对消息的值进行反序列化。org.apache.kafka.common.serialization.StringDeserializer
。spring.kafka.consumer.auto-offset-reset
:
earliest
:从最早的消息开始消费。latest
:从最新的消息开始消费(默认行为)。earliest
。spring.kafka.consumer.enable-auto-commit
:
true
表示自动提交,false
表示手动提交。自动提交方式不适合复杂的消费逻辑,手动提交可以确保消费的消息被正确处理。true
。spring.kafka.consumer.fetch-min-bytes
:
50000
字节(50 KB)。spring.kafka.consumer.max-poll-records
:
500
。spring.kafka.consumer.session-timeout
:
15000
毫秒(15 秒)。application.properties
):# Kafka 生产者高级配置
spring.kafka.producer.compression-type=gzip # 压缩类型(支持 gzip, snappy, lz4, zstd)
spring.kafka.producer.max-request-size=1048576 # 请求的最大字节数
# Kafka 消费者高级配置
spring.kafka.consumer.isolation-level=read_committed # 读取事务提交的消息
spring.kafka.consumer.max-poll-interval=300000 # 最大轮询间隔(单位:毫秒)
spring.kafka.consumer.fetch-max-wait=500 # 最大拉取等待时间(单位:毫秒)
spring.kafka.producer.compression-type
:
gzip
、snappy
、lz4
和 zstd
。spring.kafka.producer.max-request-size
:
spring.kafka.consumer.isolation-level
:
read_committed
表示只读取已提交的消息,read_uncommitted
表示可以读取未提交的消息。spring.kafka.consumer.max-poll-interval
:
spring.kafka.consumer.fetch-max-wait
:
参与评论
手机查看
返回顶部