第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

破局之路课程 2024-03-21 07:03:05
1.第一个kafka程序1.1.创建我们的主题

kafka-topics.bat --zookeeper localhost:2181/kafka --create --topic hello-kafka --replication-factor 1 --partitions 4

(主题不创建,可能会造成程序报错,也可在程序中配置如:spring.kafka.listener.missing-topics-fatal=false

1.2.生产者发送消息

引入jar:

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>2.3.0</version>

</dependency>

生产者代码示例:

1.2.1.必选属性

创建生产者对象时有三个属性必须指定。

bootstrap.servers

该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查询其他 broker 的信息。

不过最少提供 2 个 broker 的信息(用逗号分隔,比如: 127.0.0.1:9092,192.168.0.13:9092),一旦其中一个宕机,生产者仍能连接到集群上。

key.serializer

生产者接口允许使用参数化类型,可以把 Java 对象作为键和值传 broker,但是 broker 希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器。key.serializer 必须设置为实现org.apache.kafka.common.serialization.Serializer 的接口类,Kafka 的客户端默认提供了ByteArraySerializer,IntegerSerializer, StringSerializer,也可以实现自定义的序列化器。

value.serializer

同 key.serializer。

1.3.消费者接受消息

代码示例:

1.3.1.必选参数

bootstrap.servers、key.serializer、value.serializer 含义同生产者

group.id

并非完全必需,它指定了消费者属于哪一个群组,但是创建不属于任何一个群组的消费者并没有问题。

新版本特点:poll(Duration)这个版本修改了这样的设计,会把元数据获取也计入整个超时时间(更加的合理)

1.4.演示示例

1.默认创建主题,只有一个分区时,演示生产者和消费者情况。

2.修改主题分区为 2(使用管理命令),再重新演示生产者和消费者情况。

2.Kafka 的生产者2.1.生产者发送消息的基本流程

从创建一个 ProducerRecord 对象开始, Producer Record 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。

接下来,数据被传给分区器。如果之前在 Producer Record 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 Producer Record 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里(双端队列,尾部写入),这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka ,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

生产者发送消息一般会发生两类错误:

一类是可重试错误,比如连接错误(可通过再次建立连接解决)、无主 no leader(可通过分区重新选举首领解决)。

另一类是无法通过重试解决,比如“消息太大”异常,具体见 message.max.bytes,这类消息不会进行任何重试,直接抛出异常

2.2.Kafka 三种发送方式

我们通过生成者的 send 方法进行发送。send 方法会返回一个包含 RecordMetadata 的 Future 对象。RecordMetadata 里包含了目标主题,分区信息和消息的偏移量。

2.2.1.发送并忘记

忽略 send 方法的返回值,不做任何处理。大多数情况下,消息会正常到达,而且生产者会自动重试,但有时会丢失消息。

2.2.2.同步发送

获得 send 方法返回的 Future 对象,在合适的时候调用 Future 的 get 方法。

2.2.3.异步发送

实现接口 org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给 send 方法。

2.3.更多发送配置

生产者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考org.apache.kafka.clients.producer 包下的 ProducerConfig 类。

acks:

Kafk 内部的复制机制是比较复杂的,这里不谈论内部机制(后续章节进行细讲),我们只讨论生产者发送消息时与副本的关系。

指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。

acks=0:生产者在写入消息之前不会等待任 何来自服务器的响应,容易丢消息,但是吞吐量高。

acks=1:只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。

acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。

金融业务,主备外加异地灾备。所以很多高可用场景一般不是设置 2 个副本,有可能达到 5 个副本,不同机架上部署不同的副本,异地上也部署一套副本。

buffer.memory

设置生产者内存缓冲区的大小(结合生产者发送消息的基本流程),生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向 broker 发送的速度,导致生产者空间不足,producer 会阻塞或者抛出异常。缺省 33554432 (32M)

max.block.ms

指定了在调用 send()方法或者使用 partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。缺省 60000ms

retries

发送失败时,指定生产者可以重发消息的次数(缺省 Integer.MAX_VALUE)。默认情况下,生产者在每次重试之间等待 100ms,可以通过参数retry.backoff.ms 参数来改变这个时间间隔。

receive.buffer.bytes 和 send.buffer.bytes

指定 TCP socket 接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。缺省 102400

batch.size

当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送。缺省16384(16k) ,如果一条消息超过了批次的大小,会写不进去。

linger.ms

指定了生产者在发送批次前等待更多消息加入批次的时间。它和 batch.size 以先到者为先。也就是说,一旦我们获得消息的数量够 batch.size 的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比 batch.size 设置要小的多,我们需要“linger”特定的时间以获取更多的消息。这个设置默认为 0,即没有延迟。设定 linger.ms=5,例如,将会减少请求数目,但是同时会增加 5ms 的延迟,但也会提升消息的吞吐量。

compression.type

producer 用于压缩数据的压缩类型。默认是无压缩。正确的选项值是 none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。snappy 占用 cpu 少,提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽,用这个。如果带宽紧张,用 gzip,会占用较多的 cpu,但提供更高的压缩比。

client.id

当向 server 发出请求时,这个字符串会发送给 server。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。

max.in.flight.requests.per.connection

指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大,当然也可以提升吞吐量。发生错误时,可能会造成数据的发送顺序改变,默认是 5 (修改)。

如果需要保证消息在一个分区上的严格顺序,这个值应该设为 1。不过这样会严重影响生产者的吞吐量。

request.timeout.ms

客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常,默认 30 秒。

metadata.fetch.timeout.ms

是指我们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。此项配置是指当等待元数据 fetch 成功完成所需要的时间,否则会跑出异常给客户端

max.request.size

控制生产者发送请求最大大小。默认这个值为 1M,如果一个请求里只有一个消息,那这个消息不能大于 1M,如果一次请求是一个批次,该批次包含了 1000 条消息,那么每个消息不能大于 1KB。注意:broker 具有自己对消息记录尺寸的覆盖,如果这个尺寸小于生产者的这个设置,会导致消息被拒绝。这个参数和 Kafka 主机的 message.max.bytes 参数有关系。如果生产者发送的消息超过 message.max.bytes 设置的大小,就会被 Kafka 服务器拒绝。

以上参数不用全记住,一般来说,就记住 acks、batch.size、linger.ms、max.request.size 就行了,因为这 4 个参数重要些,其他参数一般没有太大必要调整

2.4.顺序保证

Kafka 可以保证同一个分区里的消息是有序的。也就是说,发送消息时,主题只有且只有一个分区,同时生产者按照一定的顺序发送消息, broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下, 顺序是非常重要的。例如,往一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感。

如果把 retires 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功, broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retires 设为 0(不重试的话消息可能会因为连接关闭等原因会丢) 。所以还是需要重试,同时把 max.in.flight.request.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发

送给 broker 。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

2.5.序列化

创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景。我们完全可以自定义序列化器。只要实现org.apache.kafka.common.serialization.Serializer 接口即可。

2.5.1.自定义序列化需要考虑的问题

自定义序列化容易导致程序的脆弱性。举例,在我们上面的实现里,我们有多种类型的消费者,每个消费者对实体字段都有各自的需求,比如,有的将字段变更为 long 型,有的会增加字段,这样会出现新旧消息的兼容性问题。特别是在系统升级的时候,经常会出现一部分系统升级,其余系统被迫跟着升级的情况。

解决这个问题,可以考虑使用自带格式描述以及语言无关的序列化框架。比如 Protobuf,或者 Kafka 官方推荐的 Apache Avro。

Avro 会使用一个 JSON 文件作为 schema 来描述数据,Avro 在读写时会用到这个 schema,可以把这个 schema 内嵌在数据文件中。这样,不管数据格式如何变动,消费者都知道如何处理数据。

但是内嵌的消息,自带格式,会导致消息的大小不必要的增大,消耗了资源。我们可以使用 schema 注册表机制,将所有写入的数据用到的 schema保存在注册表中,然后在消息中引用 schema 的标识符,而读取的数据的消费者程序使用这个标识符从注册表中拉取 schema 来反序列化记录。

注意:Kafka 本身并不提供 schema 注册表,需要借助第三方,现在已经有很多的开源实现,比如 Confluent Schema Registry,可以从 GitHub 上获取。

如何使用参考如下网址:

https://cloud.tencent.com/developer/article/1336568

2.6.分区

我们在新增 ProducerRecord 对象中可以看到,ProducerRecord 包含了目标主题,键和值,Kafka 的消息都是一个个的键值对。键可以设置为默认的 null。

键的主要用途有两个:一,用来决定消息被写往主题的哪个分区,拥有相同键的消息将被写往同一个分区,二,还可以作为消息的附加消息。

如果键值为 null,并且使用默认的分区器,分区器使用轮询算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用默认的分区器,Kafka 对键进行散列(Kafka 自定义的散列算法,具体算法原理不知),然后根据散列值把消息映射到特定的分区上。很明显,同一个键总是被映射到同一个分区。但是只有不改变主题分区数量的情况下,键和分区之间的映射才能保持不变,一旦增加了新的分区,就无法保证了,所以如果要使用键来映射分区,那就要在创建主题的时候把分区规划好,而且永远不要增加新分区。

2.6.1.自定义分区器

某些情况下,数据特性决定了需要进行特殊分区,比如电商业务,北京的业务量明显比较大,占据了总业务量的 20%,我们需要对北京的订单进行单独分区处理,默认的散列分区算法不合适了, 我们就可以自定义分区算法,对北京的订单单独处理,其他地区沿用散列分区算法。或者某些情况下,我们用 value 来进行分区。

3.Kafka 的消费者3.1.消费者的入门

消费者的含义,同一般消息中间件中消费者的概念。在高并发的情况下,生产者产生消息的速度是远大于消费者消费的速度,单个消费者很可能会负担不起,此时有必要对消费者进行横向伸缩,于是我们可以使用多个消费者从同一个主题读取消息,对消息进行分流。(买单的故事,群组,消费者的一群人, 消费者:买单的,分区:一笔单,一笔单能被买单一次,当然一个消费者可以买多个单,如果有一个消费者挂掉了<跑单了>,另外的消费者接上)

3.2.消费者群组

Kafka 里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。

上图,主题 T 有 4 个分区,群组中只有一个消费者,则该消费者将收到主题 T1 全部 4 个分区的消息。

如上图,在群组中增加一个消费者 2,那么每个消费者将分别从两个分区接收消息,上图中就表现为消费者 1 接收分区 1 和分区 3 的消息,消费者 2接收分区 2 和分区 4 的消息。

如上图,在群组中有 4 个消费者,那么每个消费者将分别从 1 个分区接收消息。

但是,当我们增加更多的消费者,超过了主题的分区数量,就会有一部分的消费者被闲置,不会接收到任何消息。

往消费者群组里增加消费者是进行横向伸缩能力的主要方式。所以我们有必要为主题设定合适规模的分区,在负载均衡的时候可以加入更多的消费者。但是要记住,一个群组里消费者数量超过了主题的分区数量,多出来的消费者是没有用处的。

如果是多个应用程序,需要从同一个主题中读取数据,只要保证每个应用程序有自己的消费者群组就行了。

3.3.消费者配置

消费者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考

org.apache.kafka.clients.consumer 包下 ConsumerConfig 类。

auto.offset.reset

消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理。默认值是 latest,从最新的记录开始读取,另一个值是 earliest,表示消费者从起始位置读取分区的记录。

注意:如果是消费者在读取一个没有偏移量的分区或者偏移量无效的情况(因消费者长时间失效,包含的偏移量记录已经过时并被删除)下,默认值是 latest 的话,消费者将从最新的记录开始读取数据( 在消费者启动之后生成的记录),可以先启动生产者,再启动消费者,观察到这种情况。观察代

码,在模块 kafka-no-spring 下包 hellokafka 中。

enable .auto.commit

默认值 true,表明消费者是否自动提交偏移。为了尽量避免重复数据和数据丢失,可以改为 false,自行控制何时提交。

partition.assignment.strategy

分区分配给消费者的策略。系统提供两种策略。默认为 Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

自定义策略

extends 类 AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 类.class.getName());即可。

