/*
 * Decompiled with CFR 0.152.
 */
package org.apache.solr.crossdc.update.processor;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CollectionProperties;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.crossdc.common.ConfUtil;
import org.apache.solr.crossdc.common.ConfigProperty;
import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.crossdc.update.processor.KafkaRequestMirroringHandler;
import org.apache.solr.crossdc.update.processor.MirroringUpdateProcessor;
import org.apache.solr.crossdc.update.processor.ProducerMetrics;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcessorFactory
implements SolrCoreAware,
UpdateRequestProcessorFactory.RunAlways {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
    private volatile KafkaRequestMirroringHandler mirroringHandler;
    private volatile ProducerMetrics producerMetrics;
    private boolean enabled = true;
    private KafkaCrossDcConf conf;
    private final Map<String, Object> properties = new HashMap<String, Object>();
    private NamedList<?> args;

    public void init(NamedList<?> args) {
        super.init(args);
        this.args = args;
    }

    private void applyArgsOverrides() {
        Boolean enabled = this.args.getBooleanArg("enabled");
        if (enabled != null && !enabled.booleanValue()) {
            this.enabled = false;
        }
        for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
            String val = this.args._getStr(configKey.getKey());
            if (val == null || val.isBlank()) continue;
            this.properties.put(configKey.getKey(), val);
        }
    }

    private void lookupPropertyOverridesInZk(SolrCore core) {
        log.info("Producer startup config properties before adding additional properties from Zookeeper={}", this.properties);
        try {
            SolrZkClient solrZkClient = core.getCoreContainer().getZkController().getZkClient();
            ConfUtil.fillProperties(solrZkClient, this.properties);
            this.applyArgsOverrides();
            CollectionProperties cp = new CollectionProperties(solrZkClient);
            Map collectionProperties = cp.getCollectionProperties(core.getCoreDescriptor().getCollectionName());
            for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
                String val = (String)collectionProperties.get("crossdc." + configKey.getKey());
                if (val == null || val.isBlank()) continue;
                this.properties.put(configKey.getKey(), val);
            }
            String enabledVal = (String)collectionProperties.get("crossdc.enabled");
            if (enabledVal != null) {
                this.enabled = Boolean.parseBoolean(enabledVal.toString());
            }
        }
        catch (Exception e) {
            log.error("Exception looking for CrossDC configuration in Zookeeper", (Throwable)e);
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in Zookeeper", (Throwable)e);
        }
    }

    public void inform(SolrCore core) {
        log.info("KafkaRequestMirroringHandler inform enabled={}", (Object)this.enabled);
        if (core.getCoreContainer().isZooKeeperAware()) {
            this.lookupPropertyOverridesInZk(core);
        } else {
            this.applyArgsOverrides();
            if (this.enabled) {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, ((Object)((Object)this)).getClass().getSimpleName() + " only supported in SolrCloud mode; please disable or remove from solrconfig.xml");
            }
            log.warn("Core '{}' was configured to use a disabled {}, but {} is only supported in SolrCloud deployments.  A NoOp processor will be used instead", new Object[]{core.getName(), ((Object)((Object)this)).getClass().getSimpleName(), ((Object)((Object)this)).getClass().getSimpleName()});
        }
        if (!this.enabled) {
            return;
        }
        ConfUtil.verifyProperties(this.properties);
        this.conf = new KafkaCrossDcConf(this.properties);
        KafkaMirroringSink sink = new KafkaMirroringSink(this.conf);
        Closer closer = new Closer(sink);
        core.addCloseHook((CloseHook)new MyCloseHook(closer));
        this.producerMetrics = new ProducerMetrics(core.getSolrMetricsContext().getChildContext((Object)this), core);
        this.mirroringHandler = new KafkaRequestMirroringHandler(sink);
    }

    private static Integer getIntegerPropValue(String name, Properties props) {
        String value = props.getProperty(name);
        if (value == null) {
            return null;
        }
        return Integer.parseInt(value);
    }

    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
        if (!this.enabled) {
            return new NoOpUpdateRequestProcessor(next);
        }
        if (this.mirroringHandler == null) {
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "mirroringHandler is null");
        }
        boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
        boolean mirrorCommits = this.conf.getBool("solr.crossdc.mirrorCommits");
        CrossDcConf.ExpandDbq expandDbq = CrossDcConf.ExpandDbq.getOrDefault(this.conf.get("solr.crossdc.expandDbq"), CrossDcConf.ExpandDbq.EXPAND);
        long maxMirroringBatchSizeBytes = this.conf.getInt("solr.crossdc.maxRequestSizeBytes").intValue();
        Boolean indexUnmirrorableDocs = this.conf.getBool("solr.crossdc.indexUnmirrorableDocs");
        ModifiableSolrParams mirroredParams = null;
        if (doMirroring) {
            CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
            String collection = coreDesc.getCollectionName();
            if (collection == null) {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not determine collection name for " + MirroringUpdateProcessor.class.getSimpleName() + ". Solr may not be running in cloud mode.");
            }
            mirroredParams = new ModifiableSolrParams(req.getParams());
            mirroredParams.set("collection", new String[]{collection});
            mirroredParams.remove("_version_");
            mirroredParams.remove("update.distrib");
            mirroredParams.remove("distrib.from.collection");
            mirroredParams.remove("distrib.inplace.prevversion");
            mirroredParams.remove("commit_end_point");
            mirroredParams.remove("distrib.from.shard");
            mirroredParams.remove("distrib.from.parent");
            mirroredParams.remove("distrib.from");
            mirroredParams.set(SERVER_SHOULD_MIRROR, new String[]{Boolean.FALSE.toString()});
        }
        if (log.isTraceEnabled()) {
            log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
        }
        return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, expandDbq, maxMirroringBatchSizeBytes, (SolrParams)mirroredParams, DistributedUpdateProcessor.DistribPhase.parseParam((String)req.getParams().get("update.distrib")), doMirroring ? this.mirroringHandler : null, this.producerMetrics);
    }

    public static class NoOpUpdateRequestProcessor
    extends UpdateRequestProcessor {
        NoOpUpdateRequestProcessor(UpdateRequestProcessor next) {
            super(next);
        }
    }

    private static class Closer {
        private final KafkaMirroringSink sink;

        public Closer(KafkaMirroringSink sink) {
            this.sink = sink;
        }

        public final void close() {
            try {
                this.sink.close();
            }
            catch (IOException e) {
                log.error("Exception closing sink", (Throwable)e);
            }
        }
    }

    private static class MyCloseHook
    implements CloseHook {
        private final Closer closer;

        public MyCloseHook(Closer closer) {
            this.closer = closer;
        }

        public void preClose(SolrCore core) {
        }

        public void postClose(SolrCore core) {
            this.closer.close();
        }
    }
}

