/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.operation;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactUnit;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactTask;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;

public class KeyValueFileStoreWrite
extends AbstractFileStoreWrite<KeyValue> {
    private final DataFileReader.Factory dataFileReaderFactory;
    private final DataFileWriter.Factory dataFileWriterFactory;
    private final Supplier<Comparator<RowData>> keyComparatorSupplier;
    private final MergeFunction mergeFunction;
    private final CoreOptions options;

    public KeyValueFileStoreWrite(SchemaManager schemaManager, long schemaId, RowType keyType, RowType valueType, Supplier<Comparator<RowData>> keyComparatorSupplier, MergeFunction mergeFunction, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options) {
        super(snapshotManager, scan);
        this.dataFileReaderFactory = new DataFileReader.Factory(schemaManager, schemaId, keyType, valueType, options.fileFormat(), pathFactory);
        this.dataFileWriterFactory = new DataFileWriter.Factory(schemaId, keyType, valueType, options.fileFormat(), pathFactory, options.targetFileSize());
        this.keyComparatorSupplier = keyComparatorSupplier;
        this.mergeFunction = mergeFunction;
        this.options = options;
    }

    @Override
    public RecordWriter<KeyValue> createWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
        return this.createMergeTreeWriter(partition, bucket, this.scanExistingFileMetas(partition, bucket), compactExecutor);
    }

    @Override
    public RecordWriter<KeyValue> createEmptyWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
        return this.createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
    }

    @Override
    public Callable<CompactResult> createCompactWriter(BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles) {
        if (compactFiles == null) {
            compactFiles = this.scanExistingFileMetas(partition, bucket);
        }
        Comparator<RowData> keyComparator = this.keyComparatorSupplier.get();
        CompactRewriter rewriter = this.compactRewriter(partition, bucket, keyComparator);
        Levels levels = new Levels(keyComparator, compactFiles, this.options.numLevels());
        CompactUnit unit = CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns());
        return new MergeTreeCompactTask(keyComparator, this.options.targetFileSize(), rewriter, unit, true);
    }

    private MergeTreeWriter createMergeTreeWriter(BinaryRowData partition, int bucket, List<DataFileMeta> restoreFiles, ExecutorService compactExecutor) {
        DataFileWriter dataFileWriter = this.dataFileWriterFactory.create(partition, bucket);
        Comparator<RowData> keyComparator = this.keyComparatorSupplier.get();
        Levels levels = new Levels(keyComparator, restoreFiles, this.options.numLevels());
        return new MergeTreeWriter(dataFileWriter.keyType(), dataFileWriter.valueType(), this.createCompactManager(partition, bucket, new UniversalCompaction(this.options.maxSizeAmplificationPercent(), this.options.sortedRunSizeRatio(), this.options.numSortedRunCompactionTrigger(), this.options.maxSortedRunNum()), compactExecutor, levels), levels, this.getMaxSequenceNumber(restoreFiles), keyComparator, this.mergeFunction.copy(), dataFileWriter, this.options.commitForceCompact(), this.options.numSortedRunStopTrigger(), this.options.changelogProducer());
    }

    private CompactManager createCompactManager(BinaryRowData partition, int bucket, CompactStrategy compactStrategy, ExecutorService compactExecutor, Levels levels) {
        Comparator<RowData> keyComparator = this.keyComparatorSupplier.get();
        CompactRewriter rewriter = this.compactRewriter(partition, bucket, keyComparator);
        return new MergeTreeCompactManager(compactExecutor, levels, compactStrategy, keyComparator, this.options.targetFileSize(), rewriter);
    }

    private CompactRewriter compactRewriter(BinaryRowData partition, int bucket, Comparator<RowData> keyComparator) {
        DataFileWriter dataFileWriter = this.dataFileWriterFactory.create(partition, bucket);
        return (outputLevel, dropDelete, sections) -> dataFileWriter.write(new RecordReaderIterator<KeyValue>(new MergeTreeReader(sections, dropDelete, this.dataFileReaderFactory.create(partition, bucket), keyComparator, this.mergeFunction.copy())), outputLevel);
    }
}