max.poll.records

控制每次 poll 方法返回的的记录数量。

fetch.min.bytes

每次 fetch 请求时,server 应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为 1 个字节。多消费者下,可以设大这个值,以降低 broker 的工作负载

fetch.wait.max.ms

如果没有足够的数据能够满足 fetch.min.bytes,则此项配置是指在应答 fetch 请求之前,server 会阻塞的最大时间。缺省为 500 个毫秒。和上面的fetch.min.bytes 结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认 1MB。假设一个主题有 20 个分区和 5 个消费者,那么每个消费者至少要有 4MB 的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的 message.max.bytes 更大,否则消费者可能无法读取消息。

session.timeout.ms

如果 consumer 在这段时间内没有发送心跳信息,则它会被认为挂掉了。默认 3 秒。

client.id

当向 server 发出请求时,这个字符串会发送给 server。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。

receive.buffer.bytes 和 send.buffer.bytes

指定 TCP socket 接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

3.4.消费者中的基础概念

消费者的含义,同一般消息中间件中消费者的概念。在高并发的情况下,生产者产生消息的速度是远大于消费者消费的速度,单个消费者很可能会负担不起,此时有必要对消费者进行横向伸缩,于是我们可以使用多个消费者从同一个主题读取消息,对消息进行分流。

(买单的故事,群组,消费者的一群人, 消费者:买单的,分区:一笔单,一笔单能被买单一次,当然一个消费者可以买多个单,如果有一个消费者挂掉了<跑单了>,另外的消费者接上)

