package com.tplink.cloud.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Spring-Kafka consumer component exposing a single listener method,
 * {@link #consumeB(List, Acknowledgment)}, subscribed to {@link #SPRING_TEST_TOPIC}.
 */
@Slf4j
@Component
public class SpringKafkaSingleConsumer {

    /**
     * Topic the listener subscribes to.
     *
     * <p>Because there are several consumers, create this topic up front (via the
     * command line) with multiple partitions, so that no consumer ends up without
     * a partition to consume.
     */
    public static final String SPRING_TEST_TOPIC = "SPRING_TEST_TOPIC";

    /** Group name constant; not referenced anywhere inside this class. */
    public static final String SPRING_TEST_GROUP_A = "SPRING_TEST_GROUP_A";

    /** Value used as the {@code id} of the listener declared on {@code consumeB}. */
    public static final String SPRING_TEST_GROUP_A_CLIENT_B = "SPRING_TEST_GROUP_A_CLIENT_B";

    /**
     * Logs every record of the delivered list at INFO level (topic, offset and
     * value), then acknowledges once after the whole list has been logged.
     *
     * <p>NOTE(review): receiving a {@code List} of records together with an
     * {@code Acknowledgment} presumably requires the listener container factory
     * to be configured for batch listening and a manual ack mode — that
     * configuration is not visible here; confirm.
     *
     * @param records records handed to the listener; each one is logged in iteration order
     * @param ack     handle whose {@code acknowledge()} is invoked after logging
     * @throws InterruptedException declared in the signature; no statement in this
     *                              body visibly throws it
     */
    @KafkaListener(
            id = SPRING_TEST_GROUP_A_CLIENT_B,
            groupId = "SingleConsumer",
            topics = {SPRING_TEST_TOPIC})
    public void consumeB(List<ConsumerRecord<String, String>> records, Acknowledgment ack)
            throws InterruptedException {
        records.forEach(entry -> log.info(
                "SingleConsumer:topic = {}, offset = {}, value = {}",
                entry.topic(), entry.offset(), entry.value()));
        // Acknowledge only after every record in the list has been logged.
        ack.acknowledge();
    }
}
// Comments: 0  (web-page residue captured with the snippet; not part of the Java source)