/*
 * Decompiled with CFR 0.152.
 */
package com.depotnearby.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.depotnearby.common.mo.ByteArrayMessage;
import com.depotnearby.common.mo.Message;
import com.depotnearby.common.service.mq.MessageHandler;
import com.depotnearby.common.service.mq.impl.RabbitConnection;
import com.depotnearby.common.util.JsonUtil;
import com.depotnearby.dao.redis.IMQRedisDao;
import com.depotnearby.exception.CommonException;
import com.depotnearby.ro.mq.MQMessageRo;
import com.depotnearby.util.SingleTimerFactory;
import com.depotnearby.vo.mq.MQMessage;
import com.google.common.collect.Sets;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.ObjectPool;
import org.codelogger.utils.CollectionUtils;
import org.codelogger.utils.ValueUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RabbitMQ facade used to declare the application's queues and topics, publish and fetch
 * JSON-encoded messages, and dispatch subscribed deliveries to {@link MessageHandler}s on a
 * fixed pool of worker threads.
 *
 * <p>Every queue and every topic exchange is bound to the direct exchange
 * {@value #DEPOTNEARBY_MQ_TOP} using its own name as routing key, so publishing to that exchange
 * with a queue or topic name as routing key reaches the matching destination.
 *
 * <p>When an {@link IMQRedisDao} is supplied, failed {@link MQMessage}s are parked in the DAO's
 * wait queue and a periodic timer task re-publishes the entries the DAO reports as due.
 */
public class MQService {
    /** Direct exchange that every declared queue and topic exchange is bound to. */
    private static final String DEPOTNEARBY_MQ_TOP = "_depotnearby.top";
    /** Message header that carries the class name of the serialized payload. */
    private static final String HEADER_TYPE = "__type";
    private static final Logger logger = LoggerFactory.getLogger(MQService.class);

    private final ObjectPool<RabbitConnection> connPool;
    /** Runs subscriber callbacks so handler work does not happen on the consumer delivery thread. */
    private final ExecutorService subscribeMsgExecutor = Executors.newFixedThreadPool(10);
    private final Set<String> mqQueues;
    private final Set<String> mqTopics;
    private final IMQRedisDao mqRedisDao;

    /**
     * Creates the service, declares all queues/topics and, if a Redis DAO is given, schedules the
     * retry task (first run after one minute, then every second).
     *
     * @param connPool   pool the RabbitMQ connections are borrowed from
     * @param mqQueues   additional queue names to declare; {@code null} is treated as empty
     * @param mqTopics   additional topic names to declare; {@code null} is treated as empty
     * @param mqRedisDao retry store; {@code null} disables the retry task
     * @throws CommonException if declaring the exchange, queues or topics fails
     */
    protected MQService(ObjectPool<RabbitConnection> connPool, String[] mqQueues, String[] mqTopics, final IMQRedisDao mqRedisDao) throws CommonException {
        this.connPool = connPool;
        this.mqRedisDao = mqRedisDao;

        // Configured names plus the value of every public field exposed by Message.QUEUE / Message.TOPIC.
        this.mqQueues = Sets.newHashSet(mqQueues == null ? new String[0] : mqQueues);
        addPublicFieldValues(Message.QUEUE.class, new Message.QUEUE(){}, this.mqQueues);
        this.mqTopics = Sets.newHashSet(mqTopics == null ? new String[0] : mqTopics);
        addPublicFieldValues(Message.TOPIC.class, new Message.TOPIC(){}, this.mqTopics);

        this.initQueueAndTopics();

        if (mqRedisDao != null) {
            Timer timer = SingleTimerFactory.getTimer();
            timer.schedule(new TimerTask(){

                @Override
                public void run() {
                    MQService.this.resendPendingMessages();
                }
            }, TimeUnit.MINUTES.toMillis(1L), TimeUnit.SECONDS.toMillis(1L));
        }
    }

    /**
     * Adds the string form of every public field value of {@code holderType} to {@code target}.
     * Fields that cannot be read or hold {@code null} are skipped.
     */
    private static void addPublicFieldValues(Class<?> holderType, Object holder, Set<String> target) {
        for (Field field : holderType.getFields()) {
            field.setAccessible(true);
            try {
                Object value = field.get(holder);
                if (value != null) {
                    target.add(value.toString());
                }
            }
            catch (IllegalAccessException e) {
                logger.warn("Unable to read field {} of {}", field.getName(), holderType.getName(), e);
            }
        }
    }

    /**
     * Timer entry point: re-publishes every message the Redis DAO reports as due.
     * Nothing may escape from here, because an exception thrown out of a TimerTask
     * terminates the timer thread and would stop all later runs.
     */
    private void resendPendingMessages() {
        try {
            List<MQMessageRo> pendingMessages = this.mqRedisDao.getMQMessageRosOfNeedProcess();
            if (CollectionUtils.isNotEmpty(pendingMessages)) {
                for (MQMessageRo pendingMessage : pendingMessages) {
                    this.resendPendingMessage(pendingMessage);
                }
            }
        }
        catch (Exception e) {
            logger.warn("Polling pending mq messages failed, will retry on next run", e);
        }
    }

    /** Re-publishes one parked message; on a publish failure it is put back on the wait list. */
    private void resendPendingMessage(MQMessageRo pendingMessage) {
        try {
            try {
                this.sendMessage(new MQMessage(pendingMessage));
                this.mqRedisDao.markAsProcessed(pendingMessage);
            }
            catch (CommonException e) {
                logger.info("Put mq msg[{}] to queue failed, put back to wait list", JsonUtil.obj2Json(pendingMessage), e);
                this.mqRedisDao.putBackToWaitForProcessQueue(pendingMessage);
            }
        }
        catch (Exception e) {
            logger.warn("Process mq msg:{} failed...", JsonUtil.obj2Json(pendingMessage), e);
        }
    }

    /**
     * Declares the top-level direct exchange and then every known queue and topic.
     *
     * @throws CommonException wrapping any failure while borrowing a connection or declaring
     */
    private void initQueueAndTopics() throws CommonException {
        RabbitConnection wrapConn = null;
        try {
            wrapConn = this.connPool.borrowObject();
            Channel channel = wrapConn.getChannel();
            channel.exchangeDeclare(DEPOTNEARBY_MQ_TOP, "direct");
            logger.debug("topic-length: {}", this.mqTopics.size());
            for (String queue : this.mqQueues) {
                this.initQueue(channel, queue);
            }
            for (String topic : this.mqTopics) {
                this.initTopic(channel, topic);
            }
        }
        catch (Exception e) {
            throw new CommonException("\u521d\u59cb\u5316\u961f\u5217\u5931\u8d25", e);
        }
        finally {
            this.returnObject(wrapConn);
        }
    }

    /** Declares a fanout exchange named {@code topic} and binds it to the top exchange; blank names are ignored. */
    private void initTopic(Channel channel, String topic) throws IOException {
        if (StringUtils.isBlank(topic)) {
            return;
        }
        logger.debug("\u521d\u59cb\u5316Topic: {}", topic);
        channel.exchangeDeclare(topic, "fanout");
        channel.exchangeBind(topic, DEPOTNEARBY_MQ_TOP, topic);
    }

    /** Declares a durable queue named {@code queue} and binds it to the top exchange; blank names are ignored. */
    private void initQueue(Channel channel, String queue) throws IOException {
        if (StringUtils.isBlank(queue)) {
            return;
        }
        logger.debug("\u521d\u59cb\u5316Queue: {}", queue);
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, DEPOTNEARBY_MQ_TOP, queue);
    }

    /**
     * Publishes {@code message} using the queue name it carries as routing key.
     *
     * @param message message to publish; must not be {@code null}
     * @throws CommonException if {@code message} is {@code null} or publishing fails
     */
    public void sendMessage(MQMessage message) throws CommonException {
        if (message == null) {
            throw new CommonException("\u6d88\u606f\u4e0d\u80fd\u4e3a\u7a7a", 900);
        }
        this.send(message.getQueueName(), message);
    }

    /**
     * Publishes an arbitrary object with {@code queueName} as routing key.
     *
     * @param queueName routing key (queue or topic name)
     * @param message   payload; must not be {@code null}
     * @throws CommonException if {@code message} is {@code null} or publishing fails
     * @deprecated use {@link #sendMessage(MQMessage)}
     */
    @Deprecated
    public void sendMessage(String queueName, Object message) throws CommonException {
        this.send(queueName, message);
    }

    /**
     * Serializes {@code message} to JSON bytes and publishes it to the top exchange, tagging the
     * payload class name in the {@value #HEADER_TYPE} header.
     */
    private void send(String queueName, Object message) throws CommonException {
        if (message == null) {
            throw new CommonException("\u6d88\u606f\u4e0d\u80fd\u4e3a\u7a7a", 900);
        }
        RabbitConnection wrapConn = null;
        try {
            wrapConn = this.connPool.borrowObject();
            Channel channel = wrapConn.getChannel();
            HashMap<String, Object> headers = new HashMap<String, Object>();
            headers.put(HEADER_TYPE, this.getMessageType(message));
            // Build a fresh properties object per message: MessageProperties.MINIMAL_BASIC is a
            // shared constant, and setting headers on it directly would mutate global state and
            // race with concurrent senders.
            AMQP.BasicProperties props = MessageProperties.MINIMAL_BASIC.builder().headers(headers).build();
            byte[] content = this.obj2bytes(message);
            channel.basicPublish(DEPOTNEARBY_MQ_TOP, queueName, props, content);
        }
        catch (Exception e) {
            throw new CommonException("\u53d1\u9001\u6d88\u606f\u51fa\u9519", 900, e);
        }
        finally {
            this.returnObject(wrapConn);
        }
    }

    /**
     * Fetches a single message from {@code queueName} (auto-acknowledged) and deserializes it.
     *
     * @param queueName queue to poll
     * @return the decoded payload
     * @throws CommonException if the queue has no message available or fetching/decoding fails
     */
    public Object receiveMessage(String queueName) throws CommonException {
        RabbitConnection wrapConn = null;
        try {
            wrapConn = this.connPool.borrowObject();
            Channel channel = wrapConn.getChannel();
            GetResponse getResponse = channel.basicGet(queueName, true);
            if (getResponse == null) {
                // basicGet yields null for an empty queue; report that explicitly instead of
                // failing with an anonymous NullPointerException (still wrapped below).
                throw new IllegalStateException("No message available in queue [" + queueName + "]");
            }
            String msgType = this.getMsgTypeFromProps(getResponse.getProps());
            return this.bytes2obj(msgType, getResponse.getBody());
        }
        catch (Exception e) {
            throw new CommonException("\u63a5\u6536\u6d88\u606f\u51fa\u9519", e);
        }
        finally {
            this.returnObject(wrapConn);
        }
    }

    /** Returns the {@value #HEADER_TYPE} header as a string, or {@code null} if it is absent. */
    private String getMsgTypeFromProps(AMQP.BasicProperties props) {
        if (props == null || props.getHeaders() == null) {
            return null;
        }
        Object typeObj = props.getHeaders().get(HEADER_TYPE);
        return typeObj == null ? null : typeObj.toString();
    }

    /**
     * Starts a manual-ack consumer on {@code queueName}. Each delivery is decoded on the consumer
     * thread and then handed to the worker pool, where {@code handler} is invoked.
     *
     * @param queueName queue to consume from
     * @param handler   callback invoked with each decoded payload
     * @throws CommonException if the consumer cannot be registered
     */
    public void subscribeQueue(String queueName, final MessageHandler handler) throws CommonException {
        RabbitConnection wrapConn = null;
        Channel channel = null;
        boolean consuming = false;
        try {
            wrapConn = this.connPool.borrowObject();
            // The consumer gets its own channel, which stays open after the pooled connection
            // wrapper is returned below.
            channel = wrapConn.getConn().createChannel();
            channel.basicConsume(queueName, false, (Consumer)new DefaultConsumer(channel){

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String type = MQService.this.getMsgTypeFromProps(properties);
                    Object msg;
                    try {
                        msg = MQService.this.bytes2obj(type, body);
                    }
                    catch (Exception e) {
                        logger.error("\u6d88\u606f\u53cd\u5e8f\u5217\u5316\u51fa\u9519, Message Type: {}", type, e);
                        this.getChannel().basicNack(envelope.getDeliveryTag(), false, false);
                        return;
                    }
                    if (msg == null) {
                        // Nothing to hand to the handler; reject instead of leaving the delivery
                        // unacknowledged on the channel forever.
                        logger.warn("Discarding delivery that decoded to null, Message Type: {}", type);
                        this.getChannel().basicNack(envelope.getDeliveryTag(), false, false);
                        return;
                    }
                    try {
                        SubscribeMsgCallback callbackRun = new SubscribeMsgCallback(msg, handler, this.getChannel(), envelope, MQService.this.mqRedisDao);
                        MQService.this.subscribeMsgExecutor.execute(callbackRun);
                    }
                    catch (Exception e) {
                        // NOTE(review): the delivery stays unacknowledged in this case.
                        logger.info("execute {} failed.", msg, e);
                    }
                }
            });
            consuming = true;
        }
        catch (Exception e) {
            throw new CommonException("\u63a5\u6536\u6d88\u606f\u51fa\u9519", e);
        }
        finally {
            if (!consuming) {
                // Registration failed: nobody else references the channel, so release it here.
                closeQuietly(channel);
            }
            this.returnObject(wrapConn);
        }
    }

    /** Closes {@code channel} if non-null, logging (not propagating) any failure. */
    private static void closeQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        }
        catch (Exception e) {
            logger.debug("Closing channel after failed subscription raised an error", e);
        }
    }

    /** Serializes {@code message} to JSON bytes. */
    private byte[] obj2bytes(Object message) throws IOException {
        return JSON.toJSONBytes(message);
    }

    /** Returns the class name written to the {@value #HEADER_TYPE} header for {@code message}. */
    private Object getMessageType(Object message) {
        return message.getClass().getName();
    }

    /**
     * Decodes a message body according to its type header.
     *
     * <ul>
     *   <li>no type, or the {@code byte[]} type name: the raw body is returned;</li>
     *   <li>a {@link ByteArrayMessage} subtype: a new instance decodes the body itself;</li>
     *   <li>anything else: the body is parsed as JSON into that class.</li>
     * </ul>
     *
     * <p>NOTE(security): {@code type} comes from a message header and is passed to
     * {@link Class#forName(String)} and the JSON parser; this is only safe if all publishers on
     * the broker are trusted.
     */
    private Object bytes2obj(String type, byte[] body) throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException {
        if (type == null || type.equals(byte[].class.getName())) {
            return body;
        }
        Class<?> messageObjClass = Class.forName(type);
        if (ByteArrayMessage.class.isAssignableFrom(messageObjClass)) {
            ByteArrayMessage byteArrayMessage = (ByteArrayMessage)messageObjClass.newInstance();
            byteArrayMessage.messageDecode(body);
            return byteArrayMessage;
        }
        // Parse once and reuse the result for both the log line and the return value.
        Object decoded = JSON.parseObject(body, messageObjClass);
        if (logger.isInfoEnabled()) {
            logger.info("\u53cd\u5e8f\u5217\u5316\u76ee\u6807\u5bf9\u8c61[{}]", JSON.toJSONString(decoded));
        }
        return decoded;
    }

    /** Returns a borrowed connection to the pool; {@code null} is ignored and failures are logged. */
    private void returnObject(RabbitConnection wrapConn) {
        if (wrapConn == null) {
            return;
        }
        try {
            this.connPool.returnObject(wrapConn);
        }
        catch (Exception e) {
            logger.warn("Returning rabbit connection to pool failed", e);
        }
    }

    /**
     * Subscribes {@code handler} to a topic by declaring a uniquely named, auto-delete queue,
     * binding it to the topic exchange and consuming from it.
     *
     * @param topicName topic exchange to listen on
     * @param handler   callback invoked with each decoded payload
     * @throws CommonException if declaring, binding or subscribing fails
     */
    public void subscribeTopic(String topicName, MessageHandler handler) throws CommonException {
        String tempQueue = topicName + "." + UUID.randomUUID().toString();
        try {
            RabbitConnection conn = null;
            try {
                conn = this.connPool.borrowObject();
                Channel channel = conn.getChannel();
                channel.queueDeclare(tempQueue, false, false, true, null);
                channel.queueBind(tempQueue, topicName, tempQueue);
            }
            finally {
                // Give the connection back before subscribeQueue borrows its own, so this call
                // never holds two pooled connections at once.
                this.returnObject(conn);
            }
            this.subscribeQueue(tempQueue, handler);
        }
        catch (Exception e) {
            throw new CommonException("\u8bbf\u95eeTopic[" + topicName + "]\u6d88\u606f\u51fa\u9519", e);
        }
    }

    /**
     * Worker-pool task that feeds one decoded delivery to its handler and settles the delivery
     * (ack / nack / park for retry) depending on the outcome.
     */
    private static class SubscribeMsgCallback
    implements Runnable {
        private final Object msg;
        private final MessageHandler handler;
        private final Channel channel;
        private final Envelope envelope;
        private final IMQRedisDao mqRedisDao;

        public SubscribeMsgCallback(Object msg, MessageHandler handler, Channel channel, Envelope envelope, IMQRedisDao mqRedisDao) {
            this.msg = msg;
            this.handler = handler;
            this.channel = channel;
            this.envelope = envelope;
            this.mqRedisDao = mqRedisDao;
        }

        /** Invokes the handler and acks on success; on any failure delegates to the recovery path. */
        @Override
        public void run() {
            try {
                if (this.msg instanceof MQMessage) {
                    MQMessage mqMessage = (MQMessage)this.msg;
                    logger.debug("\u63a5\u6536\u5230MQMessage:{}", mqMessage);
                    this.handler.handle(mqMessage.dataToObject());
                } else {
                    logger.debug("\u63a5\u6536\u5230\u666e\u901a Message:{}", this.msg);
                    this.handler.handle(this.msg);
                }
                this.channel.basicAck(this.envelope.getDeliveryTag(), false);
            }
            catch (Exception e) {
                try {
                    this.settleFailedDelivery(e);
                }
                catch (Exception e1) {
                    // Log the recovery failure itself; the handler failure was logged on the way in.
                    logger.error("rabbitmq nack error", (Throwable)e1);
                }
            }
        }

        /**
         * Decides what happens to a delivery whose handling threw {@code cause}:
         * <ul>
         *   <li>MQMessage with a retry store and attempts left: park a copy in the store with
         *       the failure count incremented, then ack the original delivery;</li>
         *   <li>MQMessage with a retry store and no attempts left: ack (give up);</li>
         *   <li>no retry store: nack with requeue;</li>
         *   <li>retry store present but payload is not an MQMessage: ack (give up).</li>
         * </ul>
         */
        private void settleFailedDelivery(Exception cause) throws Exception {
            long deliveryTag = this.envelope.getDeliveryTag();
            if (this.msg instanceof MQMessage && this.mqRedisDao != null) {
                MQMessage mqMessage = (MQMessage)this.msg;
                // Null-safe read, reused for the increment so a missing count cannot cause an NPE.
                int failedCount = ValueUtils.getValue((Integer)mqMessage.getFailedCount());
                int maxFailedCount = ValueUtils.getValue((Integer)mqMessage.getMaxFailedCount());
                if (failedCount < maxFailedCount) {
                    logger.debug("\u6d88\u606f{}\u6267\u884c\u51fa\u9519\u672a\u8d85\u8fc7\u6700\u5927\u6b21\u6570:{}\uff0c\u653e\u5165\u5f85\u5904\u7406\u961f\u5217.", JsonUtil.obj2Json(this.msg), mqMessage.getMaxFailedCount(), cause);
                    MQMessageRo retryEntry = new MQMessageRo(mqMessage.getQueueName(), mqMessage.getDataClass(), mqMessage.getData(), System.currentTimeMillis(), mqMessage.getRetryPeriod(), failedCount + 1, mqMessage.getMaxFailedCount(), mqMessage.getMailTo());
                    this.mqRedisDao.addToWaitForProcessQueue(retryEntry);
                    // The retry copy now lives in the wait queue; ack the original so the broker
                    // does not keep it unacknowledged and redeliver a duplicate later.
                    this.channel.basicAck(deliveryTag, false);
                } else {
                    logger.info("\u6d88\u606f{}\u6267\u884c\u51fa\u9519\u8d85\u8fc7\u6700\u5927\u6b21\u6570:{}\uff0c\u653e\u5f03.", JsonUtil.obj2Json(this.msg), mqMessage.getMaxFailedCount(), cause);
                    this.channel.basicAck(deliveryTag, false);
                }
            } else if (this.mqRedisDao == null) {
                logger.info("\u6d88\u606f\u5904\u7406\u6d88\u606f{}\u56de\u8c03\u6267\u884c\u51fa\u9519, \u91cd\u65b0\u653e\u5165\u6d88\u606f\u961f\u5217\u3002", JsonUtil.obj2Json(this.msg), cause);
                this.channel.basicNack(deliveryTag, false, true);
            } else {
                // NOTE(review): this branch is reached when the retry store IS present but the
                // payload is not an MQMessage; the log text below says the store is empty.
                logger.info("\u6d88\u606f{}\u6267\u884c\u51fa\u9519\u5e76\u4e14 mqRedisDao \u4e3a\u7a7a\uff0c\u653e\u5f03.", JsonUtil.obj2Json(this.msg), cause);
                this.channel.basicAck(deliveryTag, false);
            }
        }
    }
}

