package kelleyRabbimqPool
import (
rand2 "crypto/rand"
"errors"
"fmt"
"github.com/streadway/amqp"
"hash/crc32"
"math"
"math/big"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
var (
// ACK_DATA_NIL is returned by retryClient.Ack when the consumer is not in
// auto-ack mode but the client holds no delivery to acknowledge.
ACK_DATA_NIL = errors.New("ack data nil")
)
// Default limits applied by newRabbitPool, plus the load-balancing strategy id.
const (
DEFAULT_MAX_CONNECTION = 5 // maximum number of RabbitMQ TCP connections
DEFAULT_MAX_CONSUME_CHANNEL = 25 // maximum number of consume channels (generally one per consumer)
DEFAULT_MAX_CONSUME_RETRY = 5 // maximum number of consumer reconnect attempts after a dropped connection
DEFAULT_PUSH_MAX_TIME = 5 // maximum number of re-send attempts
DEFAULT_MAX_PRODUCT_RETRY = 5 // maximum number of producer reconnect attempts after a dropped connection
// Round-robin: the connection pool load-balancing algorithm.
LOAD_BALANCE_ROUND = 1
)
// Client roles and the default bounds of the randomized retry delay.
// NOTE(review): the retry bounds are presumably milliseconds, since
// retryClient.Push formats the random value into the message Expiration — confirm.
const (
RABBITMQ_TYPE_PUBLISH = 1 // producer
RABBITMQ_TYPE_CONSUME = 2 // consumer
DEFAULT_RETRY_MIN_RANDOM_TIME = 5000 // lower bound of the random retry time
DEFAULT_RETRY_MAX_RADNOM_TIME = 15000 // upper bound of the random retry time
)
// Exchange type names.
const (
EXCHANGE_TYPE_FANOUT = "fanout" // Fanout: broadcast; delivers the message to every queue bound to the exchange
EXCHANGE_TYPE_DIRECT = "direct" // Direct: targeted; delivers the message to queues matching the given routing key
EXCHANGE_TYPE_TOPIC = "topic" // Topic: wildcard; delivers the message to queues matching the routing pattern
)
// Error codes carried in RabbitMqError.Code and passed to EventFail callbacks.
const (
RCODE_PUSH_MAX_ERROR = 501 // sending exceeded the maximum number of retries
RCODE_GET_CHANNEL_ERROR = 502 // failed to obtain a channel
RCODE_CHANNEL_QUEUE_EXCHANGE_BIND_ERROR = 503 // exchange / queue / binding setup failed
RCODE_CONNECTION_ERROR = 504 // connection failed
RCODE_PUSH_ERROR = 505 // message push failed
RCODE_CHANNEL_CREATE_ERROR = 506 // channel creation failed
RCODE_RETRY_MAX_ERROR = 507 // maximum number of retries exceeded
)
// RetryClientInterface is the handle passed to ConsumeReceive.EventSuccess so
// that a consumer callback can re-queue or acknowledge the message it received.
type RetryClientInterface interface {
// Push schedules pushData for another delivery attempt; it returns a
// non-nil *RabbitMqError when the retry could not be scheduled.
Push(pushData []byte) *RabbitMqError
// Ack acknowledges the message when the consumer is not in auto-ack mode.
Ack() error
}
// retryClient is the retry helper; it implements RetryClientInterface for a
// single received delivery.
type retryClient struct {
channel *amqp.Channel // channel used by Push to publish the retry message
data *amqp.Delivery // delivery acknowledged by Ack; may be nil
header map[string]interface{} // headers read by Push for the "retry_nums" counter
deadExchangeName string // exchange Push publishes the retry message to
deadQueueName string // queue name, used only in Push's error message
deadRouteKey string // routing key Push publishes the retry message with
pool *RabbitPool // supplies the min/max random retry time used by Push
receive *ConsumeReceive // consumer registration: IsAutoAck, MaxReTry and EventFail are read from it
}
// newRetryClient bundles everything needed to acknowledge or re-queue one
// delivery into a retryClient. The arguments are stored as-is; nothing is
// validated or copied.
func newRetryClient(channel *amqp.Channel, data *amqp.Delivery, header map[string]interface{}, deadExchangeName string, deadQueueName string, deadRouteKey string, pool *RabbitPool, receive *ConsumeReceive) *retryClient {
	client := new(retryClient)

	// Message being handled.
	client.channel = channel
	client.data = data
	client.header = header

	// Dead-letter routing used for retries.
	client.deadExchangeName = deadExchangeName
	client.deadQueueName = deadQueueName
	client.deadRouteKey = deadRouteKey

	// Owning pool and consumer registration.
	client.pool = pool
	client.receive = receive

	return client
}
// Ack manually acknowledges the wrapped delivery.
//
// It is a no-op returning nil when the consumer is registered with
// IsAutoAck. Otherwise it returns ACK_DATA_NIL if there is no delivery to
// acknowledge, or the result of the delivery's Ack call.
func (r *retryClient) Ack() error {
	// Auto-ack consumers have nothing to confirm by hand.
	if r.receive.IsAutoAck {
		return nil
	}
	if r.data == nil {
		return ACK_DATA_NIL
	}
	// The "multiple" flag is passed as true, exactly as before.
	return r.data.Ack(true)
}
// Push schedules pushData for another delivery attempt by republishing it to
// the dead-letter exchange.
//
// The attempt counter is read from the "retry_nums" header (0 when absent or
// not an integer) and incremented. When the new count reaches
// receive.MaxReTry, EventFail (if set) is invoked with RCODE_RETRY_MAX_ERROR
// and nothing is published. Otherwise the message is published from a
// background goroutine after a 200ms pause, with a random expiration between
// the pool's min and max retry time (5000 if RandomAround fails) and with the
// incremented counter stored in its headers.
//
// It returns a RCODE_GET_CHANNEL_ERROR error when the client has no channel
// and nil in every other case. A failed publish cannot be returned because it
// happens asynchronously; it is reported through EventFail instead.
func (r *retryClient) Push(pushData []byte) *RabbitMqError {
	if r.channel == nil {
		msg := fmt.Sprintf("获取队列 %s 的消费通道失败", r.deadQueueName)
		return NewRabbitMqError(RCODE_GET_CHANNEL_ERROR, msg, msg)
	}

	// Compare in int64 so the increment can never overflow int32.
	next := retryCountFromHeader(r.header) + 1
	if next >= int64(r.receive.MaxReTry) {
		if r.receive.EventFail != nil {
			r.receive.EventFail(RCODE_RETRY_MAX_ERROR, NewRabbitMqError(RCODE_RETRY_MAX_ERROR, "The maximum number of retries exceeded. Procedure", ""), pushData)
		}
		return nil
	}

	// Publish a copy of the incoming headers carrying the incremented counter.
	// Previously the unmodified r.header was published, so "retry_nums" never
	// advanced and MaxReTry could never be reached. Copying (rather than
	// mutating r.header) also keeps the goroutine from sharing the caller's map.
	headers := make(map[string]interface{}, len(r.header)+1)
	for k, v := range r.header {
		headers[k] = v
	}
	// next < MaxReTry here, so it fits in int32; int32 keeps the wire type stable.
	headers["retry_nums"] = int32(next)

	go func(pushD []byte) {
		time.Sleep(time.Millisecond * 200)
		expirationTime, errs := RandomAround(r.pool.minRandomRetryTime, r.pool.maxRandomRetryTime)
		if errs != nil {
			expirationTime = 5000
		}
		err := r.channel.Publish(r.deadExchangeName, r.deadRouteKey, false, false, amqp.Publishing{
			ContentType:  "text/plain",
			Body:         pushD,
			Expiration:   strconv.FormatInt(expirationTime, 10),
			Headers:      headers,
			DeliveryMode: amqp.Persistent,
		})
		if err != nil && r.receive.EventFail != nil {
			// Code and message are kept for backward compatibility with
			// existing callbacks; the real publish error is now surfaced in
			// Detail instead of being discarded.
			r.receive.EventFail(RCODE_RETRY_MAX_ERROR, NewRabbitMqError(RCODE_RETRY_MAX_ERROR, "The maximum number of retries exceeded. Procedure", err.Error()), pushD)
		}
	}(pushData)
	return nil
}

// retryCountFromHeader extracts the "retry_nums" counter from header without
// panicking on unexpected value types. The result is clamped to
// [0, math.MaxInt32]; a missing or non-integer value yields 0.
func retryCountFromHeader(header map[string]interface{}) int64 {
	var n int64
	switch v := header["retry_nums"].(type) {
	case int32:
		n = int64(v)
	case int64:
		n = v
	case int:
		n = int64(v)
	case int16:
		n = int64(v)
	case int8:
		n = int64(v)
	case uint8:
		n = int64(v)
	default:
		return 0
	}
	if n < 0 {
		return 0
	}
	if n > math.MaxInt32 {
		return math.MaxInt32
	}
	return n
}
// RabbitMqError is the error value returned by this package.
type RabbitMqError struct {
	Code    int    // numeric error code (callers in this file pass RCODE_* constants)
	Message string // short reason; included in the Error() text
	Detail  string // extra detail; not included in the Error() text
}

// Error implements the error interface. The text contains the code and the
// quoted message; Detail is not part of it.
func (rErr RabbitMqError) Error() string {
	code, reason := rErr.Code, rErr.Message
	return fmt.Sprintf("Exception (%d) Reason: %q", code, reason)
}

// NewRabbitMqError allocates a RabbitMqError populated with the given code,
// message and detail.
func NewRabbitMqError(code int, message string, detail string) *RabbitMqError {
	rErr := new(RabbitMqError)
	rErr.Code = code
	rErr.Message = message
	rErr.Detail = detail
	return rErr
}
// ConsumeReceive is a consumer registration: where to receive data from and
// which callbacks to invoke.
type ConsumeReceive struct {
ExchangeName string // exchange name
ExchangeType string // exchange type
Route string // routing key
QueueName string // queue name
EventSuccess func(data []byte, header map[string]interface{}, retryClient RetryClientInterface) bool // success callback
EventFail func(int, error, []byte) // failure callback
IsTry bool // whether to retry
MaxReTry int32 // maximum number of retries
IsAutoAck bool // whether messages are acknowledged automatically
}
// RetryToolInterface declares a single unexported push method; RetryTool is
// the only implementation visible in this part of the file.
type RetryToolInterface interface {
push()
}
// RetryTool wraps an AMQP channel.
type RetryTool struct {
channel *amqp.Channel
}
// push satisfies RetryToolInterface; the body is currently empty.
func (r *RetryTool) push() {
}
// rChannel is a single RabbitMQ channel together with an index.
// NOTE(review): index presumably identifies the owning connection or pool slot — confirm against its users.
type rChannel struct {
ch *amqp.Channel
index int32
}
// rConn is a single RabbitMQ connection together with an index.
// NOTE(review): index presumably is the connection's position in the pool — confirm against its users.
type rConn struct {
conn *amqp.Connection
index int32
}
// RabbitPool holds the configuration and state of a RabbitMQ connection and
// channel pool, acting as either a producer or a consumer (see clientType).
type RabbitPool struct {
minRandomRetryTime int64 // lower bound passed to RandomAround for the retry expiration
maxRandomRetryTime int64 // upper bound passed to RandomAround for the retry expiration
maxConnection int32 // maximum number of connections
pushMaxTime int // maximum number of re-send attempts
connectionIndex int32 // records the connection currently in use
connectionBalance int // connection pool load-balancing algorithm
channelPool map[int64]*rChannel // channel pool
connections map[int][]*rConn // RabbitMQ connection pool
channelLock sync.RWMutex // channel pool lock
connectionLock sync.Mutex // connection lock
rabbitLoadBalance *RabbitLoadBalance // connection pool load-balancing mode (producer)
consumeMaxChannel int32 // maximum number of channels for consumers
consumeReceive []*ConsumeReceive // registered consumer events
consumeMaxRetry int32 // maximum number of consumer reconnect attempts
consumeCurrentRetry int32 // current reconnect attempt count
productMaxRetry int32 // producer reconnect count
productCurrentRetry int32
pushCurrentRetry int32 // current push retry count
clientType int // client type: producer or consumer; producer by default
errorChanel chan *amqp.Error // channel used to capture errors
connectStatus bool
host string // server IP
port int // server port
user string // username
password string // password
virtualHost string // defaults to "/"
}
// NewProductPool creates a pool initialised as a producer
// (client type RABBITMQ_TYPE_PUBLISH).
func NewProductPool() *RabbitPool {
	const role = RABBITMQ_TYPE_PUBLISH
	producer := newRabbitPool(role)
	return producer
}
// NewConsumePool creates a pool initialised as a consumer
// (client type RABBITMQ_TYPE_CONSUME).
func NewConsumePool() *RabbitPool {
	const role = RABBITMQ_TYPE_CONSUME
	consumer := newRabbitPool(role)
	return consumer
}
func newRabbitPool(clientType int) *RabbitPool {
return &RabbitPool{
minRandomRetryTime: DEFAULT_RETRY_MIN_RANDOM_TIME,
maxRandomRetryTime: DEFAULT_RETRY_MAX_RADNOM_TIME,
clientType: clientType,
consumeMaxChannel: DEFAULT_MAX_CONSUME_CHANNEL,
maxConnection: DEFAULT_MAX_CONNECTION,
pushMaxTime: DEFAULT_PUSH_MAX_TIME,
connectionBalance: LOAD_BALANCE_ROUND,
connectionIndex: 0,
consumeMaxRetry: DEFAULT_MAX_CONSUME_RETRY,
consumeCurrentRetry: 0,
productMaxRetry: DEFAULT_MAX_PRODUCT_RETRY,
pushCurrentRetry: 0,
connectStatus: false,
connections: make(map[int][]*rConn, 2),
channelPool: