Kafka TimeoutException 通常是由于客户端与 Kafka 服务器之间的通信超时引起的。这可能是由于网络问题、服务器负载过高或者客户端配置不当等原因导致的。为了优化连接池,你可以尝试以下方法:
connection.timeout.ms
和 request.timeout.ms
。Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("connection.timeout.ms", "60000"); // 增加连接超时时间到 60 秒
props.put("request.timeout.ms", "60000"); // 增加请求超时时间到 60 秒
session.timeout.ms
。props.put("session.timeout.ms", "30000"); // 增加会话超时时间到 30 秒
max.partition.fetchers
和 max.in.flight.requests.per.connection
来实现。props.put("max.partition.fetchers", "16"); // 增加最大分区获取者数
props.put("max.in.flight.requests.per.connection", "5"); // 增加每个连接的最大未确认请求数
Properties props = new Properties();
// ... 其他配置 ...
// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 使用连接池
consumer.setPartitionsPerTopic(10); // 每个主题的分区数
consumer.setFetchMaxBytes(1048576); // 每次拉取的最大字节数
consumer.setFetchMinBytes(1); // 每次拉取的最小字节数
consumer.setFetchWaitMaxMs(500); // 拉取等待的最大时间
consumer.setMaxPollRecords(500); // 每次轮询的最大记录数
consumer.setMaxPartitionFetchBytes(1048576); // 每个分区拉取的最大字节数
通过以上方法,你可以尝试优化 Kafka 客户端的连接池配置,以减少 TimeoutException 的发生。