package com.biz.message.amqp;

import com.biz.message.AbstractBaseMessageService;
import com.biz.message.BizMessage;
import com.biz.message.QueueDefinition;
import com.biz.message.QueueParser;
import com.biz.message.mo.AmqpSendStatus;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.codelogger.utils.ValueUtils;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/* loaded from: input_file:com/biz/message/amqp/AmqpMessageService.class */
public class AmqpMessageService extends AbstractBaseMessageService {
    private static final String MQ_HEADER_X_DELAY = "x-delay";
    private AmqpTemplate amqpTemplate;
    private Integer maxRetry;
    private Long retryElapse;

    public void setAmqpTemplate(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    public void setMaxRetry(Integer num) {
        this.maxRetry = num;
    }

    public void setRetryElapse(Long l) {
        this.retryElapse = l;
    }

    @Override // com.biz.message.AbstractBaseMessageService
    public AmqpSendStatus send(QueueDefinition queueDefinition, BizMessage<?> bizMessage) {
        return (bizMessage.getProperties() == null || !bizMessage.getProperties().getDelay().booleanValue()) ? send(getRouter(queueDefinition), bizMessage.getPayload(), this.maxRetry, this.retryElapse) : send(getRouter(queueDefinition), bizMessage.getPayload(), this.maxRetry, this.retryElapse, bizMessage.getProperties().getDelayDuration(), bizMessage.getProperties().getDelayTimeUnit());
    }

    @Override // com.biz.message.AbstractBaseMessageService
    protected <T> T receive(QueueDefinition queueDefinition, Class<T> cls) {
        return (T) this.messageConverter.fromMessageString(new String(this.amqpTemplate.receive(new QueueParser(queueDefinition).getName()).getBody()), cls);
    }

    private AmqpRouter getRouter(QueueDefinition queueDefinition) {
        QueueParser queueParser = new QueueParser(queueDefinition);
        this.logger.info("QUEUE --parser--> {}", queueParser);
        return new AmqpRouter(queueParser.getExchangeName(), queueParser.getRoutingKey());
    }

    private AmqpSendStatus send(AmqpRouter amqpRouter, Object obj, Integer num, Long l) {
        String exchangeName = amqpRouter.getExchangeName();
        String routingKey = amqpRouter.getRoutingKey();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("send message to QUEUE using amqp, exchange={}, routing key={}, message={}", new Object[]{exchangeName, routingKey, obj});
        }
        try {
            this.amqpTemplate.convertAndSend(exchangeName, routingKey, obj);
            return new AmqpSendStatus(Boolean.TRUE.booleanValue(), null, null);
        } catch (AmqpConnectException e) {
            return handleRetry(amqpRouter, obj, num, l, e);
        }
    }

    private AmqpSendStatus send(AmqpRouter amqpRouter, Object obj, Integer num, Long l, long j, TimeUnit timeUnit) {
        String exchangeName = amqpRouter.getExchangeName();
        String routingKey = amqpRouter.getRoutingKey();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("send message to QUEUE using amqp, exchange={}, routing key={}, message={}", new Object[]{exchangeName, routingKey, obj});
        }
        try {
            Message message = null;
            MessageProperties messageProperties = null;
            if (obj instanceof Message) {
                message = (Message) obj;
                messageProperties = message.getMessageProperties();
            }
            MessageProperties messageProperties2 = messageProperties == null ? new MessageProperties() : messageProperties;
            messageProperties2.getHeaders().put(MQ_HEADER_X_DELAY, Long.valueOf(timeUnit != null ? timeUnit.toMillis(j) : j * 1000));
            if (message == null && (this.amqpTemplate instanceof RabbitTemplate)) {
                message = this.amqpTemplate.getMessageConverter().toMessage(obj, messageProperties2);
            }
            this.amqpTemplate.send(exchangeName, routingKey, message);
            return new AmqpSendStatus(Boolean.TRUE.booleanValue(), null, null);
        } catch (AmqpConnectException e) {
            return handleRetry(amqpRouter, obj, num, l, e);
        }
    }

    private AmqpSendStatus handleRetry(AmqpRouter amqpRouter, Object obj, Integer num, Long l, AmqpConnectException amqpConnectException) {
        if (ValueUtils.getValue(num).intValue() == 0) {
            this.logger.error("发送消息失败", amqpConnectException);
            return new AmqpSendStatus(Boolean.FALSE.booleanValue(), amqpConnectException.getMessage(), amqpConnectException.getCause());
        }
        this.logger.warn("发送消息失败, 重试创建连接次数[{}], 重试间隔[{}]ms", ValueUtils.getValue(num), ValueUtils.getValue(l));
        if (ValueUtils.getValue(l).longValue() > 0) {
            try {
                Thread.sleep(l.longValue());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return send(amqpRouter, obj, Integer.valueOf(num.intValue() - 1), l);
    }

    private static byte[] objectToByte(Object obj) {
        byte[] bArr = null;
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(obj);
            bArr = byteArrayOutputStream.toByteArray();
            byteArrayOutputStream.close();
            objectOutputStream.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return bArr;
    }
}
