Kafka 拦截器、生产者的TCP连接、幂等与事务生产者
参考 Kafka 核心技术与实战 (opens new window) 第 12-14 讲
# 1. Kafka 拦截器
Kafka 拦截器是一个不太常用但很高级实用的功能。
# 1.1 什么是拦截器?
拦截器的基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。下面这张图展示了 Spring MVC 拦截器的工作原理:
拦截器可以动态插入到应用程序中,从而快速地切换不同的拦截器而不影响主程序逻辑。Kafka 拦截器借鉴了这样的设计思路。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后。
# 1.2 Kafka 拦截器
Kafka 拦截器分为生产者拦截器和消费者拦截器。
- 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑。
- 消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。
值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。
举个例子,假设你想在生产消息前执行两个“前置动作”:第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息。
当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。拿上面的例子来说,假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定拦截器:
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
2
3
4
5
6
# 1.2.1 生产者拦截器
生产者拦截器的实现类应当继承 ProducerInterceptor 接口,该接口是 Kafka 提供的,里面有两个核心方法:
onSend
:该方法会在消息发送之前被调用。onAcknowledgement
:该方法会在消息成功提交或发送失败之后被调用。且会早于发送回调 callback 前。
注意点:值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
# 1.2.2 消费者拦截器
消费者拦截器的实现类要实现 ConsumerInterceptor 接口,并需要你实现两个核心方法:
onConsume
:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。onCommit
:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
一定要注意的是,指定拦截器类时要指定它们的全限定名,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。
# 1.3 典型使用场景
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
# 1.3.1 端到端系统性能检测
Kafka 默认的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去追踪集群间消息的流转路径。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。
能够与主业务逻辑解耦的方法就是,现在,通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。这就是 Kafka 拦截器的一个非常典型的使用场景。
# 1.3.2 消息审计(message audit)
设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。
作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。
# 1.4 案例分享
下面我以一个具体的案例来说明一下拦截器的使用。在这个案例中,我们通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。
曾有一个公司的需求是,某个业务只有一个 Producer 和一个 Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前 Kafka 并没有提供这种端到端的延时统计。这可以通过拦截器来做到。
既然是要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中。Okay,这个需求显然要实现生产者拦截器,也要实现消费者拦截器。
生产者拦截器的实现:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。
下面是消费者端的拦截器实现,代码如下:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中。之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。
创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。
# 1.5 小结
Kafka 拦截器虽然冷门,但能实现很多特殊需求。
# 2. Java 生产者是如何管理 TCP 连接的?
今天分享的主题是:Kafka 的 Java 生产者是如何管理 TCP 连接的。
# 2.1 为何采用 TCP?
Kafka 中所有通信都是基于 TCP 协议。为什么不适用 HTTP 作为底层通信协议呢?
从社区的角度来看,在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。
多路复用请求
所谓的多路复用请求,即 multiplexing request,是指将两个或多个数据流合并到底层单一物理连接中的过程。TCP 的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。其实严格来说,TCP 并不能多路复用,它只是提供可靠的消息交付语义保证,比如自动重传丢失的报文。
更严谨地说,作为一个基于报文的协议,TCP 能够被用于多路复用连接场景的前提是,上层的应用协议(比如 HTTP)允许发送多条消息。不过,我们今天并不是要详细讨论 TCP 原理,因此你只需要知道这是社区采用 TCP 的理由之一就行了。
除了 TCP 提供的这些高级功能有可能被 Kafka 客户端的开发人员使用之外,社区还发现,目前已知的 HTTP 库在很多编程语言中都略显简陋。
基于这两个原因,Kafka 社区决定采用 TCP 协议作为所有请求通信的底层协议。
# 2.2 Kafka 生产者程序概览
Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步:
- 构造生产者对象所需的参数对象。
- 利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
- 使用 KafkaProducer 的 send 方法发送消息。
- 调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。
上面四步写成 Java 代码的话大概这个样子:
Properties props = new Properties ();
props.put(“参数 1”, “参数 1 的值”);
props.put(“参数 2”, “参数 2 的值”);
...
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<String, String>(...), callback);
...
}
2
3
4
5
6
7
8
这段代码使用了 Java 7 提供的 try-with-resource 特性,所以并没有显式调用 producer.close() 方法。无论是否显式调用 close 方法,所有生产者程序大致都是这个路数。
现在问题来了,当我们开发一个 Producer 应用时,生产者会向 Kafka 集群中指定的主题(Topic)发送消息,这必然涉及与 Kafka Broker 创建 TCP 连接。那么,Kafka 的 Producer 客户端是如何管理这些 TCP 连接的呢?
# 2.3 何时创建 TCP 连接?
我们首先要弄明白生产者代码是什么时候创建 TCP 连接的。就上面的那段代码而言,可能创建 TCP 连接的地方有两处:
- Producer producer = new KafkaProducer(props)
- producer.send(msg, callback)
你觉得连向 Broker 的 TCP 是何时创建的呢?
首先,生产者应用在创建 KafkaProducer 实例时是会建立与 Broker 的 TCP 连接的。其实这种表述也不是很准确,应该这样说:在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。
也就是还没有调用 send 方法,Producer 就要连接 Broker 了,但这时它也不知道要给哪个 topic 发消息,所以它是向 bootstrap.servers 参数指定的所有 Broker 发送消息。
我在这里稍微解释一下 bootstrap.servers 参数。它是 Producer 的核心参数之一,指定了这个 Producer 启动时要连接的 Broker 地址。请注意,这里的“启动时”,代表的是 Producer 启动时会发起与这些 Broker 的连接。因此,如果你为这个参数指定了 1000 个 Broker 连接信息,那么很遗憾,你的 Producer 启动时会首先创建与这 1000 个 Broker 的 TCP 连接。
所以在实际使用时,并不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3~4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker。
这样的设计真的对吗?
讲到这里,我有一些个人的看法想跟你分享一下。通常情况下,我都不认为社区写的代码或做的设计就一定是对的,因此,很多类似的这种“质疑”会时不时地在我脑子里冒出来。
拿今天的这个 KafkaProducer 创建实例来说,社区的官方文档中提及 KafkaProducer 类是线程安全的。我本人并没有详尽地去验证过它是否真的就是 thread-safe 的,但是大致浏览一下源码可以得出这样的结论:KafkaProducer 实例创建的线程和前面提到的 Sender 线程共享的可变数据结构只有 RecordAccumulator 类,故维护了 RecordAccumulator 类的线程安全,也就实现了 KafkaProducer 类的线程安全。
你不需要了解 RecordAccumulator 类是做什么的,你只要知道它主要的数据结构是一个 ConcurrentMap<TopicPartition, Deque>
。TopicPartition 是 Kafka 用来表示主题分区的 Java 对象,本身是不可变对象。而 RecordAccumulator 代码中用到 Deque 的地方都有锁的保护,所以基本上可以认定 RecordAccumulator 类是线程安全的。
说了这么多,我其实是想说,纵然 KafkaProducer 是线程安全的,我也不赞同创建 KafkaProducer 实例时启动 Sender 线程的做法。写了《Java 并发编程实践》的那位布赖恩·格茨(Brian Goetz)大神,明确指出了这样做的风险:在对象构造器中启动线程会造成 this 指针的逃逸。理论上,Sender 线程完全能够观测到一个尚未构造完成的 KafkaProducer 实例。当然,在构造对象时创建线程没有任何问题,但最好是不要同时启动它。
好了,我们言归正传。针对 TCP 连接何时创建的问题,目前我们的结论是这样的:TCP 连接是在创建 KafkaProducer 实例时建立的。那么,我们想问的是,它只会在这个时候被创建吗?
当然不是,TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。因为这两个地方并非总是创建 TCP 连接。当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。
接下来,我们来看看 Producer 更新集群元数据信息的两个场景。
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
讲到这里,我们可以“挑战”一下社区对 Producer 的这种设计的合理性。目前来看,一个 Producer 默认会向集群的所有 Broker 都创建 TCP 连接,不管是否真的需要传输请求。这显然是没有必要的。再加上 Kafka 还支持强制将空闲的 TCP 连接资源关闭,这就更显得多此一举了。
试想一下,在一个有着 1000 台 Broker 的集群中,你的 Producer 可能只会与其中的 3~5 台 Broker 长期通信,但是 Producer 启动后依次创建与这 1000 台 Broker 的 TCP 连接。一段时间之后,大约有 995 个 TCP 连接又被强制关闭。这难道不是一种资源浪费吗?很显然,这里是有改善和优化的空间的。
# 2.4 何时关闭 TCP 连接?
说完了 TCP 连接的创建,我们来说说它们何时被关闭。
Producer 端关闭 TCP 连接的方式有两种:
- 一种是用户主动关闭:比如调用
producer.close()
或 kill -9 主动杀掉 producer 应用。 - 一种是 Kafka 自动关闭:这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。
值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。
# 2.5 小结
总结一下 Kafka(2.1.0)中 Java Producer 端管理 TCP 连接的方式:
- KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
- KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
- 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。
- 如果设置 Producer 端 connections.max.idle.ms 参数大于 0,则步骤 1 中创建的 TCP 连接会被自动关闭;如果设置该参数 =-1,那么步骤 1 中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接。
# 3. 幂等生产者和事务生产者
今天分享的主题是:Kafka 消息交付可靠性保障以及精确处理一次语义的实现。
# 3.1 三种可靠性保障
所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:
- 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
- 至少一次(at least once):消息不会丢失,但有可能被重复发送。
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
# 3.1.1 如何提供“至少一次”的保障?
目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。当 Producer 发送消息后没有收到确认,它就默认会再次发送相同的消息,这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。
# 3.1.2 如何提供“最多一次”的保障?
Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。
因为有些场景是允许偶尔的消息丢失但禁止消息重复,此时使用最多一次交付保障就是最恰当的。
# 3.1.3 “精确一次”
无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。
那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性和事务。它们分别是什么机制?两者是一回事吗?要回答这些问题,我们首先来说说什么是幂等性。
# 3.2 幂等性(Idempotence)
幂等性指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。
幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。
# 3.3 幂等性 Producer
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。在 version 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
实现幂等的大致原理
enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。
尽管设置简单,但我们必须了解幂等性 Producer 的作用范围。
- 首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
- 其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
那么你可能会问,如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!
# 3.4 事务(Transaction)
Kafka 的事务概念类似于我们熟知的数据库提供的事务。在数据库领域,事务提供的安全性保障是经典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。
但 ACID 的概念本身就很有歧义,比如对隔离性的理解,不同的数据库也往往提供了不同的隔离界别。
Kafka 0.11 版本后提供了对事务的支持,且目前主要在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
下面我们就来看看 Kafka 中的事务型 Producer。
# 3.5 事务型 Producer
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transctional.id。最好为其设置一个有意义的名字。
此外,你还需要在 Producer 代码中做一些调整,如这段代码所示:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
2
3
4
5
6
7
8
9
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:
- read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
- read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
# 3.6 小结
简单来说,幂等性 Producer 和事务型 Producer 都是 Kafka 社区力图为 Kafka 实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的:
- 幂等性 Producer 只能保证单分区、单会话上的消息幂等性;
- 事务能够保证跨分区、跨会话间的幂等性。
从交付语义上来看,自然是事务型 Producer 能做的更多。
不过,切记天下没有免费的午餐。比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。