package com.xinchen.kafka.utils;
import com.xinchen.kafka.demo.MyKafkaProperties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import scala.collection.Seq;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Static helpers for creating, deleting and inspecting Kafka topics.
 *
 * <p>Topic creation and deletion go through the ZooKeeper-based {@code AdminUtils};
 * each of those calls opens its own {@code ZkUtils} session and always closes it
 * before returning. The remaining methods use a caller-supplied {@link AdminClient},
 * print their findings to standard output, and never close the client, so the
 * caller stays responsible for its lifecycle.
 *
 * <p>Created 2017/12/11 17:07.
 *
 * @author ASUS
 */
@Slf4j
public class TopicCRUD {

    /**
     * Creates a topic through ZooKeeper.
     *
     * @param topicName   name of the topic to create
     * @param partition   number of partitions
     * @param replication replication factor
     * @param properties  topic-level configuration overrides
     * @param value       ZooKeeper connection settings used to open the session
     * @return {@code topicName}, unchanged
     */
    public static String createTopic(String topicName, int partition, int replication, Properties properties, ZkUtilsValue value) {
        ZkUtils zkUtils = openZkUtils(value);
        try {
            AdminUtils.createTopic(zkUtils, topicName, partition, replication, properties, RackAwareMode.Enforced$.MODULE$);
            log.info("创建成功!");
            return topicName;
        } finally {
            // Release the ZooKeeper session even when topic creation throws.
            zkUtils.close();
        }
    }

    /**
     * Requests deletion of a topic through ZooKeeper.
     *
     * <p>NOTE(review): {@code AdminUtils.deleteTopic} is understood to only mark the
     * topic for deletion; whether it is actually removed presumably depends on the
     * brokers' {@code delete.topic.enable} setting - confirm for the Kafka version in use.
     *
     * @param topicName name of the topic to delete
     * @param value     ZooKeeper connection settings used to open the session
     */
    public static void deleteTopic(String topicName, ZkUtilsValue value) {
        ZkUtils zkUtils = openZkUtils(value);
        try {
            AdminUtils.deleteTopic(zkUtils, topicName);
            log.info("删除成功!");
        } finally {
            // Release the ZooKeeper session even when the delete request throws.
            zkUtils.close();
        }
    }

