/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
import org.apache.flink.util.Preconditions;

@Internal
public class CommittableCollector<CommT> {
    private static final long EOI = Long.MAX_VALUE;
    private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables;
    private final int subtaskId;
    private final int numberOfSubtasks;

    public CommittableCollector(int subtaskId, int numberOfSubtasks) {
        this.subtaskId = subtaskId;
        this.numberOfSubtasks = numberOfSubtasks;
        this.checkpointCommittables = new TreeMap<Long, CheckpointCommittableManagerImpl<CommT>>();
    }

    CommittableCollector(Map<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables, int subtaskId, int numberOfSubtasks) {
        this.checkpointCommittables = new TreeMap<Long, CheckpointCommittableManagerImpl<CommT>>(Preconditions.checkNotNull(checkpointCommittables));
        this.subtaskId = subtaskId;
        this.numberOfSubtasks = numberOfSubtasks;
    }

    public static <CommT> CommittableCollector<CommT> of(RuntimeContext context) {
        return new CommittableCollector<CommT>(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> committables) {
        CommittableCollector committableCollector = new CommittableCollector(0, 1);
        CommittableSummary summary = new CommittableSummary(0, 1, 1L, committables.size(), committables.size(), 0);
        super.addSummary(summary);
        committables.forEach(c -> {
            CommittableWithLineage<Object> committableWithLineage = new CommittableWithLineage<Object>(c, 1L, 0);
            committableCollector.addCommittable(committableWithLineage);
        });
        return committableCollector;
    }

    public void addMessage(CommittableMessage<CommT> message) {
        if (message instanceof CommittableSummary) {
            this.addSummary((CommittableSummary)message);
        } else if (message instanceof CommittableWithLineage) {
            this.addCommittable((CommittableWithLineage)message);
        }
    }

    public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCommittablesUpTo(long checkpointId) {
        Collection checkpoints = this.checkpointCommittables.headMap(checkpointId, true).values();
        checkpoints.removeIf(CheckpointCommittableManagerImpl::isFinished);
        return checkpoints;
    }

    @Nullable
    public CommittableManager<CommT> getEndOfInputCommittable() {
        return (CommittableManager)this.checkpointCommittables.get(Long.MAX_VALUE);
    }

    public boolean isFinished() {
        return this.checkpointCommittables.values().stream().allMatch(CheckpointCommittableManagerImpl::isFinished);
    }

    public void merge(CommittableCollector<CommT> cc) {
        for (Map.Entry checkpointEntry : cc.checkpointCommittables.entrySet()) {
            this.checkpointCommittables.merge((Long)checkpointEntry.getKey(), (CheckpointCommittableManagerImpl<CommT>)checkpointEntry.getValue(), (BiFunction<CheckpointCommittableManagerImpl<CommT>, CheckpointCommittableManagerImpl<CommT>, CheckpointCommittableManagerImpl<CommT>>)((BiFunction<CheckpointCommittableManagerImpl, CheckpointCommittableManagerImpl, CheckpointCommittableManagerImpl>)CheckpointCommittableManagerImpl::merge));
        }
    }

    public int getNumberOfSubtasks() {
        return this.numberOfSubtasks;
    }

    public int getSubtaskId() {
        return this.subtaskId;
    }

    public CommittableCollector<CommT> copy() {
        return new CommittableCollector<CommT>(this.checkpointCommittables.entrySet().stream().map(e -> Tuple2.of(e.getKey(), ((CheckpointCommittableManagerImpl)e.getValue()).copy())).collect(Collectors.toMap(t -> (Long)t.f0, t -> (CheckpointCommittableManagerImpl)t.f1)), this.subtaskId, this.numberOfSubtasks);
    }

    Collection<CheckpointCommittableManagerImpl<CommT>> getCheckpointCommittables() {
        return this.checkpointCommittables.values();
    }

    private void addSummary(CommittableSummary<CommT> summary) {
        this.checkpointCommittables.computeIfAbsent(summary.getCheckpointId().orElse(Long.MAX_VALUE), key -> new CheckpointCommittableManagerImpl(this.subtaskId, this.numberOfSubtasks, summary.getCheckpointId().orElse(Long.MAX_VALUE))).upsertSummary(summary);
    }

    private void addCommittable(CommittableWithLineage<CommT> committable) {
        this.getCheckpointCommittables(committable).addCommittable(committable);
    }

    private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables(CommittableMessage<CommT> committable) {
        CheckpointCommittableManagerImpl committables = (CheckpointCommittableManagerImpl)this.checkpointCommittables.get(committable.getCheckpointId().orElse(Long.MAX_VALUE));
        return Preconditions.checkNotNull(committables, "Unknown checkpoint for %s", committable);
    }
}

