/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/**
 * Static helpers that write and read pieces of source coordinator state to and from
 * {@link DataOutputStream}/{@link DataInputStream}: a serde version marker, the registered
 * readers, and the split assignments grouped by checkpoint id and subtask id.
 *
 * <p>Every {@code readXxx} method expects exactly the layout produced by the matching
 * {@code writeXxx} method of this class. Counts and sizes read from the stream are validated to
 * be non-negative; a negative value is reported as an {@link IOException}.
 */
public class SourceCoordinatorSerdeUtils {
    /** Serde version written by {@link #writeCoordinatorSerdeVersion(DataOutputStream)}. */
    private static final int CURRENT_VERSION = 0;

    /** Not instantiable: this class only exposes static helpers. */
    private SourceCoordinatorSerdeUtils() {
    }

    /**
     * Writes the current coordinator serde version as a single int.
     *
     * @param out stream to write to
     * @throws IOException if writing to {@code out} fails
     */
    static void writeCoordinatorSerdeVersion(DataOutputStream out) throws IOException {
        out.writeInt(CURRENT_VERSION);
    }

    /**
     * Reads one int from the stream and rejects it if it is newer than the current version.
     *
     * @param in stream positioned at the version int
     * @throws IOException if reading fails or the version is greater than the current version
     */
    static void readAndVerifyCoordinatorSerdeVersion(DataInputStream in) throws IOException {
        int version = in.readInt();
        if (version > CURRENT_VERSION) {
            throw new IOException("Unsupported source coordinator serde version " + version);
        }
    }

    /**
     * Writes the number of readers followed by each reader's subtask id and location.
     *
     * <p>Only the map values are written; the keys are not persisted because
     * {@link #readRegisteredReaders(DataInputStream)} rebuilds them from each reader's subtask id.
     *
     * @param registeredReaders readers to write
     * @param out stream to write to
     * @throws IOException if writing to {@code out} fails
     */
    static void writeRegisteredReaders(Map<Integer, ReaderInfo> registeredReaders, DataOutputStream out) throws IOException {
        out.writeInt(registeredReaders.size());
        for (ReaderInfo info : registeredReaders.values()) {
            writeReaderInfo(info, out);
        }
    }

    /**
     * Reads readers written by {@link #writeRegisteredReaders(Map, DataOutputStream)}.
     *
     * @param in stream positioned at the reader count
     * @return a new mutable map from subtask id to reader info
     * @throws IOException if reading fails or the stored reader count is negative
     */
    static Map<Integer, ReaderInfo> readRegisteredReaders(DataInputStream in) throws IOException {
        int numReaders = readNonNegativeInt(in, "number of registered readers");
        Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
        for (int i = 0; i < numReaders; ++i) {
            ReaderInfo info = readReaderInfo(in);
            registeredReaders.put(info.getSubtaskId(), info);
        }
        return registeredReaders;
    }

    /**
     * Writes the split assignments grouped by checkpoint id.
     *
     * <p>Layout: split serializer version, number of checkpoints, then per checkpoint the
     * checkpoint id and number of subtasks, then per subtask the subtask id and number of splits,
     * then per split the serialized length followed by the serialized bytes.
     *
     * @param assignmentByCheckpointIds assignments keyed by checkpoint id, then by subtask id
     * @param splitSerializer serializer used for each split; its version is written first
     * @param out stream to write to
     * @param <SplitT> type of the splits
     * @throws IOException if serializing a split or writing to {@code out} fails
     */
    static <SplitT> void writeAssignmentsByCheckpointId(Map<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentByCheckpointIds, SimpleVersionedSerializer<SplitT> splitSerializer, DataOutputStream out) throws IOException {
        out.writeInt(splitSerializer.getVersion());
        out.writeInt(assignmentByCheckpointIds.size());
        for (Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> assignments : assignmentByCheckpointIds.entrySet()) {
            long checkpointId = assignments.getKey();
            out.writeLong(checkpointId);
            Map<Integer, LinkedHashSet<SplitT>> assignmentsBySubtask = assignments.getValue();
            out.writeInt(assignmentsBySubtask.size());
            for (Map.Entry<Integer, LinkedHashSet<SplitT>> assignment : assignmentsBySubtask.entrySet()) {
                int subtaskId = assignment.getKey();
                out.writeInt(subtaskId);
                LinkedHashSet<SplitT> splits = assignment.getValue();
                out.writeInt(splits.size());
                // Iterate with the real split type so the serializer call is type-checked.
                for (SplitT split : splits) {
                    byte[] serializedSplit = splitSerializer.serialize(split);
                    // Length prefix lets the reader know how many bytes belong to this split.
                    out.writeInt(serializedSplit.length);
                    out.write(serializedSplit);
                }
            }
        }
    }

