/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.io.Closeable;
import java.util.Collections;
import java.util.Dictionary;
import java.util.EnumMap;
import java.util.Hashtable;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.DistributionResponseInfo;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.publisher.AgentState;
import org.apache.sling.distribution.journal.impl.publisher.DistPublisherJMX;
import org.apache.sling.distribution.journal.impl.publisher.PackageMessageFactory;
import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
import org.apache.sling.distribution.journal.impl.publisher.PublisherConfiguration;
import org.apache.sling.distribution.journal.impl.publisher.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.journal.shared.Strings;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;

@ParametersAreNonnullByDefault
@Component(service={}, immediate=true, configurationPid={"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"})
@Designate(ocd=PublisherConfiguration.class, factory=true)
public class DistributionPublisher
implements DistributionAgent {
    public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
    private final EnumMap<DistributionRequestType, ToLongFunction<PackageMessage>> reqTypes = new EnumMap(DistributionRequestType.class);
    private final DefaultDistributionLog log = new DefaultDistributionLog(this.pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
    @Reference
    private MessagingProvider messagingProvider;
    @Reference(name="packageBuilder")
    private DistributionPackageBuilder packageBuilder;
    @Reference
    private DiscoveryService discoveryService;
    @Reference
    private PackageMessageFactory factory;
    @Reference
    private EventAdmin eventAdmin;
    @Reference
    private Topics topics;
    @Reference
    JournalAvailable journalAvailable;
    @Reference
    private DistributionMetricsService distributionMetricsService;
    @Reference
    private PubQueueProvider pubQueueProvider;
    private String pubAgentName;
    private String pkgType;
    private long queuedTimeout;
    private ServiceRegistration<DistributionAgent> componentReg;
    private Consumer<PackageMessage> sender;
    private JMXRegistration reg;
    private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
    private Closeable statusPoller;
    private DistributionLogEventListener distributionLogEventListener;

    public DistributionPublisher() {
        this.reqTypes.put(DistributionRequestType.ADD, this::sendAndWait);
        this.reqTypes.put(DistributionRequestType.DELETE, this::sendAndWait);
        this.reqTypes.put(DistributionRequestType.TEST, this::send);
    }

    @Activate
    public void activate(PublisherConfiguration config, BundleContext context) {
        DistPublisherJMX bean;
        Objects.requireNonNull(this.factory);
        Objects.requireNonNull(this.distributionMetricsService);
        this.pubAgentName = Strings.requireNotBlank(config.name());
        this.queuedTimeout = config.queuedTimeout();
        this.pkgType = this.packageBuilder.getType();
        this.sender = this.messagingProvider.createSender(this.topics.getPackageTopic());
        Dictionary<String, Object> props = this.createServiceProps(config);
        this.componentReg = Objects.requireNonNull(context.registerService(DistributionAgent.class, (Object)this, props));
        this.distributionLogEventListener = new DistributionLogEventListener(context, this.log, this.pubAgentName);
        try {
            bean = new DistPublisherJMX(this.pubAgentName, this.discoveryService, this);
        }
        catch (NotCompliantMBeanException e) {
            throw new RuntimeException(e);
        }
        this.reg = new JMXRegistration(bean, "agent", this.pubAgentName);
        String msg = String.format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s", this.pubAgentName, this.pkgType, this.queuedTimeout);
        this.subscriberCountGauge = this.distributionMetricsService.createGauge("distribution.journal.publisher.subscriber_count;pub_name=" + this.pubAgentName, "Current number of publish subscribers", () -> this.discoveryService.getTopologyView().getSubscribedAgentIds().size());
        HandlerAdapter[] handlerAdapterArray = new HandlerAdapter[1];
        handlerAdapterArray[0] = HandlerAdapter.create(PackageStatusMessage.class, this.pubQueueProvider::handleStatus);
        this.statusPoller = this.messagingProvider.createPoller(this.topics.getStatusTopic(), Reset.earliest, handlerAdapterArray);
        this.log.info(msg, new Object[0]);
    }

    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly((Closeable[])new Closeable[]{this.statusPoller, this.distributionLogEventListener, this.reg});
        this.componentReg.unregister();
        String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s", this.pubAgentName, this.pkgType, this.queuedTimeout);
        IOUtils.closeQuietly(this.subscriberCountGauge);
        this.log.info(msg, new Object[0]);
    }

    private Dictionary<String, Object> createServiceProps(PublisherConfiguration config) {
        Hashtable<String, Object> props = new Hashtable<String, Object>();
        ((Dictionary)props).put("name", config.name());
        ((Dictionary)props).put("title", config.name());
        ((Dictionary)props).put("details", config.name());
        ((Dictionary)props).put("packageBuilder.target", config.packageBuilder_target());
        ((Dictionary)props).put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
        return props;
    }

    @Nonnull
    public Iterable<String> getQueueNames() {
        return Collections.unmodifiableCollection(this.pubQueueProvider.getQueueNames(this.pubAgentName));
    }

    public DistributionQueue getQueue(String queueName) {
        try {
            DistributionQueue queue = this.pubQueueProvider.getQueue(this.pubAgentName, queueName);
            if (queue == null) {
                this.distributionMetricsService.getQueueAccessErrorCount().increment();
            }
            return queue;
        }
        catch (Exception e) {
            this.distributionMetricsService.getQueueAccessErrorCount().increment();
            throw e;
        }
    }

    @Nonnull
    public DistributionLog getLog() {
        return this.log;
    }

    @Nonnull
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    @Nonnull
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) throws DistributionException {
        ToLongFunction<PackageMessage> handler = this.reqTypes.get(request.getRequestType());
        if (handler != null) {
            return this.execute(resourceResolver, request, handler);
        }
        return this.executeUnsupported(request);
    }

    private DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request, ToLongFunction<PackageMessage> sender) throws DistributionException {
        PackageMessage pkg;
        try {
            pkg = DistributionMetricsService.timed(this.distributionMetricsService.getBuildPackageDuration(), () -> this.factory.create(this.packageBuilder, resourceResolver, this.pubAgentName, request));
        }
        catch (Exception e) {
            this.distributionMetricsService.getDroppedRequests().mark();
            this.log.error("Failed to create content package for requestType={}, paths={}", request.getRequestType(), request.getPaths(), e);
            throw new DistributionException((Throwable)e);
        }
        try {
            long offset = DistributionMetricsService.timed(this.distributionMetricsService.getEnqueuePackageDuration(), () -> sender.applyAsLong(pkg));
            this.distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
            this.distributionMetricsService.getAcceptedRequests().mark();
            String msg = String.format("Request accepted with distribution package %s at offset=%s", pkg, offset);
            this.log.info(msg, new Object[0]);
            return new SimpleDistributionResponse(DistributionRequestState.ACCEPTED, msg, new DistributionResponseInfo(){

                @Nonnull
                public String getId() {
                    return pkg.getPkgId();
                }
            });
        }
        catch (Throwable e) {
            this.distributionMetricsService.getDroppedRequests().mark();
            String msg = String.format("Failed to append distribution package %s to the journal", pkg);
            this.log.error(msg, e);
            if (e instanceof Error) {
                throw (Error)e;
            }
            throw new DistributionException(msg, e);
        }
    }

    private long send(PackageMessage pkg) {
        this.sender.accept(pkg);
        return -1L;
    }

    private long sendAndWait(PackageMessage pkg) {
        PackageQueuedNotifier queuedNotifier = this.pubQueueProvider.getQueuedNotifier();
        try {
            CompletableFuture<Long> received = queuedNotifier.registerWait(pkg.getPkgId());
            Event createdEvent = DistributionEvent.eventPackageCreated(pkg, this.pubAgentName);
            this.eventAdmin.postEvent(createdEvent);
            this.sender.accept(pkg);
            return received.get(this.queuedTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            queuedNotifier.unRegisterWait(pkg.getPkgId());
            throw new RuntimeException(e);
        }
    }

    @Nonnull
    private DistributionResponse executeUnsupported(DistributionRequest request) {
        String msg = String.format("Request requestType=%s not supported by this agent, expected one of %s", request.getRequestType(), this.reqTypes.keySet());
        this.log.info(msg, new Object[0]);
        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
    }
}

