/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Component(immediate=true)
public class PackageDistributedNotifier
implements TopologyChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
    @Reference
    private EventAdmin eventAdmin;
    @Reference
    private PubQueueProvider pubQueueCacheService;
    @Reference
    private MessagingProvider messagingProvider;
    @Reference
    private Topics topics;
    private Consumer<PackageDistributedMessage> sender;
    private boolean sendMsg;

    @Activate
    public void activate() {
        this.sendMsg = StringUtils.isNotBlank((CharSequence)this.topics.getEventTopic());
        if (this.sendMsg) {
            this.sender = this.messagingProvider.createSender(this.topics.getEventTopic());
        }
        LOG.info("Started package distributed notifier with event message topic {}", (Object)this.topics.getEventTopic());
    }

    @Override
    public void changed(TopologyViewDiff diffView) {
        diffView.getProcessedOffsets().forEach(this::processOffsets);
    }

    private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
        long minOffset = offsets.get().findFirst().getAsLong();
        OffsetQueue<DistributionQueueItem> offsetQueue = this.pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
        offsets.get().mapToObj(offsetQueue::getItem).filter(Objects::nonNull).forEach(msg -> this.notifyDistributed(pubAgentName, (DistributionQueueItem)msg));
    }

    protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) {
        LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", (Object)pubAgentName, (Object)queueItem.getPackageId());
        this.sendEvt(pubAgentName, queueItem);
        if (this.sendMsg) {
            this.sendMsg(pubAgentName, queueItem);
        }
    }

    private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) {
        try {
            PackageDistributedMessage msg = this.createDistributedMessage(pubAgentName, queueItem);
            this.sender.accept(msg);
        }
        catch (Exception e) {
            LOG.warn("Exception when sending package distributed message for pubAgentName={}, pkgId={}", new Object[]{pubAgentName, queueItem.getPackageId(), e});
        }
    }

    private PackageDistributedMessage createDistributedMessage(String pubAgentName, DistributionQueueItem queueItem) {
        return PackageDistributedMessage.builder().pubAgentName(pubAgentName).packageId(queueItem.getPackageId()).offset(((Long)queueItem.get((Object)"recordOffset")).longValue()).paths((String[])queueItem.get((Object)"request.paths")).deepPaths((String[])queueItem.get((Object)"request.deepPaths")).build();
    }

    private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
        try {
            Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
            this.eventAdmin.sendEvent(distributed);
        }
        catch (Exception e) {
            LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", new Object[]{pubAgentName, queueItem.getPackageId(), e});
        }
    }
}

