package com.biz.crm.business.common.rocketmq.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import com.biz.crm.business.common.base.util.DateUtil;
import com.biz.crm.business.common.rocketmq.constant.MqConstant;
import com.biz.crm.business.common.rocketmq.document.RocketMqMessageLogDocument;
import com.biz.crm.business.common.rocketmq.repository.RocketMqMessageLogDocumentRepository;
import com.biz.crm.business.common.rocketmq.util.RocketMqUtil;
import com.biz.crm.business.common.rocketmq.vo.MqMessageVo;
import com.biz.crm.business.common.rocketmq.vo.MqUserDetailVo;
import com.biz.crm.business.common.sdk.enums.EnableStatusEnum;
import com.biz.crm.business.common.sdk.service.LoginUserService;
import com.biz.crm.business.common.sdk.service.RedisService;
import com.bizunited.nebula.common.service.redis.RedisMutexService;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/biz/crm/business/common/rocketmq/service/AbstractRocketMqConsumer.class */
public abstract class AbstractRocketMqConsumer implements RocketMQListener<MqMessageVo>, MessageListener, MessageOrderListener {
    private static final Logger log = LoggerFactory.getLogger(AbstractRocketMqConsumer.class);

    @Autowired(required = false)
    private RocketMqMessageLogDocumentRepository rocketMqMessageLogDocumentRepository;

    @Autowired(required = false)
    private RedisService redisService;

    @Autowired(required = false)
    private RedisMutexService redisMutexService;

    @Autowired(required = false)
    private MqExceptionExtendService mqExceptionExtendService;

    @Autowired(required = false)
    private LoginUserService loginUserService;

    public void onMessage(MqMessageVo mqMessageVo) {
        String stackTrace;
        log.info("接收到的MQ消息内容{}", mqMessageVo);
        if (null == mqMessageVo) {
            log.error("消费MQ消息MqMessageVo为空，忽略本次消费");
            throw new IllegalArgumentException("消费MQ消息MqMessageVo为空，忽略本次消费");
        }
        Exception exc = null;
        String jSONString = JSON.toJSONString(mqMessageVo);
        String id = mqMessageVo.getId();
        if (StringUtils.isEmpty(id)) {
            log.error("消费MQ消息记录日志失败：message = {}", jSONString);
            return;
        }
        String str = MqConstant.MQ_REPEAT + id;
        if (!mqMessageVo.isRepeatConsumer() && this.redisService.hasKey(str).booleanValue()) {
            log.error("MQ消息内容已被成功消费,不可被重复成功消费：message = {}", jSONString);
            return;
        }
        Date date = new Date();
        String format = RocketMqUtil.DATETIME_FORMAT.format(date);
        String code = EnableStatusEnum.DISABLE.getCode();
        setUserInfo(mqMessageVo);
        try {
            stackTrace = StringUtils.isNotEmpty(mqMessageVo.getMsgBody()) ? JSON.toJSONString(handleMessage(mqMessageVo)) : "消息内容为空跳过业务处理!";
            code = EnableStatusEnum.ENABLE.getCode();
        } catch (Exception e) {
            log.error("消费MQ消息失败：message = {} >>>", jSONString, e);
            stackTrace = ExceptionUtils.getStackTrace(e);
            exc = e;
        }
        saveMqConsumerLog(mqMessageVo, format, stackTrace, code);
        Date date2 = new Date();
        log.info("MQ消息[{}] 开始消费时间[{}],结束消费时间[{}],耗时[{}]", new Object[]{id, format, RocketMqUtil.DATETIME_FORMAT.format(date2), DateUtil.millisecondToStrVague(date2.getTime() - date.getTime())});
        if (null != exc) {
            throw new IllegalArgumentException("消费MQ消息失败：message = " + jSONString, exc);
        }
        this.redisMutexService.getAndIncrement(MqConstant.MQ_REPEAT + id, 0L, RocketMqUtil.getLockHour().intValue(), TimeUnit.HOURS);
    }

    private void setUserInfo(MqMessageVo mqMessageVo) {
        mqMessageVo.getCurrentAccount();
        String accountJson = mqMessageVo.getAccountJson();
        if (StringUtils.isNotEmpty(accountJson)) {
            this.loginUserService.refreshAuthentication((MqUserDetailVo) JSON.parseObject(accountJson, MqUserDetailVo.class));
        }
    }

