MQ消息队列数据丢失问题

数据的丢失问题可能出现在生产者、MQ服务器、消费者中,以RabbitMQ和Kafka为例来分析一下。

RabbitMQ

生产者弄丢了数据

生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,因为网络问题等都有可能。可以用RabbitMQ提供的事务功能,生产者发送数据之前开启RabbitMQ事务channel.txSelect() ,然后发送消息,如果消息没有成功被RabbitMQ接收到,生产者会收到异常报错,此时就可以回滚事务channel.txRollback() ,然后重试发送消息。如果收到了消息,那么可以提交事务channel.txCommit() 。

try {
    // 通过工厂创建连接
    connection = factory.newConnection();
    // 获取通道
    channel = connection.createChannel();
    // 开启事务
    channel.txSelect();

    // 这里发送消息
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

    // 模拟出现异常
    int result = 1 / 0;

    // 提交事务
    channel.txCommit();
} catch (IOException | TimeoutException e) {
    // 捕捉异常,回滚事务
    channel.txRollback();
}

RabbitMQ事务机制(

lock