package com.depotnearby.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.depotnearby.common.mo.ByteArrayMessage;
import com.depotnearby.common.mo.Message;
import com.depotnearby.common.service.mq.MessageHandler;
import com.depotnearby.common.service.mq.impl.RabbitConnection;
import com.depotnearby.common.util.JsonUtil;
import com.depotnearby.dao.redis.IMQRedisDao;
import com.depotnearby.exception.CommonException;
import com.depotnearby.ro.mq.MQMessageRo;
import com.depotnearby.util.SingleTimerFactory;
import com.depotnearby.vo.mq.MQMessage;
import com.google.common.collect.Sets;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.ObjectPool;
import org.codelogger.utils.CollectionUtils;
import org.codelogger.utils.ValueUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/depotnearby/service/MQService.class */
public class MQService {
    /** Name of the "direct" exchange that every queue and topic exchange in this service is bound to. */
    private static final String DEPOTNEARBY_MQ_TOP = "_depotnearby.top";
    /** Message header key under which the payload's class name is published and later read back. */
    private static final String HEADER_TYPE = "__type";
    /** Pool from which a RabbitMQ connection is borrowed for each operation and returned afterwards. */
    private ObjectPool<RabbitConnection> connPool;
    /** Fixed pool of 10 threads on which subscriber callbacks are executed. */
    private ExecutorService subscribeMsgExecutor = Executors.newFixedThreadPool(10);
    /** Queue names declared and bound during initialisation. */
    private Set<String> mqQueues;
    /** Topic (fanout exchange) names declared and bound during initialisation. */
    private Set<String> mqTopics;
    /** Redis-backed store for messages waiting to be retried; may be null, in which case no retry timer is scheduled. */
    private IMQRedisDao mqRedisDao;
    private static final Logger logger = LoggerFactory.getLogger(MQService.class);

    /* loaded from: input_file:com/depotnearby/service/MQService$SubscribeMsgCallback.class */
    private static class SubscribeMsgCallback implements Runnable {
        private Object msg;
        private MessageHandler handler;
        private Channel channel;
        private Envelope envelope;
        private IMQRedisDao mqRedisDao;

