Kafka 异步回调日志记录可以通过以下步骤实现:
java.util.logging
包或第三方日志库(如 Log4j、SLF4J 等)。import java.util.logging.Logger;
public class KafkaConsumerCallback {
private static final Logger logger = Logger.getLogger(KafkaConsumerCallback.class.getName());
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerCallback implements ConsumerCallback<String, String> {
private static final Logger logger = Logger.getLogger(KafkaConsumerCallback.class.getName());
@Override
public void onConsume(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
logger.info("Received message: key = " + record.key() + ", value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());
}
}
}
src/main/resources
目录下创建一个 log4j.properties
文件,并进行相应的配置。# log4j.properties示例
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
这样,当 Kafka 异步回调接收到新的消息时,日志记录器会自动记录相应的日志信息。