3.4.1.订阅

创建消费者后,使用 subscribe()方法订阅主题,这个方法接受一个主题列表为参数,也可以接受一个正则表达式为参数;正则表达式同样也匹配多个主题。如果新创建了新主题,并且主题名字和正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。比如,要订阅所有和 test相关的主题,可以 subscribe(“tets.*”)

3.4.2.轮询

为了不断的获取消息,我们要在循环中不断的进行轮询,也就是不停调用 poll 方法。

poll 方法的参数为超时时间,控制 poll 方法的阻塞时间,它会让消费者在指定的毫秒数内一直等待 broker 返回数据。poll 方法将会返回一个记录(消息)列表,每一条记录都包含了记录所属的主题信息,记录所在分区信息,记录在分区里的偏移量,以及记录的键值对。

poll 方法不仅仅只是获取数据,在新消费者第一次调用时,它会负责查找群组,加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。

3.4.3.提交和偏移量

当我们调用 poll 方法的时候,broker 返回的是生产者写入 Kafka 但是还没有被消费者读取过的记录,消费者可以使用 Kafka 来追踪消息在分区里的位置,我们称之为 偏移量。消费者更新自己读取到哪个消息的操作,我们称之为 提交。

消费者是如何提交偏移量的呢?消费者会往一个叫做_consumer_offset 的特殊主题发送一个消息,里面会包括每个分区的偏移量。

