package com.biz.crm.business.common.rocketmq.service;

import com.alibaba.fastjson.JSON;
import com.biz.crm.business.common.rocketmq.constant.MqConstant;
import com.biz.crm.business.common.rocketmq.document.RocketMqMessageLogDocument;
import com.biz.crm.business.common.rocketmq.enums.RocketMqTypeEnum;
import com.biz.crm.business.common.rocketmq.event.RocketMqProducerEvent;
import com.biz.crm.business.common.rocketmq.repository.RocketMqMessageLogDocumentRepository;
import com.biz.crm.business.common.rocketmq.util.RocketMqUtil;
import com.biz.crm.business.common.rocketmq.vo.MqMessageVo;
import com.biz.crm.business.common.sdk.enums.EnableStatusEnum;
import com.biz.crm.business.common.sdk.model.AbstractCrmUserIdentity;
import com.biz.crm.business.common.sdk.service.LoginUserService;
import com.biz.crm.business.common.sdk.service.RedisService;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import jodd.util.StringUtil;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

@Component
/* loaded from: input_file:com/biz/crm/business/common/rocketmq/service/RocketMqProducer.class */
public class RocketMqProducer {
    private static final Logger log = LoggerFactory.getLogger(RocketMqProducer.class);

    @Resource
    private MqCommonService mqCommonService;

    @Autowired(required = false)
    private RocketMqMessageLogDocumentRepository rocketMqMessageLogDocumentRepository;

    @Autowired(required = false)
    private RedisService redisService;

    @Autowired(required = false)
    private MqExceptionExtendService mqExceptionExtendService;

    @Autowired(required = false)
    private BuildMqInfoExtendService buildMqInfoExtendService;

    @Autowired(required = false)
    private LoginUserService loginUserService;
    protected final Integer DEF_SIZE = 200;

    public RocketMqTypeEnum getCurrentMqType() {
        return this.mqCommonService.getMqTypeEnum();
    }

    public void sendMqMsg(MqMessageVo mqMessageVo) {
        sendMqMsg(mqMessageVo, 0L);
    }

    public void sendMqMsg(MqMessageVo mqMessageVo, long j) {
        if (!StringUtils.hasText(mqMessageVo.getTopic())) {
            mqMessageVo.setTopic(RocketMqUtil.getTopic());
        }
        if (!mqMessageVo.getTopic().contains(RocketMqUtil.mqEnvironment())) {
            mqMessageVo.setTopic(mqMessageVo.getTopic() + RocketMqUtil.mqEnvironment());
        }
        RocketMqMessageLogDocument buildMqLog = buildMqLog(mqMessageVo);
        log.info("发送的MQ消息内容[{}]", JSON.toJSONString(mqMessageVo));
        try {
            this.mqCommonService.sendMqMsg(buildMqLog, mqMessageVo, j);
        } catch (IllegalArgumentException e) {
            log.error("MQ消息发送失败[{}]", JSON.toJSONString(mqMessageVo));
            log.error(e.getMessage(), e);
            sendMqExceptionSaveLog(buildMqLog, mqMessageVo, e);
            throw new IllegalArgumentException(e.getMessage());
        } catch (Exception e2) {
            log.error("MQ消息发送失败[{}]", JSON.toJSONString(mqMessageVo));
            log.error(e2.getMessage(), e2);
            sendMqExceptionSaveLog(buildMqLog, mqMessageVo, e2);
            throw new IllegalArgumentException("MQ消息发送失败,消息ID[" + buildMqLog.getId() + "]");
        }
    }

    public void sendMqOrderMsg(MqMessageVo mqMessageVo) {
        Assert.hasText(mqMessageVo.getTag(), "MQ的tag不能为空!");
        sendMqOrderMsg(mqMessageVo, mqMessageVo.getTag(), 0L);
    }

    public void sendMqOrderMsg(MqMessageVo mqMessageVo, long j) {
        Assert.hasText(mqMessageVo.getTag(), "MQ的tag不能为空!");
        sendMqOrderMsg(mqMessageVo, mqMessageVo.getTag(), j);
    }

