kafka-python发送消息速率上不去该怎么办

科技一点鑫得 2024-03-09 00:44:05

最近需要模拟消息源向kafka发送大量消息评估产品性能,我使用的是kafka-python库(pip install kafka-python进行安装)编写发送消息脚本,kafka发送消息部分的关键代码如下:

from kafka import KafkaProducerclass Config: KAFKA = { "brokers": ["x.x.x.x:9091"], "sasl": True, "username": "xxxx", "password": "xxxx", "topic": "perftest" }class KafkaTookit(): def __init__(self, config=Config.KAFKA): self.topic = config["topic"] if config["sasl"]: self.producer = KafkaProducer( sasl_mechanism="PLAIN", security_protocol='SASL_PLAINTEXT', sasl_plain_username=config["username"], sasl_plain_password=config["password"], bootstrap_servers=config["brokers"], key_serializer=lambda v: v.encode(), value_serializer=lambda v: v.encode() ) else: self.producer = KafkaProducer( bootstrap_servers=config["brokers"], key_serializer=lambda v: v.encode(), value_serializer=lambda v: v.encode() ) def send(self, msg, key): self.producer.send(self.topic, value=msg, key=key)

脚本写好之后测试发现消息发送速率上不去,一秒钟只能发送1000个消息左右,增加为多线程方式发送速率也没有明显提升。经过一番折腾,最终通过官方api文档找到了消息发送速率上不去的关键原因。创建KafkaProducer对象有几个影响发送吞吐量的关键参数:

acks:默认值为1,表示发送消息会等待记录写入本地日志,这个参数应该是关键原因,显然设置为0应该会增加发送吞吐量;

batch_size:kafka生产者会缓存一定大小的数据再批量发送,缓存的大小通过batch_size指定,值越大批量发送网络请求次数就越少,吞吐量就会越大,可以根据实际场景适量调大。

linger_ms:linger_ms和batch_size类似,表示间隔linger_ms时间批量发送一次数据,间隔越大、一次缓存的数据就越多,吞吐量就越大,默认值是5ms,可以适当调大。

compression_type:默认不启用任何压缩,启用gzip压缩可以提升吞吐量。

理解了上面几个参数的含义,尝试设置上面几个参数再次进行测试发送速率提升非常明显,代码参考如下:

from kafka import KafkaProducerclass Config: KAFKA = { "brokers": ["x.x.x.x:9091"], "sasl": True, "username": "xxxx", "password": "xxxx", "topic": "perftest" }class KafkaTookit(): def __init__(self, config=Config.KAFKA): self.topic = config["topic"] if config["sasl"]: self.producer = KafkaProducer( sasl_mechanism="PLAIN", security_protocol='SASL_PLAINTEXT', sasl_plain_username=config["username"], sasl_plain_password=config["password"], bootstrap_servers=config["brokers"], batch_size=163840, acks=0, compression_type="gzip", linger_ms=3000, key_serializer=lambda v: v.encode(), value_serializer=lambda v: v.encode() ) else: self.producer = KafkaProducer( bootstrap_servers=config["brokers"], batch_size=163840, acks=0, compression_type="gzip", linger_ms=3000, key_serializer=lambda v: v.encode(), value_serializer=lambda v: v.encode() ) def send(self, msg_key): msg, key = msg_key self.producer.send(self.topic, value=msg, key=key)参考文献

[1] . kafka-python · PyPI

0 阅读:0

科技一点鑫得

简介:感谢大家的关注