3.5.消费者中的核心概念3.5.1.多线程安全问题

KafkaConsumer 的实现不是线程安全的,所以我们在多线程的环境下,使用 KafkaConsumer 的实例要小心,应该每个消费数据的线程拥有自己的

3.5.2.群组协调

消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。

3.5.3.分区再均衡

当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现象的发生。从前面的知识中,我们知道,Kafka 中,存在着消费者对分区所有权的关系,这样无论是消费者变化,比如增加了消费者,新消费者会读取原本由其他消费者读取的分区,消费者减少,原本由它负责的分区要由其他消费者来读取,增加了分区,哪个消费者来读取这个新增的分区,这些行为,都会导致分区所有权的变化,这种变化就被称为 再均衡。

再均衡对 Kafka 很重要,这是消费者群组带来高可用性和伸缩性的关键所在。不过一般情况下,尽量减少再均衡,因为再均衡期间,消费者是无法读取消息的,会造成整个群组一小段时间的不可用。

消费者通过向称为群组协调器的 broker(不同的群组有不同的协调器)发送心跳来维持它和群组的从属关系以及对分区的所有权关系。如果消费者长时间不发送心跳,群组协调器认为它已经死亡,就会触发一次再均衡。

在 0.10.1 及以后的版本中,心跳由单独的线程负责,相关的控制参数为 max.poll.interval.ms。