    /**
     * Reads assignments written by
     * {@link #writeAssignmentsByCheckpointId(Map, SimpleVersionedSerializer, DataOutputStream)}.
     *
     * <p>Splits are deserialized with the serializer version stored in the stream, not with
     * {@code splitSerializer.getVersion()}.
     *
     * @param in stream positioned at the split serializer version
     * @param splitSerializer serializer used to deserialize each split
     * @param <SplitT> type of the splits
     * @return a new mutable map from checkpoint id to (subtask id to splits in stored order)
     * @throws IOException if reading or deserializing fails, or a stored count or size is negative
     */
    static <SplitT> Map<Long, Map<Integer, LinkedHashSet<SplitT>>> readAssignmentsByCheckpointId(DataInputStream in, SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
        int splitSerializerVersion = in.readInt();
        int numCheckpoints = readNonNegativeInt(in, "number of checkpoints");
        Map<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointIds = new HashMap<>(numCheckpoints);
        for (int i = 0; i < numCheckpoints; ++i) {
            long checkpointId = in.readLong();
            int numSubtasks = readNonNegativeInt(in, "number of subtasks");
            Map<Integer, LinkedHashSet<SplitT>> assignments = new HashMap<>();
            assignmentsByCheckpointIds.put(checkpointId, assignments);
            for (int j = 0; j < numSubtasks; ++j) {
                int subtaskId = in.readInt();
                int numAssignedSplits = readNonNegativeInt(in, "number of assigned splits");
                LinkedHashSet<SplitT> splits = new LinkedHashSet<>(numAssignedSplits);
                assignments.put(subtaskId, splits);
                for (int k = 0; k < numAssignedSplits; ++k) {
                    int serializedSplitSize = readNonNegativeInt(in, "serialized split size");
                    byte[] serializedSplit = readBytes(in, serializedSplitSize);
                    SplitT split = splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
                    splits.add(split);
                }
            }
        }
        return assignmentsByCheckpointIds;
    }

    /**
     * Reads exactly {@code size} bytes from the stream.
     *
     * @param in stream to read from
     * @param size number of bytes to read; must not be negative
     * @return a new array of length {@code size} filled from the stream
     * @throws IOException if {@code size} is negative or the underlying read fails
     * @throws BufferUnderflowException if the stream ends before {@code size} bytes were read
     */
    static byte[] readBytes(DataInputStream in, int size) throws IOException {
        if (size < 0) {
            throw new IOException("Cannot read a negative number of bytes: " + size);
        }
        byte[] bytes = new byte[size];
        int off = 0;
        // A single read() may return fewer bytes than requested, so loop until the array is full.
        while (off < size) {
            int read = in.read(bytes, off, size - off);
            if (read < 0) {
                // End of stream reached before all requested bytes were available.
                throw new BufferUnderflowException();
            }
            off += read;
        }
        return bytes;
    }

    /** Writes a reader's subtask id (int) followed by its location (modified UTF-8). */
    private static void writeReaderInfo(ReaderInfo readerInfo, DataOutputStream out) throws IOException {
        out.writeInt(readerInfo.getSubtaskId());
        out.writeUTF(readerInfo.getLocation());
    }

    /** Reads a subtask id and a location and builds a {@link ReaderInfo} from them. */
    private static ReaderInfo readReaderInfo(DataInputStream in) throws IOException {
        int subtaskId = in.readInt();
        String location = in.readUTF();
        return new ReaderInfo(subtaskId, location);
    }

    /**
     * Reads an int that represents a count or size and rejects negative values.
     *
     * @param in stream to read from
     * @param description what the value represents, used in the error message
     * @return the non-negative value read
     * @throws IOException if reading fails or the value is negative
     */
    private static int readNonNegativeInt(DataInputStream in, String description) throws IOException {
        int value = in.readInt();
        if (value < 0) {
            throw new IOException("Corrupt source coordinator state: negative " + description + " (" + value + ")");
        }
        return value;
    }
}