    private void saveMqConsumerLog(MqMessageVo mqMessageVo, String str, String str2, String str3) {
        RocketMqMessageLogDocument buildLogVo;
        if (mqMessageVo == null || !RocketMqUtil.isSaveLog()) {
            return;
        }
        String id = mqMessageVo.getId();
        String jSONString = JSON.toJSONString(mqMessageVo);
        String str4 = MqConstant.MQ_MESSAGE + id;
        Object obj = this.redisService.get(str4);
        LocalDateTime now = LocalDateTime.now();
        if (obj != null) {
            buildLogVo = obj instanceof RocketMqMessageLogDocument ? (RocketMqMessageLogDocument) obj : (RocketMqMessageLogDocument) JSONObject.parseObject(JSON.toJSONString(obj), RocketMqMessageLogDocument.class);
        } else {
            Optional findById = this.rocketMqMessageLogDocumentRepository.findById(id);
            if (findById.isPresent()) {
                buildLogVo = (RocketMqMessageLogDocument) findById.get();
            } else {
                buildLogVo = RocketMqMessageLogDocument.buildLogVo(mqMessageVo);
                buildLogVo.setEndStatus(EnableStatusEnum.ENABLE.getCode());
            }
        }
        if (buildLogVo == null) {
            buildLogVo = RocketMqMessageLogDocument.buildLogVo(mqMessageVo);
            buildLogVo.setRemarks("未获取到MQ发送日志记录,消费时,重新构建");
        }
        if (StringUtils.isEmpty(buildLogVo.getTopic())) {
            buildLogVo.setTopic(RocketMqUtil.getTopic());
        }
        buildLogVo.setCallbackBegin(str);
        buildLogVo.setUpdateDate(now.format(RocketMqUtil.YYYY_MM_DD));
        buildLogVo.setUpdateDateSecond(now.format(RocketMqUtil.HH_MM_SS));
        buildLogVo.setUpdateDateAll(now.format(RocketMqUtil.YYYY_MM_DD_HH_MM_SS));
        buildLogVo.setCallbackLog(str2);
        buildLogVo.setEndStatus(str3);
        try {
            buildLogVo.setCallbackEnd(now.format(RocketMqUtil.YYYY_MM_DD_HH_MM_SS));
            if (Objects.nonNull(buildLogVo.getCreateDateSort())) {
                buildLogVo.setConsumeTime(Long.valueOf(System.currentTimeMillis() - buildLogVo.getCreateDateSort().longValue()));
            }
            buildLogVo.setId(id);
            this.rocketMqMessageLogDocumentRepository.save(buildLogVo);
            this.redisService.del(str4);
        } catch (IllegalStateException | NoNodeAvailableException e) {
            log.error("message = {} >>>", jSONString, e);
            log.error("消费MQ消息，记录日志失败,elasticsearch异常!\n[{}]", RocketMqUtil.buildErrorInfo());
            if (this.mqExceptionExtendService != null) {
                this.mqExceptionExtendService.consumerMqException(e);
            }
        } catch (Exception e2) {
            log.error("message = {} >>>", jSONString, e2);
            log.error("消费MQ消息，记录日志失败,elasticsearch异常!");
            if (this.mqExceptionExtendService != null) {
                this.mqExceptionExtendService.consumerMqException(e2);
            }
        }
    }

    public Action consume(Message message, ConsumeContext consumeContext) {
        if (null == message) {
            log.error("普通消费MQ消息：MqMessageVo 为空，忽略本次消费");
            throw new IllegalArgumentException("消费MQ消息：MqMessageVo 为空，忽略本次消费");
        }
        if (message.getBody() == null) {
            log.error("普通消费MQ消息：MqMessageVo 为空，忽略本次消费");
            throw new IllegalArgumentException("顺序消费MQ消息：MqMessageVo 为空，忽略本次消费");
        }
        onMessage((MqMessageVo) JSONObject.parseObject(message.getBody(), MqMessageVo.class, new Feature[0]));
        return Action.CommitMessage;
    }

    public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
        if (null == message) {
            log.error("顺序消费MQ消息：MqMessageVo 为空，忽略本次消费");
            throw new IllegalArgumentException("消费MQ消息：MqMessageVo 为空，忽略本次消费");
        }
        if (message.getBody() == null) {
            log.error("顺序消费MQ消息：MqMessageVo 为空，忽略本次消费");
            throw new IllegalArgumentException("顺序消费MQ消息：MqMessageVo 为空，忽略本次消费");
        }
        onMessage((MqMessageVo) JSONObject.parseObject(message.getBody(), MqMessageVo.class, new Feature[0]));
        return OrderAction.Success;
    }

    protected abstract Object handleMessage(MqMessageVo mqMessageVo);
}
