spring boot集成kafka之spring-kafka深入探秘
来源:https://my.oschina.net/keking/blog/3056698
前言
kafka是一个消息队列产品,基于Topic
partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个
项目Spring-kafka,封装了Apache 的Kafka-
client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spri
ng-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。
项目地址:https://github.com/spring-projects/spring-kafka
简单集成
引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
添加配置
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
测试发送和接收
/**
* @author: kl @kailing.pub
* @date: 2019/5/30
*/
@SpringBootApplication
@RestController
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private KafkaTemplate<Object, Object> template;
@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
this.template.send("topic_input", input);
}
@KafkaListener(id = "webGroup", topics = "topic_input")
public void listen(String input) {
logger.info("input value: {}" , input);
}
}
启动应用后,在浏览器中输入:http://localhost:8080/send/kl。就可以在
控制台看到有日志输出了:input value:
"kl"。基础的使用就这么简单。发送消息时注入一个KafkaTemplate,接收
消息时添加一个@KafkaListener注解即可。
Spring-kafka-test嵌入式Kafka Server
不过上面的代码能够启动成功,前提是你已经有了Kafka
Server的服务环境,我们知道Kafka是由Scala +
Zookeeper构建的,可以从官网下载部署包在本地部署。但是,我想告诉你
,为了简化开发环节验证Kafka相关功能,Spring-Kafka-
Test已经封装了Kafka-test提供了注解式的一键开启Kafka
Server的功能,使用起来也是超级简单。本文后面的所有测试用例的Kafka
都是使用这种嵌入式服务提供的。
引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.2.6.RELEASE</version>
<scope>test</scope>
</dependency>
启动服务
下面使用Junit测试用例,直接启动一个Kafka
Server服务,包含四个Broker节点。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
@Test
public void contextLoads()throws IOException {
System.in.read();
}
}
如上:只需要一个注解@EmbeddedKafka即可,就可以启动一个功能完整
的Kafka服务,是不是很酷。默认只写注解不加参数的情况下,是创建一个
随机端口的Broker,在启动的日志中会输出具体的端口以及默认的一些配
置项。不过这些我们在Kafka安装包配置文件中的配置项,在注解参数中都
可以配置,下面详解下@EmbeddedKafka注解中的可设置参数 :
value:broker节点数量
count:同value作用一样,也是配置的broker的节点数量
controlledShutdown:控制关闭开关,主要用来在Broker意外关闭时减
少此Broker上Partition的不可用时间
Kafka是多Broker架构的高可用服务,一个Topic对应多个partition,一
个Partition可以有多个副本Replication,这些Replication副本保存在多
个Broker,用于高可用。但是,虽然存在多个分区副本集,当前工作副
本集却只有一个,默认就是首次分配的副本集【首选副本】为Leader,
负责写入和读取数据。当我们升级Broker或者更新Broker配置时需要重
启服务,这个时候需要将partition转移到可用的Broker。下面涉及到三
种情况
1. 直接关闭Broker:当Broker关闭时,Broker集群会重新进行选主操作,
选出一个新的Broker来作为Partition
Leader,选举时此Broker上的Partition会短时不可用
2. 开启controlledShutdown:当Broker关闭时,Broker本身会先尝试将Lea
der角色转移到其他可用的Broker上
3. 使用命令行工具:使用bin/kafka-preferred-replica-
election.sh,手动触发PartitionLeader角色转移
ports:端口列表,是一个数组。对应了count参数,有几个Broker,就
要对应几个端口号
brokerProperties:Broker参数设置,是一个数组结构,支持如下方式
进行Broker参数设置:
@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.th
reads = 8"})
okerPropertiesLocation:Broker参数文件设置
功能同上面的brokerProperties,只是Kafka
Broker的可设置参数达182个之多,都像上面这样配置肯定不是最优方案,
所以提供了加载本地配置文件的功能,如:
@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
创建新的Topic