package com.hub.producer;
import com.sun.javafx.runtime.SystemProperties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KafkaProducerDeno {
public static void main(String[] args) throws Exception {
KafkaProducerDeno kafkaProducerDeno = new KafkaProducerDeno();
kafkaProducerDeno.producer();
}
private static Properties getProperties() throws Exception {
Properties properties = PropertiesLoaderUtils.loadAllProperties("application.properties");
// Properties properties = new Properties();
// InputStream ips = KafkaProducerDeno.class
// .getResourceAsStream("");
// Properties props = new Properties();
// props.load(ips);
//
//
// System.getProperty("bootstrap.servers");
//
properties.setProperty("bootstrap.servers", properties.getProperty("bootstrap.servers").toString());
properties.setProperty("group.id",properties.getProperty("group.id").toString());
properties.setProperty("key.deserializer", properties.getProperty("key.deserializer").toString());
properties.setProperty("value.deserializer", properties.getProperty("value.deserializer").toString());
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
//properties.setProperty("enable.auto.commit", "false");
//earliest latest
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
//读取topic
properties.setProperty("first.uav.topic.name",properties.getProperty("first.uav.topic.name").toString());
//写入topic
properties.setProperty("three.area.write.topic",properties.getProperty("three.area.write.topic").toString());
return properties;
}
/**
* @Description: 生产者
*/
public static void producer() throws Exception {
Properties properties = getProperties();
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
int i=0;
while(true){
producer.send( new ProducerRecord("test0420", "message: " + ++i));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 获取到Kafka的配置详情信息
*
* @return
* @throws Exception
*/
private static Properties getKafkaProperties() throws Exception {
String postgresqlConfigName = "application.properties";
InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(postgresqlConfigName);
Properties prop = new Properties();
try {
prop.load(stream);
} catch (IOException e) {
throw new Exception("加载配置文件失败,请检查位置文件是否存在====");
}
return prop;
}
}
kafka实战演练快速上手
需积分: 0 170 浏览量
更新于2023-07-13
收藏 140KB ZIP 举报
《Kafka实战演练快速上手》
在大数据处理和实时流计算领域,Apache Kafka已经成为一个不可或缺的组件。它是一个分布式消息中间件,设计用于处理大规模、高吞吐量的数据流,广泛应用于日志收集、用户行为追踪、流式数据处理等场景。本教程将引导你快速上手Kafka,让你在实践中掌握其核心概念与操作。
一、Kafka基本概念
1. **主题(Topic)**:主题是Kafka中数据的分类,类似于数据库中的表。每个主题可以被划分为多个分区(Partitions),提供水平扩展能力。
2. **分区(Partition)**:每个主题可以包含多个分区,分区是有序的且不可变的消息序列。分区内部的消息顺序得到保留,同一分区内的消息按照生产顺序消费。
3. **生产者(Producer)**:负责将消息发送到Kafka的主题中。生产者可以选择将消息发送到特定分区或让Kafka自动分配。
4. **消费者(Consumer)**:消费者从Kafka的主题中读取消息。消费者可以属于一个消费组(Consumer Group),同一组内的消费者会协同工作,实现负载均衡和故障恢复。
5. **消费组(Consumer Group)**:消费组是Kafka消费者模型的核心概念,同一主题的多个分区可以被分发到不同的消费组,确保每个消息只被消费一次。
6. ** broker**:Kafka集群中的节点,负责存储、复制和分发消息。
二、Kafka部署与配置
1. **安装与启动**:下载Kafka二进制包,配置环境变量,启动Zookeeper和Kafka服务器。
2. **创建主题**:使用`kafka-topics.sh`脚本创建主题,指定分区数量和副本数。
3. **配置参数**:根据实际需求调整Kafka的配置参数,如broker的`message.max.bytes`控制单个消息的最大大小,`num.replicas`设置副本数量。
三、Kafka生产者开发
1. **Java API**:使用Kafka的Java客户端库创建生产者,设置配置项,如`bootstrap.servers`连接地址,`acks`确认模式等。
2. **发送消息**:调用`send()`方法将消息发送到指定主题,可选择同步或异步发送。
四、Kafka消费者开发
1. **消费者API**:使用Java客户端创建消费者,设置消费组ID,订阅主题。
2. **拉取消息**:通过`poll()`方法定期从Kafka拉取消息,处理完后提交偏移量,保证消息不丢失。
五、Kafka实战演练
1. **搭建环境**:首先在本地或者虚拟机上部署一套Kafka集群。
2. **编写生产者程序**:创建一个简单的Java应用,向`kafka-demo`主题发送消息。
3. **编写消费者程序**:创建另一个Java应用,订阅`kafka-demo`主题并消费消息。
4. **测试与验证**:运行生产者程序发送消息,同时启动消费者程序查看是否能正确接收并处理消息。
5. **扩展与优化**:尝试增加分区数量,观察消费者负载变化;添加更多消费者,实现并行消费。
通过以上步骤,你可以快速了解并掌握Kafka的基本使用。在实际项目中,还需要关注Kafka的高级特性,如幂等性生产者、Kafka Streams、Kafka Connect等,以及如何与其他系统集成,如Hadoop、Spark等。持续学习和实践,将使你在大数据处理领域更加得心应手。
此湖不结冰
- 粉丝: 2
- 资源: 5
最新资源
- lanchaoHunanHoutaiQiantai
- (177377030)Python 爬虫.zip
- (177537818)python爬虫基础知识及爬虫实例.zip
- 自动驾驶横纵向耦合控制-复现Apollo横纵向控制 基于动力学误差模型,使用mpc算法,一个控制器同时控制横向和纵向,实现横纵向耦合控制 matlab与simulink联合仿真,纵向控制已经做好油门刹
- (178199432)C++实现STL容器之List
- (178112810)基于ssm+vue餐厅点餐系统.zip
- 两相步进电机FOC矢量控制Simulink仿真模型 1.采用针对两相步进电机的SVPWM控制算法,实现FOC矢量控制,DQ轴解耦控制~ 2.转速电流双闭环控制,电流环采用PI控制,转速环分别采用PI和
- VMware虚拟机USB驱动
- Halcon手眼标定简介(1)
- (175128050)c&c++课程设计-图书管理系统