package com.yb.kafka.consumer.three.config;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
System.out.println("init");
return new KafkaTemplate(producerFactory());
}
public static void main(String[] args) {
//创建topic
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient adminClient = AdminClient.create(props);
ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
NewTopic newTopic = new NewTopic("topic-test", 1, (short) 1);
topics.add(newTopic);
CreateTopicsResult result = adminClient.createTopics(topics);
try {
result.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
SpringCloud+Nacos简单示例
共16个文件
java:10个
xml:3个
yml:2个
需积分: 36 51 下载量 157 浏览量
2020-08-15
22:24:19
上传
评论 1
收藏 45.45MB ZIP 举报
温馨提示
springcloud是目前比较流行的微服务技术,Nacos也是目前比较好的注册中心。下面就两个东西简单组合使用介绍一下:环境:jdk1.8,spring-cloud Greenwich.SR1,spring-boot2,nacos注册中心.消费者、生产者,Nacos1.3免安装包
资源推荐
资源详情
资源评论
收起资源包目录
urbantraffic-iccp-country.zip (16个子文件)
urbantraffic-iccp-sys-service
src
main
resources
application.yml 354B
java
com
yb
kafka
consumer
three
controller
NacosProviderController.java 749B
ProducerController.java 596B
service
config
KafkaConfig.java 4KB
SysServiceApplication.java 1KB
test
pom.xml 2KB
pom.xml 877B
urbantraffic-iccp-sys-client
src
main
resources
application.yml 351B
java
com
yb
kafka
consumer
three
controller
FeignConsumerApi.java 536B
RestTemplateConfig.java 514B
NacosConsumerController.java 1KB
SysClientApplication.java 2KB
service
SysClientService.java 84B
config
KafkaConfig.java 4KB
test
pom.xml 3KB
nacos1.3.zip 45.59MB
共 16 条
- 1
资源评论
atgoingguoat
- 粉丝: 6
- 资源: 14
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功