批量消费需要开启配置:
@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(10); // 10并发
factory.getContainerProperties().setPollTimeout(5000);
//例如poll(5000): 如果拉到数据的话 会立即放回;如果拉不到数据的话,这个是最长的等待时间;
//比如5s,如果一直没有数据的话,每5s拉一次返回一次,有数据就立即返回再拉
factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
return factory;
}
单挑信息消费:
@Bean
public KafkaListenerContainerFactory<?> batchFactoryOff(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(1000);
factory.setBatchListener(false);//设置为批量消费,每个批次数量在Kafka配置参数中设置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
return factory;
}