数据的丢失问题可能出现在生产者、MQ服务器、消费者中,以RabbitMQ和Kafka为例来分析一下。
RabbitMQ
生产者弄丢了数据
生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,因为网络问题等都有可能。可以用RabbitMQ提供的事务功能,生产者发送数据之前开启RabbitMQ事务channel.txSelect()
,然后发送消息,如果消息没有成功被RabbitMQ接收到,生产者会收到异常报错,此时就可以回滚事务channel.txRollback()
,然后重试发送消息。如果收到了消息,那么可以提交事务channel.txCommit()
。
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启事务
channel.txSelect();
// 这里发送消息
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// 模拟出现异常
int result = 1 / 0;
// 提交事务
channel.txCommit();
} catch (IOException | TimeoutException e) {
// 捕捉异常,回滚事务
channel.txRollback();
}
RabbitMQ事务机制(