3.6.Kafka 中的消费安全

一般情况下,我们调用 poll 方法的时候,broker 返回的是生产者写入 Kafka 同时 kafka 的消费者提交偏移量,这样可以确保消费者消息消费不丢失也不重复,所以一般情况下 Kafka 提供的原生的消费者是安全的,但是事情会这么完美吗?

3.7.消费者提交偏移量导致的问题

当我们调用 poll 方法的时候,broker 返回的是生产者写入 Kafka 但是还没有被消费者读取过的记录,消费者可以使用 Kafka 来追踪消息在分区里的位置,我们称之为 偏移量。消费者更新自己读取到哪个消息的操作,我们称之为 提交。

消费者是如何提交偏移量的呢?消费者会往一个叫做_consumer_offset 的特殊主题发送一个消息,里面会包括每个分区的偏移量。发生了再均衡之后,消费者可能会被分配新的分区,为了能够继续工作,消费者者需要读取每个分区最后一次提交的偏移量,然后从指定的地方,继续做处理。

分区再均衡的例子:某软件公司,有一个项目,有两块的工作,有两个码农,一个负责一块,干得好好的。突然一天,小王桌子一拍不干了,老子中了 5 百万了,不跟你们玩了,立马收拾完电脑就走了。然后你今天刚好入职,一个萝卜一个坑,你就入坑了。这个过程我们就好比我们的分区再均衡,分区就是一个项目中的不同块的工作,消费者就是码农,一个码农不玩了,另一个码农立马顶上,这个过程就发生了分区再均衡

1)如果提交的偏移量小于消费者实际处理的最后一个消息的偏移量,处于两个偏移量之间的消息会被重复处理,

2)如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

所以, 处理偏移量的方式对客户端会有很大的影响 。KafkaConsumer API 提供了很多种方式来提交偏移量 。

3.7.1.自动提交(重复消费不可避免)

最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.comnit 被设为 true,消费者会自动把从 poll()方法接收到的最大偏移量提交上去。

提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。自动提交是在轮询里进行的,消费者每次在进行轮询时会检査是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。

假设我们仍然使用默认的5s提交时间间隔, 在最近一次提交之后的3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。

这个时候偏移量已经落后了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量, 减小可能出现重复消息的时间窗, 不过这种情况是无法完全避免的 。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的最大偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(enable.auto.comnit 被设为 true 时,在调用 close()方法之前也会进行自动提交)。一般情况下不会有什么

问题,不过在处理异常或提前退出轮询时要格外小心。

自动提交虽然方便,但是很明显是一种基于时间提交的方式,不过并没有为我们留有余地来避免重复处理消息。

3.7.2.手动提交(同步)

我们通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。

把 auto.commit. offset 设为 false,自行决定何时提交偏移量。使用 commitsync()提交偏移量最简单也最可靠。这个方法会提交由 poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

注意:commitsync()将会提交由 poll()返回的最新偏移量,所以在处理完所有记录后要确保调用了 commitsync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近批消息到发生再均衡之间的所有消息都将被重复处理。

只要没有发生不可恢复的错误,commitSync()方法会阻塞,会一直尝试直至提交成功,如果失败,也只能记录异常日志。

3.7.3.异步提交

手动提交时,在 broker 对提交请求作出回应之前,应用程序会一直阻塞。这时我们可以使用异步提交 API,我们只管发送提交请求,无需等待 broker的响应。

在成功提交或碰到无法恢复的错误之前, commitsync()会一直重试,但是 commitAsync 不会。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。

假设我们发出一个请求用于提交偏移量 2000,,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync()重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,

就会出现重复消息。

commitAsync()也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标。

3.7.4.同步和异步组合

因为同步提交一定会成功、异步可能会失败,所以一般的场景是同步和异步一起来做。

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或 再均衡前的最后一次提交,就要确保能够提交成功。

因此,在消费者关闭前一般会组合使用 commitAsync()和 commitsync()。具体使用,参见模块 kafka-no-spring 下包 commit 包中代码 SyncAndAsync。

3.7.5.特定提交

在我们前面的提交中,提交偏移量的频率与处理消息批次的频率是一样的。但如果想要更频繁地提交该怎么办?

