package com.bizunited.platform.tcc.server.service.internal;

import com.alibaba.fastjson.JSON;
import com.bizunited.platform.tcc.common.dto.NetworkRequest;
import com.bizunited.platform.tcc.common.joinpoint.JoinPointerResponse;
import com.bizunited.platform.tcc.common.joinpoint.notify.AbstractNotify;
import com.bizunited.platform.tcc.common.joinpoint.notify.Notify;
import com.bizunited.platform.tcc.common.network.PriorityNotifyBlockingMapQueue;
import com.bizunited.platform.tcc.common.pojo.ClientActivityHealth;
import com.bizunited.platform.tcc.common.service.NotifyService;
import com.bizunited.platform.tcc.server.service.ClientActivityHealthService;
import io.netty.channel.Channel;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;

/* loaded from: input_file:com/bizunited/platform/tcc/server/service/internal/NotifyServiceImpl.class */
public class NotifyServiceImpl implements NotifyService {
    private ClientActivityHealthService clientActivityHealthService;
    private PriorityNotifyBlockingMapQueue<String, AbstractNotify, JoinPointerResponse> notifyMappingQueue;

    public NotifyServiceImpl() {
    }

    public NotifyServiceImpl(ClientActivityHealthService clientActivityHealthService, PriorityNotifyBlockingMapQueue<String, AbstractNotify, JoinPointerResponse> priorityNotifyBlockingMapQueue) {
        this.clientActivityHealthService = clientActivityHealthService;
        this.notifyMappingQueue = priorityNotifyBlockingMapQueue;
    }

    public void notify(Notify notify, boolean z) {
        ClientActivityHealth validateNoify = validateNoify(notify, z);
        this.notifyMappingQueue.offer(notify);
        sendRequest(validateNoify, notify);
    }

    private ClientActivityHealth validateNoify(Notify notify, boolean z) {
        Set<ClientActivityHealth> findByServerGroup;
        Validate.notNull(notify, "既有的通知信息必须传入，请检查!!", new Object[0]);
        String targetIp = notify.getTargetIp();
        Validate.notBlank(targetIp, "特定客户端ip信息必须传入!!", new Object[0]);
        Integer targetPort = notify.getTargetPort();
        Validate.notNull(targetPort, "特定客户端端口信息必须传入!!", new Object[0]);
        Validate.notBlank(notify.getRequestId(), "通知/请求id信息必须传入!!", new Object[0]);
        String serviceGroup = notify.getServiceGroup();
        if (serviceGroup == null) {
            serviceGroup = "";
            notify.setServiceGroup(serviceGroup);
        }
        if (notify.getRequestTime() == null) {
            notify.setRequestTime(LocalDateTime.now());
        }
        Validate.notBlank(notify.getRequestUri(), "向客户端主动通知/请求的资源路径，必须传入!!", new Object[0]);
        ClientActivityHealth findByIpAndPort = this.clientActivityHealthService.findByIpAndPort(targetIp, targetPort);
        if (findByIpAndPort == null || findByIpAndPort.getData() == null || !((Channel) findByIpAndPort.getData()).isActive()) {
            findByIpAndPort = null;
            if (!StringUtils.isBlank(serviceGroup) && z && (findByServerGroup = this.clientActivityHealthService.findByServerGroup(serviceGroup)) != null) {
                Iterator<ClientActivityHealth> it = findByServerGroup.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    ClientActivityHealth next = it.next();
                    if (next.getData() != null && ((Channel) next.getData()).isActive()) {
                        findByIpAndPort = next;
                        break;
                    }
                }
            }
        }
        Validate.notNull(findByIpAndPort, "指定的客户端未被找到，最可能的情况是，指定客户端的租约信息已经失效，且没有服务组或服务组不存在可用的其它客户端!!", new Object[0]);
        return findByIpAndPort;
    }

    private void sendRequest(ClientActivityHealth clientActivityHealth, Notify notify) {
        Channel channel = (Channel) clientActivityHealth.getData();
        NetworkRequest networkRequest = new NetworkRequest();
        networkRequest.setData(notify.getRequestData());
        networkRequest.setRequestId(notify.getRequestId());
        networkRequest.setRequestTime(notify.getRequestTime());
        networkRequest.setRequestUri(notify.getRequestUri());
        networkRequest.setResponseUri(notify.getResponseUri() == null ? "" : notify.getResponseUri());
        channel.writeAndFlush(JSON.toJSONString(networkRequest) + System.lineSeparator());
    }

    public void notify(Notify[] notifyArr, boolean z) {
        Validate.notNull(notifyArr, "进行通知组发送时，通知组必须传入!!", new Object[0]);
        Validate.isTrue(notifyArr.length > 0, "进行通知组发送时，通知组至少有一个通知信息!!", new Object[0]);
        ArrayList arrayList = new ArrayList(notifyArr.length);
        for (Notify notify : notifyArr) {
            arrayList.add(validateNoify(notify, z));
        }
        for (int i = 0; i < notifyArr.length; i++) {
            Notify notify2 = notifyArr[i];
            this.notifyMappingQueue.offer(notify2);
            sendRequest((ClientActivityHealth) arrayList.get(i), notify2);
        }
    }

    public void setClientActivityHealthService(ClientActivityHealthService clientActivityHealthService) {
        this.clientActivityHealthService = clientActivityHealthService;
    }
}
