package com.biz.crm.cps.external.mdm.local.service.notifier;

import com.alibaba.fastjson.JSON;
import com.biz.crm.base.BusinessException;
import com.biz.crm.cps.external.mdm.sdk.event.ParticipatorMdmTagEventListener;
import com.biz.crm.cps.external.mdm.sdk.vo.CustomerTagRefreshMdmVo;
import com.biz.crm.cps.external.mdm.sdk.vo.RocketMQMessageBody;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

@RocketMQMessageListener(topic = "${rocketmq.topic}${rocketmq.environment-variable}", selectorExpression = "CUSTOMER_TAG_REFRESH", consumerGroup = "CUSTOMER_TAG_REFRESH${rocketmq.environment-variable}")
@Component
/* loaded from: input_file:com/biz/crm/cps/external/mdm/local/service/notifier/CustomerTagRefreshConsumer.class */
public class CustomerTagRefreshConsumer implements RocketMQListener<RocketMQMessageBody> {
    private static final Logger log = LoggerFactory.getLogger(CustomerTagRefreshConsumer.class);

    @Autowired(required = false)
    private List<ParticipatorMdmTagEventListener> listeners;

    @Transactional
    public void onMessage(RocketMQMessageBody rocketMQMessageBody) {
        if (null == rocketMQMessageBody) {
            log.warn("消费MQ消息：RocketMQMessageBody 为空，忽略本次消费");
            throw new BusinessException("消费MQ消息：RocketMQMessageBody 为空，忽略本次消费");
        }
        String jSONString = JSON.toJSONString(rocketMQMessageBody);
        if (StringUtils.isBlank(rocketMQMessageBody.getId())) {
            log.warn("消费MQ消息，记录日志失败：message = {}", jSONString);
            throw new BusinessException("消费MQ消息，失败：message = " + jSONString, new String[]{"记录日志失败"});
        }
        try {
            handleMessage(rocketMQMessageBody);
        } catch (InterruptedException e) {
            log.error("消费MQ消息，记录日志失败：message = {} >>>", jSONString, e);
        }
    }

    private void handleMessage(RocketMQMessageBody rocketMQMessageBody) throws InterruptedException {
        log.info("客户标签信息更新开始-------------");
        String msgBody = rocketMQMessageBody.getMsgBody();
        if (StringUtils.isEmpty(msgBody)) {
            return;
        }
        log.info("客户标签信息：{}", msgBody);
        CustomerTagRefreshMdmVo customerTagRefreshMdmVo = (CustomerTagRefreshMdmVo) JSON.parseObject(msgBody, CustomerTagRefreshMdmVo.class);
        if (customerTagRefreshMdmVo == null) {
            log.info("客户标签信息为空");
        } else if (CollectionUtils.isEmpty(this.listeners)) {
            log.info("客户标签信息监听为空");
        } else {
            this.listeners.forEach(participatorMdmTagEventListener -> {
                participatorMdmTagEventListener.onRefresh(customerTagRefreshMdmVo);
            });
            log.info("客户标签信息更新结束-------------");
        }
    }
}