    public void sendMqOrderMsg(MqMessageVo mqMessageVo, String str) {
        sendMqOrderMsg(mqMessageVo, str, 0L);
    }

    public void sendMqOrderMsg(MqMessageVo mqMessageVo, String str, long j) {
        if (!StringUtils.hasText(mqMessageVo.getTopic())) {
            mqMessageVo.setTopic(RocketMqUtil.getOrderTopic());
        }
        Assert.hasText(str, "MQ顺序消息,唯一标记不能为空!(当无法确认唯一标记时,可传tag)");
        if (!mqMessageVo.getTopic().contains(RocketMqUtil.mqEnvironment())) {
            mqMessageVo.setTopic(mqMessageVo.getTopic() + RocketMqUtil.mqEnvironment());
        }
        RocketMqMessageLogDocument buildMqLog = buildMqLog(mqMessageVo);
        log.info("发送的MQ消息内容[{}]", JSON.toJSONString(mqMessageVo));
        try {
            this.mqCommonService.sendMqOrderMsg(buildMqLog, mqMessageVo, str, j);
        } catch (IllegalArgumentException e) {
            log.error("MQ消息发送失败[{}]", JSON.toJSONString(mqMessageVo));
            log.error(e.getMessage(), e);
            sendMqExceptionSaveLog(buildMqLog, mqMessageVo, e);
            throw new IllegalArgumentException(e.getMessage());
        } catch (Exception e2) {
            log.error("MQ消息发送失败[{}]", JSON.toJSONString(mqMessageVo));
            log.error(e2.getMessage(), e2);
            sendMqExceptionSaveLog(buildMqLog, mqMessageVo, e2);
            throw new IllegalArgumentException("MQ消息发送失败,消息ID[" + buildMqLog.getId() + "]");
        }
    }

    private void sendMqExceptionSaveLog(RocketMqMessageLogDocument rocketMqMessageLogDocument, MqMessageVo mqMessageVo, Exception exc) {
        log.error("MQ消息发送失败:mqMessageVo = {} >>>", JSON.toJSONString(mqMessageVo), exc);
        rocketMqMessageLogDocument.setSendLog(ExceptionUtils.getStackTrace(exc));
        if (!StringUtils.hasText(rocketMqMessageLogDocument.getSendLog())) {
            rocketMqMessageLogDocument.setSendLog("MQ消息发送失败");
        }
        rocketMqMessageLogDocument.setSendStatus(EnableStatusEnum.DISABLE.getCode());
        if (RocketMqUtil.isSaveLog()) {
            this.rocketMqMessageLogDocumentRepository.save(rocketMqMessageLogDocument);
        }
    }