如果 poll()方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用commitSync()或 commitAsync()来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

消费者 API 允许在调用 commitsync()和 commitAsync()方法时传进去希望提交的分区和偏移量的 map。假设我们处理了半个批次的消息,最后一个来自主题“customers”,分区 3 的消息的偏移量是 5000,你可以调用 commitsync()方法来提交它。不过,因为消费者可能不只读取一个分区,因为我们需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。

3.8.分区再均衡

3.8.1.再均衡监听器

在提交偏移量一节中提到过,消费者在退出和进行分区再均衡之前,会做一些清理工作比如,提交偏移量、关闭文件句柄、数据库连接等。

在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe()方法时传进去一个 ConsumerRebalancelistener实例就可以了。

ConsumerRebalancelistener 有两个需要实现的方法。

1) public void onPartitionsRevoked( Collection< TopicPartition> partitions)方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了

2) public void onPartitionsAssigned( Collection< TopicPartition> partitions)方法会在重新分配分区之后和消费者开始读取消息之前被调用。

3.8.2.从特定偏移量处开始记录

到目前为止,我们知道了如何使用 poll()方法从各个分区的最新偏移量处开始处理消息。不过,有时候我们也需要从特定的偏移量处开始读取消息。

如果想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息,可以使 seekToBeginning(Collectiontp)和seekToEnd( Collectiontp)这两个方法。

不过,Kaka 也为我们提供了用于查找特定偏移量的 API。它有很多用途,比如向后回退几个消息或者向前跳过几个消息(对时间比较敏感的应用程序在处理滞后的情况下希望能够向前跳过若干个消息)。在使用 Kafka 以外的系统来存储偏移量时,它将给我们带来更大的惊喜--让消息的业务处理和偏移量的提

交变得一致。试想一下这样的场景:应用程序从 Kaka 读取事件(可能是网站的用户点击事件流),对它们进行处理(可能是使用自动程序清理点击操作并添加会话信息),然后把结果保存到数据库。假设我们真的不想丢失任何数据,也不想在数据库里多次保存相同的结果。

我们可能会,毎处理一条记录就提交一次偏移量。尽管如此,在记录被保存到数据库之后以及偏移量被提交之前,应用程序仍然有可能发生崩溃,导致重复处理数据,数据库里就会出现重复记录。

如果保存记录和偏移量可以在一个原子操作里完成,就可以避免出现上述情况。记录和偏移量要么都被成功提交,要么都不提交。如果记录是保存在数据库里而偏移量是提交到Kafka上,那么就无法实现原子操作不过,如果在同一个事务里把记录和偏移量都写到数据库里会怎样呢?那么我们就会知道记录和偏移量要么都成功提交,要么都没有,然后重新处理记录。

现在的问题是:如果偏移量是保存在数据库里而不是 Kafka 里,那么消费者在得到新分区时怎么知道该从哪里开始读取?这个时候可以使用 seek()方法。在消费者启动或分配到新分区时,可以使用 seck()方法查找保存在数据库里的偏移量。我们可以使用使用 Consumer Rebalancelistener 和 seek()方法确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的。

3.9.优雅退出

如果确定要退出循环,需要通过另一个线程调用 consumer. wakeup()方法。如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法。要记住,consumer. wakeup()是消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer. wakeup()可以退出 poll(),并抛出 WakeupException 异常。我们不需要处理 Wakeup Exception,因为它只是用于跳出循环的一种方式。不过,在退出线程之前调用 consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时。

3.10.反序列化

不过就是序列化过程的一个反向,原理和实现可以参考生产者端的实现,同样也可以自定义反序列化器。

3.11.独立消费者

到目前为止,我们讨论了消费者群组,分区被自动分配给群组里的消费者,在群组里新增或移除消费者时自动触发再均衡。不过有时候可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。

如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。

独立消费者相当于自己来分配分区,但是这样做的好处是自己控制,但是就没有动态的支持了,包括加入消费者(分区再均衡之类的),新增分区,这些都需要代码中去解决,所以一般情况下不推荐使用。

0 阅读:0

破局之路课程

简介:感谢大家的关注