/*
 * Decompiled with CFR 0.152.
 */
package com.huawei.middleware.dtm.client.client.Sender;

import com.google.protobuf.InvalidProtocolBufferException;
import com.huawei.middleware.dtm.client.DTMClientConfig;
import com.huawei.middleware.dtm.client.client.Sender.IMessageSender;
import com.huawei.middleware.dtm.client.client.lb.ILoadBalance;
import com.huawei.middleware.dtm.common.configuration.DTMClientCenterConfig;
import com.huawei.middleware.dtm.common.exception.DTMClientException;
import com.huawei.middleware.dtm.common.exception.GlobalAdvanceException;
import com.huawei.middleware.dtm.common.exception.RPCException;
import com.huawei.middleware.dtm.common.exception.StackTraceUtil;
import com.huawei.middleware.dtm.common.logger.DTMLoggerFactory;
import com.huawei.middleware.dtm.common.protocol.message.MessageWrapper;
import com.huawei.middleware.dtm.common.protocol.message.response.TransactionIdResponse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import org.slf4j.Logger;

public abstract class AbstractMessageSender
implements IMessageSender,
ILoadBalance {
    private static final Logger LOGGER = DTMLoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Override
    public MessageWrapper sendMessage(String channelKey, MessageWrapper message) throws RPCException, DTMClientException {
        if (!DTMClientConfig.RUNNING_CHANNELS.containsKey(channelKey)) {
            throw new DTMClientException("Current server channel of " + channelKey + " is not active.");
        }
        return DTMClientConfig.INVOKER_SENDER.syncCall(message, DTMClientConfig.RUNNING_CHANNELS.get(channelKey));
    }

    @Override
    public TransactionIdResponse sendGlobalStartMessage(MessageWrapper message) throws DTMClientException, InvalidProtocolBufferException {
        Channel channel = this.chooseNext();
        HashSet<ChannelId> channelIdSet = new HashSet<ChannelId>();
        channelIdSet.add(channel.id());
        TransactionIdResponse result = null;
        while (true) {
            try {
                result = TransactionIdResponse.parseFrom((byte[])DTMClientConfig.INVOKER_SENDER.syncCall(message, channel).getMessageBytes());
                if (result.getStatusCode() < 800) {
                    return result;
                }
                LOGGER.warn("Global start event can redirect since status: {},warn message:{}", (Object)result.getStatusCode(), (Object)result.getMessage());
            }
            catch (RPCException e) {
                LOGGER.error("Run global start event failed,choose next server server.Error message: {}", (Object)StackTraceUtil.stackTrace((Throwable)e));
            }
            channel = this.chooseNext();
            if (channelIdSet.contains(channel.id())) {
                if (null != result) {
                    throw new DTMClientException("No available server. please check: " + result.getMessage());
                }
                throw new DTMClientException("No available server. please check.");
            }
            channelIdSet.add(channel.id());
        }
    }

    @Override
    public MessageWrapper sendGlobalEndMessage(String channelKey, MessageWrapper message, boolean isGlobalCommit) throws GlobalAdvanceException, DTMClientException {
        Channel channel = DTMClientConfig.RUNNING_CHANNELS.get(channelKey);
        if (channel == null) {
            channel = this.chooseNext();
        }
        HashSet<ChannelId> channelIdSet = new HashSet<ChannelId>();
        channelIdSet.add(channel.id());
        while (true) {
            try {
                return DTMClientConfig.INVOKER_SENDER.syncCall(message, channel, DTMClientCenterConfig.GLOBAL_END_TIMEOUT);
            }
            catch (RPCException e) {
                LOGGER.error("Run global " + (isGlobalCommit ? "commit" : "rollback") + " event failed,choose next server server.Error message: {}", (Object)e.getMessage());
                channel = this.chooseNext();
                if (channelIdSet.contains(channel.id())) {
                    LOGGER.error("No available server. please check.");
                    throw new GlobalAdvanceException("No available server,send global " + (isGlobalCommit ? "commit" : "rollback") + " event failed. please check.");
                }
                channelIdSet.add(channel.id());
                LOGGER.debug("Choose another channel {} to send global {} event {}.", new Object[]{channel, isGlobalCommit ? "commit" : "rollback", message});
                continue;
            }
            break;
        }
    }
}

