package com.larcloud.kafka.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.LinkedList;
@Slf4j
@Service
public class SendMsgServiceImpl {
/**
* 初始化消息队列,发送之后暂时保存在list中,然后最早头部读取 earliest
*/
private static final LinkedList<Long> linkedList = new LinkedList<>();
private static String content = "";
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, Object object) {
/*
* 这里的ListenableFuture类是spring对java原生Future的扩展增强,是一个泛型接口,用于监听异步方法的回调
* 而对于kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里K,V的泛型实际上
* 被用于ProducerRecord<K, V> producerRecord,即生产者发送消息的key,value 类型
*/
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
future.addCallback(
o -> log.info("消息发送成功,{}", o.toString()), throwable -> log.info("消息发送失败,{}" + throwable.getMessage())
);
}
public void sendGroup(String topic, Object object) {
for (int i = 0; i < 4; i++) {
// 第二个参数指定分区,第三个参数指定消息键 分区优先
int i2 = i % 4 ;
log.info("partition = {}",i2);
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, null, "key", "hello group " + i);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("发送消息失败,{}" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> sendResult) {
log.info("发送消息成功,{}", sendResult.toString());
}
});
}
}
}