/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

public class AdaptiveBatchScheduler
extends DefaultScheduler
implements SchedulerOperations {
    private final DefaultLogicalTopology logicalTopology;
    private final VertexParallelismDecider vertexParallelismDecider;
    private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;

    AdaptiveBatchScheduler(Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, Consumer<ComponentMainThreadExecutor> startUpAction, ScheduledExecutor delayExecutor, ClassLoader userCodeLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionVertexOperations executionVertexOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time rpcTimeout, VertexParallelismDecider vertexParallelismDecider, int defaultMaxParallelism) throws Exception {
        super(log, jobGraph, ioExecutor, jobMasterConfiguration, startUpAction, delayExecutor, userCodeLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, failoverStrategyFactory, restartBackoffTimeStrategy, executionVertexOperations, executionVertexVersioner, executionSlotAllocatorFactory, initializationTimestamp, mainThreadExecutor, jobStatusListener, executionGraphFactory, shuffleMaster, rpcTimeout, AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(jobGraph.getVertices(), defaultMaxParallelism));
        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
        this.vertexParallelismDecider = vertexParallelismDecider;
        this.forwardGroupsByJobVertexId = ForwardGroupComputeUtil.computeForwardGroups(jobGraph.getVerticesSortedTopologicallyFromSources(), this.getExecutionGraph()::getJobVertex);
    }

    @Override
    public void startSchedulingInternal() {
        this.initializeVerticesIfPossible();
        super.startSchedulingInternal();
    }

    @Override
    protected void updateTaskExecutionStateInternal(ExecutionVertexID executionVertexId, TaskExecutionStateTransition taskExecutionState) {
        this.initializeVerticesIfPossible();
        super.updateTaskExecutionStateInternal(executionVertexId, taskExecutionState);
    }

    private void initializeVerticesIfPossible() {
        ArrayList<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<ExecutionJobVertex>();
        try {
            long createTimestamp = System.currentTimeMillis();
            for (ExecutionJobVertex jobVertex : this.getExecutionGraph().getVerticesTopologically()) {
                this.maybeSetParallelism(jobVertex);
            }
            for (ExecutionJobVertex jobVertex : this.getExecutionGraph().getVerticesTopologically()) {
                if (!this.canInitialize(jobVertex)) continue;
                this.getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp);
                newlyInitializedJobVertices.add(jobVertex);
            }
        }
        catch (JobException ex) {
            this.log.error("Unexpected error occurred when initializing ExecutionJobVertex", (Throwable)ex);
            this.failJob(ex, System.currentTimeMillis());
        }
        if (newlyInitializedJobVertices.size() > 0) {
            this.updateTopology(newlyInitializedJobVertices);
        }
    }

    private void maybeSetParallelism(ExecutionJobVertex jobVertex) {
        int parallelism;
        if (jobVertex.isParallelismDecided()) {
            return;
        }
        Optional<List<BlockingResultInfo>> consumedResultsInfo = this.tryGetConsumedResultsInfo(jobVertex);
        if (!consumedResultsInfo.isPresent()) {
            return;
        }
        ForwardGroup forwardGroup = this.forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId());
        if (forwardGroup != null && forwardGroup.isParallelismDecided()) {
            parallelism = forwardGroup.getParallelism();
            this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", new Object[]{jobVertex.getName(), jobVertex.getJobVertexId(), parallelism});
        } else {
            parallelism = this.vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo.get());
            if (forwardGroup != null) {
                forwardGroup.setParallelism(parallelism);
            }
            this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {}.", new Object[]{jobVertex.getName(), jobVertex.getJobVertexId(), parallelism});
        }
        this.changeJobVertexParallelism(jobVertex, parallelism);
    }

    private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parallelism) {
        jobVertex.getJobVertex().setParallelism(parallelism);
        try {
            this.getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(this.getJobGraph()));
        }
        catch (Throwable t) {
            this.log.warn("Cannot create JSON plan for job", t);
            this.getExecutionGraph().setJsonPlan("{}");
        }
        jobVertex.setParallelism(parallelism);
    }

    private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo(ExecutionJobVertex jobVertex) {
        ArrayList<BlockingResultInfo> consumableResultInfo = new ArrayList<BlockingResultInfo>();
        DefaultLogicalVertex logicalVertex = this.logicalTopology.getVertex(jobVertex.getJobVertexId());
        Iterable<DefaultLogicalResult> consumedResults = logicalVertex.getConsumedResults();
        for (DefaultLogicalResult consumedResult : consumedResults) {
            ExecutionJobVertex producerVertex = this.getExecutionJobVertex(consumedResult.getProducer().getId());
            if (producerVertex.isFinished()) {
                IntermediateResult intermediateResult = this.getExecutionGraph().getAllIntermediateResults().get(consumedResult.getId());
                Preconditions.checkNotNull(intermediateResult);
                consumableResultInfo.add(BlockingResultInfo.createFromIntermediateResult(intermediateResult));
                continue;
            }
            return Optional.empty();
        }
        return Optional.of(consumableResultInfo);
    }

    private boolean canInitialize(ExecutionJobVertex jobVertex) {
        if (jobVertex.isInitialized() || !jobVertex.isParallelismDecided()) {
            return false;
        }
        for (JobEdge inputEdge : jobVertex.getJobVertex().getInputs()) {
            ExecutionJobVertex producerVertex = this.getExecutionGraph().getJobVertex(inputEdge.getSource().getProducer().getID());
            Preconditions.checkNotNull(producerVertex);
            if (producerVertex.isInitialized()) continue;
            return false;
        }
        return true;
    }

    private void updateTopology(List<ExecutionJobVertex> newlyInitializedJobVertices) {
        for (ExecutionJobVertex vertex : newlyInitializedJobVertices) {
            this.initializeOperatorCoordinatorsFor(vertex);
        }
        this.getExecutionGraph().notifyNewlyInitializedJobVertices(newlyInitializedJobVertices);
    }

    private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
        this.operatorCoordinatorHandler.registerAndStartNewCoordinators(vertex.getOperatorCoordinators(), this.getMainThreadExecutor());
    }

    @VisibleForTesting
    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(Iterable<JobVertex> vertices, int defaultMaxParallelism) {
        return AdaptiveBatchScheduler.computeVertexParallelismStore(vertices, v -> {
            if (v.getParallelism() > 0) {
                return AdaptiveBatchScheduler.getDefaultMaxParallelism(v);
            }
            return defaultMaxParallelism;
        }, Function.identity());
    }
}

