/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.shaded.connector.kafka.source.metrics;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.table.store.shaded.connector.kafka.MetricUtil;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.Metric;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.MetricName;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSourceReaderMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReaderMetrics.class);
    public static final String KAFKA_SOURCE_READER_METRIC_GROUP = "KafkaSourceReader";
    public static final String TOPIC_GROUP = "topic";
    public static final String PARTITION_GROUP = "partition";
    public static final String CURRENT_OFFSET_METRIC_GAUGE = "currentOffset";
    public static final String COMMITTED_OFFSET_METRIC_GAUGE = "committedOffset";
    public static final String COMMITS_SUCCEEDED_METRIC_COUNTER = "commitsSucceeded";
    public static final String COMMITS_FAILED_METRIC_COUNTER = "commitsFailed";
    public static final String KAFKA_CONSUMER_METRIC_GROUP = "KafkaConsumer";
    public static final String CONSUMER_FETCH_MANAGER_GROUP = "consumer-fetch-manager-metrics";
    public static final String BYTES_CONSUMED_TOTAL = "bytes-consumed-total";
    public static final String RECORDS_LAG = "records-lag";
    public static final long INITIAL_OFFSET = -1L;
    private final SourceReaderMetricGroup sourceReaderMetricGroup;
    private final MetricGroup kafkaSourceReaderMetricGroup;
    private final Counter commitsSucceeded;
    private final Counter commitsFailed;
    private final Map<TopicPartition, Offset> offsets = new HashMap<TopicPartition, Offset>();
    @Nullable
    private ConcurrentMap<TopicPartition, Metric> recordsLagMetrics;
    @Nullable
    private Metric bytesConsumedTotalMetric;
    private long latestBytesConsumedTotal;

    public KafkaSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
        this.sourceReaderMetricGroup = sourceReaderMetricGroup;
        this.kafkaSourceReaderMetricGroup = sourceReaderMetricGroup.addGroup(KAFKA_SOURCE_READER_METRIC_GROUP);
        this.commitsSucceeded = this.kafkaSourceReaderMetricGroup.counter(COMMITS_SUCCEEDED_METRIC_COUNTER);
        this.commitsFailed = this.kafkaSourceReaderMetricGroup.counter(COMMITS_FAILED_METRIC_COUNTER);
    }

    public void registerKafkaConsumerMetrics(KafkaConsumer<?, ?> kafkaConsumer) {
        Map<MetricName, Metric> kafkaConsumerMetrics = kafkaConsumer.metrics();
        if (kafkaConsumerMetrics == null) {
            LOG.warn("Consumer implementation does not support metrics");
            return;
        }
        MetricGroup kafkaConsumerMetricGroup = this.kafkaSourceReaderMetricGroup.addGroup(KAFKA_CONSUMER_METRIC_GROUP);
        kafkaConsumerMetrics.forEach((name, metric) -> kafkaConsumerMetricGroup.gauge(name.name(), () -> metric.metricValue()));
    }

    public void registerTopicPartition(TopicPartition tp) {
        this.offsets.put(tp, new Offset(-1L, -1L));
        this.registerOffsetMetricsForTopicPartition(tp);
    }

    public void recordCurrentOffset(TopicPartition tp, long offset) {
        this.checkTopicPartitionTracked(tp);
        this.offsets.get((Object)tp).currentOffset = offset;
    }

    public void recordCommittedOffset(TopicPartition tp, long offset) {
        this.checkTopicPartitionTracked(tp);
        this.offsets.get((Object)tp).committedOffset = offset;
    }

    public void recordSucceededCommit() {
        this.commitsSucceeded.inc();
    }

    public void recordFailedCommit() {
        this.commitsFailed.inc();
    }

    public void registerNumBytesIn(KafkaConsumer<?, ?> consumer) {
        try {
            Predicate<Map.Entry<MetricName, ? extends Metric>> filter = entry -> ((MetricName)entry.getKey()).group().equals(CONSUMER_FETCH_MANAGER_GROUP) && ((MetricName)entry.getKey()).name().equals(BYTES_CONSUMED_TOTAL) && !((MetricName)entry.getKey()).tags().containsKey(TOPIC_GROUP);
            this.bytesConsumedTotalMetric = MetricUtil.getKafkaMetric(consumer.metrics(), filter);
        }
        catch (IllegalStateException e) {
            LOG.warn(String.format("Error when getting Kafka consumer metric \"%s\". I/O metric \"%s\" will not be reported. ", BYTES_CONSUMED_TOTAL, "numBytesIn"), (Throwable)e);
        }
    }

    public void maybeAddRecordsLagMetric(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
        if (this.recordsLagMetrics == null) {
            this.recordsLagMetrics = new ConcurrentHashMap<TopicPartition, Metric>();
            this.sourceReaderMetricGroup.setPendingRecordsGauge(() -> {
                long pendingRecordsTotal = 0L;
                for (Metric recordsLagMetric : this.recordsLagMetrics.values()) {
                    pendingRecordsTotal += ((Double)recordsLagMetric.metricValue()).longValue();
                }
                return pendingRecordsTotal;
            });
        }
        this.recordsLagMetrics.computeIfAbsent(tp, ignored -> this.getRecordsLagMetric(consumer.metrics(), tp));
    }

    public void removeRecordsLagMetric(TopicPartition tp) {
        if (this.recordsLagMetrics != null) {
            this.recordsLagMetrics.remove(tp);
        }
    }

    public void updateNumBytesInCounter() {
        if (this.bytesConsumedTotalMetric != null) {
            long bytesConsumedUntilNow = ((Number)this.bytesConsumedTotalMetric.metricValue()).longValue();
            long bytesConsumedSinceLastUpdate = bytesConsumedUntilNow - this.latestBytesConsumedTotal;
            this.sourceReaderMetricGroup.getIOMetricGroup().getNumBytesInCounter().inc(bytesConsumedSinceLastUpdate);
            this.latestBytesConsumedTotal = bytesConsumedUntilNow;
        }
    }

    private void registerOffsetMetricsForTopicPartition(TopicPartition tp) {
        MetricGroup topicPartitionGroup = this.kafkaSourceReaderMetricGroup.addGroup(TOPIC_GROUP, tp.topic()).addGroup(PARTITION_GROUP, String.valueOf(tp.partition()));
        topicPartitionGroup.gauge(CURRENT_OFFSET_METRIC_GAUGE, () -> this.offsets.getOrDefault((Object)tp, (Offset)new Offset((long)-1L, (long)-1L)).currentOffset);
        topicPartitionGroup.gauge(COMMITTED_OFFSET_METRIC_GAUGE, () -> this.offsets.getOrDefault((Object)tp, (Offset)new Offset((long)-1L, (long)-1L)).committedOffset);
    }

    private void checkTopicPartitionTracked(TopicPartition tp) {
        if (!this.offsets.containsKey(tp)) {
            throw new IllegalArgumentException(String.format("TopicPartition %s is not tracked", tp));
        }
    }

    @Nullable
    private Metric getRecordsLagMetric(Map<MetricName, ? extends Metric> metrics, TopicPartition tp) {
        try {
            String resolvedTopic = tp.topic().replace('.', '_');
            String resolvedPartition = String.valueOf(tp.partition());
            Predicate<Map.Entry<MetricName, ? extends Metric>> filter = entry -> {
                MetricName metricName = (MetricName)entry.getKey();
                Map<String, String> tags = metricName.tags();
                return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP) && metricName.name().equals(RECORDS_LAG) && tags.containsKey(TOPIC_GROUP) && tags.get(TOPIC_GROUP).equals(resolvedTopic) && tags.containsKey(PARTITION_GROUP) && tags.get(PARTITION_GROUP).equals(resolvedPartition);
            };
            return MetricUtil.getKafkaMetric(metrics, filter);
        }
        catch (IllegalStateException e) {
            LOG.warn(String.format("Error when getting Kafka consumer metric \"%s\" for partition \"%s\". Metric \"%s\" may not be reported correctly. ", RECORDS_LAG, tp, "pendingRecords"), (Throwable)e);
            return null;
        }
    }

    private static class Offset {
        long currentOffset;
        long committedOffset;

        Offset(long currentOffset, long committedOffset) {
            this.currentOffset = currentOffset;
            this.committedOffset = committedOffset;
        }
    }
}