    /**
     * Prints the cluster id, the controller node and every node of the cluster.
     *
     * @param client admin client to query; not closed by this method
     * @throws ExecutionException   if any of the cluster futures completes exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void describeCluster(AdminClient client) throws ExecutionException, InterruptedException {
        DescribeClusterResult ret = client.describeCluster();
        System.out.println(String.format("Cluster id: %s, controller: %s", ret.clusterId().get(), ret.controller().get()));
        System.out.println("Current cluster nodes info: ");
        for (Node node : ret.nodes().get()) {
            System.out.println(node);
        }
    }

    /**
     * Prints every configuration entry of the given topic as {@code name=value}.
     *
     * @param client    admin client to query; not closed by this method
     * @param topicName topic whose configuration is printed
     * @throws ExecutionException   if the describe request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void describeConfig(AdminClient client, String topicName) throws ExecutionException, InterruptedException {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        DescribeConfigsResult ret = client.describeConfigs(Collections.singleton(topicResource));
        Map<ConfigResource, Config> configs = ret.all().get();
        for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
            ConfigResource key = entry.getKey();
            System.out.println(String.format("Resource type:%s,resource name:%s", key.type(), key.name()));
            for (ConfigEntry configEntry : entry.getValue().entries()) {
                System.out.println(configEntry.name() + "=" + configEntry.value());
            }
        }
    }

    /**
     * Submits a single configuration entry for the given topic and waits for the
     * request to complete.
     *
     * <p>NOTE(review): the non-incremental {@code alterConfigs} API is understood to
     * replace the topic's whole set of overrides with the entries supplied here, so
     * overrides not included may revert to their defaults - confirm against the Kafka
     * version in use.
     *
     * @param client      admin client to use; not closed by this method
     * @param topicName   topic whose configuration is altered
     * @param configEntry the configuration entry to submit
     * @throws ExecutionException   if the alter request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void alterConfigs(AdminClient client, String topicName, ConfigEntry configEntry) throws ExecutionException, InterruptedException {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        Config topicConfig = new Config(Collections.singletonList(configEntry));
        client.alterConfigs(Collections.singletonMap(topicResource, topicConfig)).all().get();
    }

    /**
     * Prints the description of the given topic and, in the same request, of the
     * {@code __consumer_offsets} topic.
     *
     * @param client    admin client to query; not closed by this method
     * @param topicName topic to describe alongside {@code __consumer_offsets}
     * @throws ExecutionException   if describing either topic fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void describeTopics(AdminClient client, String topicName) throws ExecutionException, InterruptedException {
        DescribeTopicsResult ret = client.describeTopics(Arrays.asList(topicName, "__consumer_offsets"));
        Map<String, TopicDescription> map = ret.all().get();
        for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {
            System.out.println(entry.getKey() + "===>" + entry.getValue());
        }
    }

    /**
     * Prints the names of all topics in the cluster, internal topics included.
     *
     * @param client admin client to query; not closed by this method
     * @throws ExecutionException   if the list request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void listAllTopic(AdminClient client) throws ExecutionException, InterruptedException {
        ListTopicsOptions topicsOptions = new ListTopicsOptions();
        // Ask for internal topics as well as user topics.
        topicsOptions.listInternal(true);
        ListTopicsResult listTopics = client.listTopics(topicsOptions);
        Set<String> topicNames = listTopics.names().get();
        System.out.println("Current topics in this cluster:" + topicNames);
    }

    /** Opens a ZkUtils session from the given settings; the caller must close it. */
    private static ZkUtils openZkUtils(ZkUtilsValue value) {
        return ZkUtils.apply(value.getZkConnection(), value.getSessionTimeout(), value.getConnectionTimout(), value.getIsTrue());
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
kafka实体演示IDEA (105个子文件)
TopicCRUD.class 8KB
ZkUtilsValue.class 3KB
Controller.class 3KB
KafkaConsumerConfig.class 3KB
MyKafkaAyncConsumer.class 3KB
MyKafkaSyncConsumer.class 3KB
MyKafkaStream.class 3KB
MyKafkaProducer.class 3KB
KafkaApplication.class 3KB
KafkaProducerConfig.class 2KB
MyKafkaProducer$1.class 1KB
Listener.class 1KB
MyKafkaProperties.class 768B
KafkaController.class 641B
KafkaApplicationTests.class 627B
kafka.iml 8KB
TopicCRUD.java 4KB
KafkaConsumerConfig.java 3KB
Controller.java 2KB
MyKafkaProducer.java 2KB
MyKafkaAyncConsumer.java 2KB
KafkaProducerConfig.java 2KB
MyKafkaSyncConsumer.java 2KB
KafkaApplication.java 2KB
Listener.java 1KB
MyKafkaStream.java 1KB
KafkaController.java 996B
ZkUtilsValue.java 708B
MyKafkaProperties.java 682B
KafkaApplicationTests.java 333B
workspace.xml 52KB
uiDesigner.xml 9KB
pom.xml 2KB
Maven__org_springframework_boot_spring_boot_test_autoconfigure_1_5_9_RELEASE.xml 769B
Maven__org_springframework_boot_spring_boot_starter_logging_1_5_9_RELEASE.xml 748B
Maven__org_springframework_boot_spring_boot_starter_tomcat_1_5_9_RELEASE.xml 741B
Maven__org_springframework_boot_spring_boot_autoconfigure_1_5_9_RELEASE.xml 734B
Maven__org_springframework_boot_spring_boot_starter_test_1_5_9_RELEASE.xml 727B
Maven__org_springframework_boot_spring_boot_starter_web_1_5_9_RELEASE.xml 720B
Maven__com_vaadin_external_google_android_json_0_0_20131108_vaadin1.xml 700B
Maven__org_springframework_boot_spring_boot_starter_1_5_9_RELEASE.xml 692B
Maven__org_springframework_boot_spring_boot_test_1_5_9_RELEASE.xml 671B
Maven__org_springframework_spring_expression_4_3_13_RELEASE.xml 665B
Maven__org_apache_tomcat_embed_tomcat_embed_websocket_8_5_23.xml 660B
Maven__org_springframework_spring_messaging_4_3_13_RELEASE.xml 658B
Maven__org_springframework_kafka_spring_kafka_1_1_7_RELEASE.xml 647B
Maven__org_springframework_retry_spring_retry_1_2_1_RELEASE.xml 647B
Maven__org_springframework_spring_context_4_3_13_RELEASE.xml 644B
Maven__com_fasterxml_jackson_core_jackson_annotations_2_8_0.xml 644B
Maven__org_springframework_spring_webmvc_4_3_13_RELEASE.xml 637B
Maven__org_apache_tomcat_tomcat_annotations_api_8_5_23.xml 636B
Maven__org_springframework_boot_spring_boot_1_5_9_RELEASE.xml 636B
Maven__org_hibernate_hibernate_validator_5_3_6_Final.xml 634B
Maven__org_springframework_spring_beans_4_3_13_RELEASE.xml 630B
Maven__com_fasterxml_jackson_core_jackson_databind_2_8_10.xml 630B
Maven__org_apache_tomcat_embed_tomcat_embed_core_8_5_23.xml 625B
Maven__org_springframework_spring_test_4_3_13_RELEASE.xml 623B
Maven__org_springframework_spring_core_4_3_13_RELEASE.xml 623B
compiler.xml 622B
Maven__org_springframework_spring_web_4_3_13_RELEASE.xml 616B
Maven__org_springframework_spring_aop_4_3_13_RELEASE.xml 616B
Maven__org_apache_tomcat_embed_tomcat_embed_el_8_5_23.xml 611B
Maven__javax_validation_validation_api_1_1_0_Final.xml 611B
Maven__org_jboss_logging_jboss_logging_3_3_1_Final.xml 608B
Maven__com_fasterxml_jackson_core_jackson_core_2_8_10.xml 602B
Maven__org_apache_kafka_kafka_clients_0_11_0_0.xml 583B
Maven__ch_qos_logback_logback_classic_1_1_11.xml 575B
Maven__org_xerial_snappy_snappy_java_1_1_2_6.xml 566B
Maven__com_yammer_metrics_metrics_core_2_2_0.xml 563B
Maven__org_apache_kafka_kafka_streams_1_0_0.xml 562B
Maven__org_apache_kafka_kafka_2_10_0_10_2_0.xml 562B
Maven__org_slf4j_log4j_over_slf4j_1_7_25.xml 562B
Maven__org_scala_lang_scala_library_2_10_6.xml 561B
Maven__net_sf_jopt_simple_jopt_simple_5_0_3.xml 556B
Maven__org_apache_kafka_connect_json_1_0_0.xml 555B
Maven__ch_qos_logback_logback_core_1_1_11.xml 554B
Maven__org_hamcrest_hamcrest_library_1_3.xml 553B
Maven__org_apache_zookeeper_zookeeper_3_4_9.xml 550B
Maven__org_mockito_mockito_core_1_10_19.xml 549B
Maven__org_slf4j_jcl_over_slf4j_1_7_25.xml 548B
Maven__org_apache_kafka_connect_api_1_0_0.xml 548B
Maven__com_jayway_jsonpath_json_path_2_2_0.xml 546B
Maven__net_minidev_accessors_smart_1_1.xml 542B
Maven__org_slf4j_slf4j_log4j12_1_7_25.xml 541B
Maven__org_skyscreamer_jsonassert_1_4_0.xml 537B
Maven__org_assertj_assertj_core_2_6_0.xml 535B
Maven__org_slf4j_jul_to_slf4j_1_7_25.xml 534B
Maven__org_hamcrest_hamcrest_core_1_3.xml 532B
Maven__org_projectlombok_lombok_1_16_18.xml 531B
Maven__com_fasterxml_classmate_1_3_4.xml 522B
Maven__org_rocksdb_rocksdbjni_5_7_3.xml 521B
Maven__net_minidev_json_smart_2_2_1.xml 521B
Maven__org_slf4j_slf4j_api_1_7_25.xml 513B
Maven__org_objenesis_objenesis_2_1.xml 508B
Maven__com_101tec_zkclient_0_10.xml 496B
Maven__org_yaml_snakeyaml_1_17.xml 495B
Maven__net_jpountz_lz4_lz4_1_3_0.xml 488B
Maven__org_ow2_asm_asm_5_0_3.xml 472B
Maven__log4j_log4j_1_2_17.xml 469B
Maven__junit_junit_4_12.xml 455B
共 105 条
- 1
- 2
资源评论
星澄码帝
- 粉丝: 108
- 资源: 25
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 111111111111111111
- Screenshot_2024-04-30-21-47-24-26.jpg
- Cpp1.cpp1111111111
- 利用ERP流程操作的整个订单流程.ppt
- 最新二开版本源码博客论坛源码,UI很漂亮,可切换皮肤界面.rar
- ModStartBlog现代化个人博客系统 v5.2.0源码.rar
- 带posix库的mingw编译器
- SoraAI是一款功能强大的AI助手,由OpenAI开发,以其出色的语音识别技术、广泛的知识库和高度的人工智能特性而备受瞩目
- Thinkphp开发大气响应式个人博客青春博客网站源码.rar
- 最新PHP博客网站程序源码 ThinkPHP.rar
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功