    private RocketMqMessageLogDocument buildMqLog(MqMessageVo mqMessageVo) {
        if (this.buildMqInfoExtendService != null) {
            return this.buildMqInfoExtendService.buildMqLog(mqMessageVo);
        }
        if (!StringUtils.hasText(mqMessageVo.getCurrentAccount())) {
            mqMessageVo.setCurrentAccount(Objects.isNull(this.loginUserService) ? "" : this.loginUserService.getLoginAccountName());
        }
        if (!StringUtils.hasText(mqMessageVo.getAccountJson())) {
            AbstractCrmUserIdentity abstractLoginUser = this.loginUserService.getAbstractLoginUser();
            mqMessageVo.setAccountJson(Objects.isNull(abstractLoginUser) ? null : JSON.toJSONString(abstractLoginUser));
        }
        RocketMqMessageLogDocument buildLogVo = RocketMqMessageLogDocument.buildLogVo(mqMessageVo);
        RocketMqUtil.checkSendMsg(buildLogVo);
        if (RocketMqUtil.isSaveLog()) {
            try {
                buildLogVo = (RocketMqMessageLogDocument) this.rocketMqMessageLogDocumentRepository.save(buildLogVo);
                this.redisService.set(MqConstant.MQ_MESSAGE + buildLogVo.getId(), buildLogVo, 600L);
            } catch (IllegalStateException | NoNodeAvailableException e) {
                log.error(e.getMessage(), e);
                String buildErrorInfo = RocketMqUtil.buildErrorInfo();
                log.error("消费MQ消息，记录日志失败,elasticsearch异常!\n[{}]", buildErrorInfo);
                if (this.mqExceptionExtendService != null) {
                    this.mqExceptionExtendService.sendMqException(e);
                }
                throw new IllegalArgumentException(buildErrorInfo);
            }
        } else {
            buildLogVo.setId(UUID.randomUUID().toString().replace("-", ""));
        }
        mqMessageVo.setId(buildLogVo.getId());
        return buildLogVo;
    }

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = {RocketMqProducerEvent.class})
    public void onRocketMqProducerEvent(RocketMqProducerEvent rocketMqProducerEvent) {
        if (rocketMqProducerEvent == null || ((CollectionUtils.isEmpty(rocketMqProducerEvent.getGroupList()) && StringUtil.isBlank(rocketMqProducerEvent.getMsgBody())) || !StringUtils.hasText(rocketMqProducerEvent.getTag()))) {
            log.error("事务完成后发送MQ消息失败");
            return;
        }
        String topic = rocketMqProducerEvent.getTopic();
        String tag = rocketMqProducerEvent.getTag();
        boolean isOrder = rocketMqProducerEvent.isOrder();
        String shardingKey = rocketMqProducerEvent.getShardingKey();
        if (isOrder && !StringUtils.hasText(shardingKey)) {
            shardingKey = tag;
        }
        String currentAccount = rocketMqProducerEvent.getCurrentAccount();
        String accountJson = rocketMqProducerEvent.getAccountJson();
        String businessKey = rocketMqProducerEvent.getBusinessKey();
        String businessType = rocketMqProducerEvent.getBusinessType();
        String operationType = rocketMqProducerEvent.getOperationType();
        String remarks = rocketMqProducerEvent.getRemarks();
        if (!StringUtil.isNotBlank(rocketMqProducerEvent.getMsgBody())) {
            Integer groupSize = rocketMqProducerEvent.getGroupSize();
            if (groupSize == null) {
                groupSize = this.DEF_SIZE;
            }
            List partition = Lists.partition(rocketMqProducerEvent.getGroupList(), groupSize.intValue());
            AtomicInteger atomicInteger = new AtomicInteger(0);
            String str = shardingKey;
            partition.stream().filter(list -> {
                return !CollectionUtils.isEmpty(list);
            }).forEach(list2 -> {
                try {
                    Thread.sleep(150L);
                    atomicInteger.getAndIncrement();
                    MqMessageVo mqMessageVo = new MqMessageVo();
                    mqMessageVo.setTopic(topic);
                    mqMessageVo.setTag(tag);
                    mqMessageVo.setMsgBody(JSON.toJSONString(list2));
                    mqMessageVo.setCurrentAccount(currentAccount);
                    mqMessageVo.setAccountJson(accountJson);
                    mqMessageVo.setBusinessKey(businessKey);
                    mqMessageVo.setBusinessType(businessType);
                    mqMessageVo.setOperationType(operationType);
                    mqMessageVo.setRemarks(remarks);
                    if (isOrder) {
                        sendMqOrderMsg(mqMessageVo, str);
                    } else {
                        sendMqMsg(mqMessageVo);
                    }
                } catch (InterruptedException e) {
                    log.error("MQ异步发送消息失败 >>>");
                    log.error(e.getMessage(), e);
                }
            });
            return;
        }
        MqMessageVo mqMessageVo = new MqMessageVo();
        mqMessageVo.setTopic(topic);
        mqMessageVo.setTag(tag);
        mqMessageVo.setMsgBody(rocketMqProducerEvent.getMsgBody());
        mqMessageVo.setCurrentAccount(currentAccount);
        mqMessageVo.setAccountJson(accountJson);
        mqMessageVo.setBusinessKey(businessKey);
        mqMessageVo.setBusinessType(businessType);
        mqMessageVo.setOperationType(operationType);
        mqMessageVo.setRemarks(remarks);
        if (isOrder) {
            sendMqOrderMsg(mqMessageVo, shardingKey);
        } else {
            sendMqMsg(mqMessageVo);
        }
    }
}