        public SubscribeMsgCallback(Object obj, MessageHandler messageHandler, Channel channel, Envelope envelope, IMQRedisDao iMQRedisDao) {
            this.msg = obj;
            this.handler = messageHandler;
            this.channel = channel;
            this.envelope = envelope;
            this.mqRedisDao = iMQRedisDao;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (this.msg instanceof MQMessage) {
                    MQMessage mQMessage = (MQMessage) this.msg;
                    MQService.logger.debug("接收到MQMessage:{}", mQMessage);
                    this.handler.handle(mQMessage.dataToObject());
                } else {
                    MQService.logger.debug("接收到普通 Message:{}", this.msg);
                    this.handler.handle(this.msg);
                }
                this.channel.basicAck(this.envelope.getDeliveryTag(), false);
            } catch (Exception e) {
                try {
                    if ((this.msg instanceof MQMessage) && this.mqRedisDao != null) {
                        MQMessage mQMessage2 = (MQMessage) this.msg;
                        if (ValueUtils.getValue(mQMessage2.getFailedCount()).intValue() < ValueUtils.getValue(mQMessage2.getMaxFailedCount()).intValue()) {
                            MQService.logger.debug("消息{}执行出错未超过最大次数:{}，放入待处理队列.", new Object[]{JsonUtil.obj2Json(this.msg), mQMessage2.getMaxFailedCount(), e});
                            this.mqRedisDao.addToWaitForProcessQueue(new MQMessageRo(mQMessage2.getQueueName(), mQMessage2.getDataClass(), mQMessage2.getData(), Long.valueOf(System.currentTimeMillis()), mQMessage2.getRetryPeriod(), Integer.valueOf(mQMessage2.getFailedCount().intValue() + 1), mQMessage2.getMaxFailedCount(), mQMessage2.getMailTo()));
                        } else {
                            MQService.logger.info("消息{}执行出错超过最大次数:{}，放弃.", new Object[]{JsonUtil.obj2Json(this.msg), mQMessage2.getMaxFailedCount(), e});
                            this.channel.basicAck(this.envelope.getDeliveryTag(), false);
                        }
                    } else if (this.mqRedisDao == null) {
                        MQService.logger.info("消息处理消息{}回调执行出错, 重新放入消息队列。", JsonUtil.obj2Json(this.msg), e);
                        this.channel.basicNack(this.envelope.getDeliveryTag(), false, true);
                    } else {
                        MQService.logger.info("消息{}执行出错并且 mqRedisDao 为空，放弃.", JsonUtil.obj2Json(this.msg), e);
                        this.channel.basicAck(this.envelope.getDeliveryTag(), false);
                    }
                } catch (Exception e2) {
                    MQService.logger.error("rabbitmq nack error", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MQService(ObjectPool<RabbitConnection> objectPool, String[] strArr, String[] strArr2, final IMQRedisDao iMQRedisDao) throws CommonException {
        this.connPool = objectPool;
        this.mqRedisDao = iMQRedisDao;
        this.mqQueues = Sets.newHashSet(strArr);
        Field[] fields = Message.QUEUE.class.getFields();
        Message.QUEUE queue = new Message.QUEUE() { // from class: com.depotnearby.service.MQService.1
        };
        for (Field field : fields) {
            field.setAccessible(true);
            try {
                this.mqQueues.add(field.get(queue).toString());
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
        this.mqTopics = Sets.newHashSet(strArr2);
        Field[] fields2 = Message.TOPIC.class.getFields();
        Message.TOPIC topic = new Message.TOPIC() { // from class: com.depotnearby.service.MQService.2
        };
        for (Field field2 : fields2) {
            field2.setAccessible(true);
            try {
                this.mqTopics.add(field2.get(topic).toString());
            } catch (IllegalAccessException e2) {
                e2.printStackTrace();
            }
        }
        initQueueAndTopics();
        if (iMQRedisDao != null) {
            SingleTimerFactory.getTimer().schedule(new TimerTask() { // from class: com.depotnearby.service.MQService.3
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    List<MQMessageRo> mQMessageRosOfNeedProcess = iMQRedisDao.getMQMessageRosOfNeedProcess();
                    if (CollectionUtils.isNotEmpty(mQMessageRosOfNeedProcess)) {
                        for (MQMessageRo mQMessageRo : mQMessageRosOfNeedProcess) {
                            try {
                                try {
                                    MQService.this.sendMessage(new MQMessage(mQMessageRo));
                                    iMQRedisDao.markAsProcessed(mQMessageRo);
                                } catch (CommonException e3) {
                                    MQService.logger.info("Put mq msg[{}] to queue failed, put back to wait list", JsonUtil.obj2Json(mQMessageRo), e3);
                                    iMQRedisDao.putBackToWaitForProcessQueue(mQMessageRo);
                                }
                            } catch (Exception e4) {
                                MQService.logger.warn("Process mq msg:{} failed...", JsonUtil.obj2Json(mQMessageRo), e4);
                            }
                        }
                    }
                }
            }, TimeUnit.MINUTES.toMillis(1L), TimeUnit.SECONDS.toMillis(1L));
        }
    }

    /**
     * Declares the shared direct exchange and then every configured queue and topic, using one
     * connection borrowed from the pool. The connection is always handed back afterwards.
     *
     * @throws CommonException if borrowing the connection or any declaration fails
     */
    private void initQueueAndTopics() throws CommonException {
        RabbitConnection borrowed = null;
        try {
            borrowed = this.connPool.borrowObject();
            Channel channel = borrowed.getChannel();
            channel.exchangeDeclare(DEPOTNEARBY_MQ_TOP, "direct");

            int topicCount = 0;
            if (this.mqTopics != null) {
                topicCount = this.mqTopics.size();
            }
            logger.debug("topic-length: {}", Integer.valueOf(topicCount));

            for (String queueName : this.mqQueues) {
                initQueue(channel, queueName);
            }
            for (String topicName : this.mqTopics) {
                initTopic(channel, topicName);
            }
        } catch (Exception e) {
            throw new CommonException("初始化队列失败", e);
        } finally {
            returnObject(borrowed);
        }
    }

    /**
     * Declares a fanout exchange named {@code str} and binds it to the shared direct exchange
     * with {@code str} as routing key. Blank names are ignored.
     *
     * @param channel channel to issue the declarations on
     * @param str     topic name
     * @throws IOException if a declaration fails
     */
    private void initTopic(Channel channel, String str) throws IOException {
        if (!StringUtils.isNotBlank(str)) {
            return;
        }
        final String topicName = str;
        logger.debug("初始化Topic: {}", topicName);
        channel.exchangeDeclare(topicName, "fanout");
        channel.exchangeBind(topicName, DEPOTNEARBY_MQ_TOP, topicName);
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue named {@code str} and binds it to
     * the shared direct exchange with {@code str} as routing key. Blank names are ignored.
     *
     * @param channel channel to issue the declarations on
     * @param str     queue name
     * @throws IOException if a declaration fails
     */
    private void initQueue(Channel channel, String str) throws IOException {
        if (!StringUtils.isNotBlank(str)) {
            return;
        }
        final String queueName = str;
        final Map<String, Object> noArguments = null;
        logger.debug("初始化Queue: {}", queueName);
        channel.queueDeclare(queueName, true, false, false, noArguments);
        channel.queueBind(queueName, DEPOTNEARBY_MQ_TOP, queueName);
    }

    /**
     * Publishes {@code mQMessage} using its own queue name as routing key.
     *
     * @param mQMessage message to publish; must not be null
     * @throws CommonException with code 900 if the message is null or publishing fails
     */
    public void sendMessage(MQMessage mQMessage) throws CommonException {
        if (mQMessage != null) {
            String routingKey = mQMessage.getQueueName();
            send(routingKey, mQMessage);
            return;
        }
        throw new CommonException("消息不能为空", 900);
    }

    /**
     * Publishes an arbitrary object with {@code str} as routing key.
     *
     * @param str routing key
     * @param obj message to publish
     * @throws CommonException if the message is null or publishing fails
     * @deprecated kept for existing callers; see {@link #sendMessage(MQMessage)}
     */
    @Deprecated
    public void sendMessage(String str, Object obj) throws CommonException {
        send(str, obj);
    }

    /**
     * Serialises {@code obj} to JSON bytes and publishes it to the shared direct exchange with
     * {@code str} as routing key. The payload's class name travels in the {@code __type} header.
     *
     * @param str routing key
     * @param obj message to publish; must not be null
     * @throws CommonException with code 900 if {@code obj} is null or publishing fails
     */
    private void send(String str, Object obj) throws CommonException {
        if (obj == null) {
            throw new CommonException("消息不能为空", 900);
        }
        RabbitConnection rabbitConnection = null;
        try {
            rabbitConnection = this.connPool.borrowObject();
            Channel channel = rabbitConnection.getChannel();

            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(HEADER_TYPE, getMessageType(obj));
            // MINIMAL_BASIC is a shared static instance: build a per-message copy instead of
            // setting headers on it, otherwise concurrent senders overwrite each other's type.
            AMQP.BasicProperties properties = MessageProperties.MINIMAL_BASIC.builder().headers(headers).build();

            channel.basicPublish(DEPOTNEARBY_MQ_TOP, str, properties, obj2bytes(obj));
        } catch (Exception e) {
            throw new CommonException("发送消息出错", 900, e);
        } finally {
            returnObject(rabbitConnection);
        }
    }

    /**
     * Fetches at most one message from queue {@code str} (auto-acknowledged) and deserialises it
     * according to its {@code __type} header.
     *
     * @param str queue name
     * @return the deserialised message, or {@code null} when the queue currently holds no message
     * @throws CommonException if borrowing the connection, fetching or deserialising fails
     */
    public Object receiveMessage(String str) throws CommonException {
        RabbitConnection rabbitConnection = null;
        try {
            rabbitConnection = this.connPool.borrowObject();
            GetResponse basicGet = rabbitConnection.getChannel().basicGet(str, true);
            if (basicGet == null) {
                // basicGet yields null for an empty queue; that is not an error.
                return null;
            }
            return bytes2obj(getMsgTypeFromProps(basicGet.getProps()), basicGet.getBody());
        } catch (Exception e) {
            throw new CommonException("接收消息出错", e);
        } finally {
            returnObject(rabbitConnection);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads the {@code __type} header from the given message properties.
     *
     * @param basicProperties properties of a received message; may be null
     * @return the header value as a string, or {@code null} when the properties, the header map
     *         or the header itself is absent
     */
    public String getMsgTypeFromProps(AMQP.BasicProperties basicProperties) {
        if (basicProperties == null) {
            return null;
        }
        Map<String, Object> headers = basicProperties.getHeaders();
        if (headers == null) {
            return null;
        }
        Object typeHeader = headers.get(HEADER_TYPE);
        return typeHeader == null ? null : typeHeader.toString();
    }

    /**
     * Registers a manual-ack consumer on queue {@code str}. A new channel is created for the
     * consumer on a connection borrowed from the pool; the connection is returned to the pool
     * once the consumer is registered.
     *
     * <p>Each delivery is deserialised on the consumer thread and then handed to the subscriber
     * executor. A delivery that cannot be deserialised is nack-ed without requeue.
     *
     * @param str            queue name
     * @param messageHandler handler invoked for every delivered message
     * @throws CommonException if the consumer cannot be registered
     */
    public void subscribeQueue(String str, final MessageHandler messageHandler) throws CommonException {
        RabbitConnection borrowed = null;
        try {
            borrowed = this.connPool.borrowObject();
            Channel consumerChannel = borrowed.getConn().createChannel();
            DefaultConsumer consumer = new DefaultConsumer(consumerChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
                    String messageType = MQService.this.getMsgTypeFromProps(basicProperties);

                    Object decoded;
                    try {
                        decoded = MQService.this.bytes2obj(messageType, body);
                    } catch (Exception e) {
                        MQService.logger.error("消息反序列化出错, Message Type: {}", messageType, e);
                        getChannel().basicNack(envelope.getDeliveryTag(), false, false);
                        return;
                    }
                    if (decoded == null) {
                        return;
                    }

                    try {
                        SubscribeMsgCallback callback = new SubscribeMsgCallback(decoded, messageHandler, getChannel(), envelope, MQService.this.mqRedisDao);
                        MQService.this.subscribeMsgExecutor.execute(callback);
                    } catch (Exception e2) {
                        MQService.logger.info("execute {} failed.", decoded, e2);
                    }
                }
            };
            consumerChannel.basicConsume(str, false, consumer);
        } catch (Exception e) {
            throw new CommonException("接收消息出错", e);
        } finally {
            returnObject(borrowed);
        }
    }

    /**
     * Serialises {@code obj} to JSON bytes with no extra serializer features.
     *
     * @param obj object to serialise
     * @return the JSON encoding of {@code obj}
     */
    private byte[] obj2bytes(Object obj) throws IOException {
        final SerializerFeature[] noFeatures = {};
        byte[] encoded = JSON.toJSONBytes(obj, noFeatures);
        return encoded;
    }

    /**
     * Returns the fully qualified name of {@code obj}'s runtime class, used as the message type.
     *
     * @param obj message object; must not be null
     * @return the runtime class name
     */
    private Object getMessageType(Object obj) {
        Class<?> runtimeType = obj.getClass();
        return runtimeType.getName();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Turns a message body back into an object according to its type name.
     *
     * <ul>
     *   <li>no type, or the {@code byte[]} type: the raw bytes are returned;</li>
     *   <li>a {@link ByteArrayMessage} subtype: a new instance is created and asked to decode the bytes;</li>
     *   <li>anything else: the bytes are parsed as JSON into the named class.</li>
     * </ul>
     *
     * @param str  class name taken from the message's type header; may be null
     * @param bArr message body
     * @return the decoded object
     * @throws ClassNotFoundException if {@code str} does not name a loadable class
     */
    public Object bytes2obj(String str, byte[] bArr) throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException {
        if (str == null || str.equals(getMessageType(new byte[0]))) {
            return bArr;
        }
        // NOTE(review): the class name comes from a message header, so whoever can publish to the
        // broker chooses which class is loaded and populated. Consider restricting it to an allow-list.
        Class<?> cls = Class.forName(str);
        if (ByteArrayMessage.class.isAssignableFrom(cls)) {
            ByteArrayMessage byteArrayMessage = (ByteArrayMessage) cls.newInstance();
            byteArrayMessage.messageDecode(bArr);
            return byteArrayMessage;
        }
        // Parse once and reuse the result; re-serialise for the log only when it will be written.
        Object parsed = JSON.parseObject(bArr, cls, new Feature[0]);
        if (logger.isInfoEnabled()) {
            logger.info("反序列化目标对象[{}]", JSON.toJSONString(parsed));
        }
        return parsed;
    }

    /**
     * Hands a borrowed connection back to the pool. A null argument is ignored, and a failure to
     * return the connection is logged rather than propagated, so this is safe to call from
     * {@code finally} blocks.
     *
     * @param rabbitConnection connection to return; may be null
     */
    private void returnObject(RabbitConnection rabbitConnection) {
        if (rabbitConnection == null) {
            return;
        }
        try {
            this.connPool.returnObject(rabbitConnection);
        } catch (Exception e) {
            logger.warn("Return rabbit connection to pool failed", e);
        }
    }

    /**
     * Subscribes {@code messageHandler} to topic {@code str}. A non-durable, auto-delete queue
     * with a unique name ({@code str + "." + random UUID}) is declared and bound to the topic
     * exchange, then consumed through {@link #subscribeQueue(String, MessageHandler)}.
     *
     * @param str            topic (exchange) name
     * @param messageHandler handler invoked for every message published to the topic
     * @throws CommonException if declaring, binding or subscribing fails
     */
    public void subscribeTopic(String str, MessageHandler messageHandler) throws CommonException {
        String str2 = str + "." + UUID.randomUUID().toString();
        try {
            RabbitConnection rabbitConnection = null;
            try {
                rabbitConnection = this.connPool.borrowObject();
                Channel channel = rabbitConnection.getChannel();
                channel.queueDeclare(str2, false, false, true, (Map<String, Object>) null);
                channel.queueBind(str2, str, str2);
            } finally {
                // Return the connection before subscribing: subscribeQueue borrows from the same
                // pool, and holding one connection while waiting for another can exhaust it.
                returnObject(rabbitConnection);
            }
            subscribeQueue(str2, messageHandler);
        } catch (Exception e) {
            throw new CommonException("访问Topic[" + str + "]消息出错", e);
        }
    }
}
