package com.almworks.structure.confluence.helper;

import com.almworks.structure.confluence.helper.util.OneElementQueue;
import com.atlassian.fugue.Suppliers;
import com.atlassian.util.concurrent.ThreadFactories;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/almworks/structure/confluence/helper/ThrottledNotificationService.class */
public class ThrottledNotificationService implements NotificationTracker, DisposableBean, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(ThrottledNotificationService.class);
    private static final Function<ThrottledNotification, Notification> GET_NOTIFICATION = new Function<ThrottledNotification, Notification>() { // from class: com.almworks.structure.confluence.helper.ThrottledNotificationService.1
        public Notification apply(ThrottledNotification throttledNotification) {
            return throttledNotification.getNotification();
        }
    };
    private final SubscriptionManager mySubscriptionManager;
    private final NotificationSender myNotificationSender;
    private final long myMinimalSendInterval = TimeUnit.MILLISECONDS.toNanos(Long.getLong("structure.notifications.minimalSendInterval", 500).longValue());
    private final int myMaxNotificationQueueSize = Integer.getInteger("structure.notification.maxNotificationQueueSize", 1000000).intValue();
    private final long myNotificationAccumulationInterval = TimeUnit.MILLISECONDS.toNanos(Long.getLong("structure.notifications.accumulationInterval", 100).longValue());
    private final long myResendPendingNotificationsInterval = TimeUnit.MILLISECONDS.toNanos(Long.getLong("structure.notifications.resendPendingNotificationsInterval", 30000).longValue());
    private final DelayQueue<ThrottledNotification> myNotificationsQueue = new DelayQueue<>();
    private final AtomicLong myNextSend = new AtomicLong(System.nanoTime());
    private final AtomicInteger myNotificationsCount = new AtomicInteger(0);
    private final Runnable mySendTask = new Runnable() { // from class: com.almworks.structure.confluence.helper.ThrottledNotificationService.2
        @Override // java.lang.Runnable
        public void run() {
            try {
                ThrottledNotificationService.this.send();
            } catch (InterruptedException e) {
                ThrottledNotificationService.log.warn("sending notifications thread was interrupted, stopping");
            }
        }
    };
    private final ExecutorService mySendTaskExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new OneElementQueue(Suppliers.ofInstance(this.mySendTask)), ThreadFactories.namedThreadFactory("structure-notification-sender"), new ThreadPoolExecutor.DiscardPolicy());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/almworks/structure/confluence/helper/ThrottledNotificationService$ThrottledNotification.class */
    public class ThrottledNotification implements Delayed {
        private final Notification myNotification;
        private final long myInstanceId;

        public ThrottledNotification(Notification notification) {
            this.myInstanceId = ThrottledNotificationService.this.myNotificationsCount.getAndIncrement();
            this.myNotification = notification;
        }

        public Notification getNotification() {
            return this.myNotification;
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(ThrottledNotificationService.this.myNextSend.get() - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            return Long.compare(this.myInstanceId, ((ThrottledNotification) delayed).myInstanceId);
        }
    }

    public ThrottledNotificationService(SubscriptionManager subscriptionManager, NotificationSender notificationSender) {
        this.mySubscriptionManager = subscriptionManager;
        this.myNotificationSender = notificationSender;
        log.warn("starting");
    }

    public String toString() {
        return "Structure notification service";
    }

    public void afterPropertiesSet() throws Exception {
        log.warn(this + " starting send task");
        this.mySendTaskExecutor.submit(this.mySendTask);
    }

    public void destroy() throws Exception {
        log.warn(this + " shutting down");
        this.mySendTaskExecutor.shutdownNow();
        this.mySendTaskExecutor.awaitTermination(2 * this.myMinimalSendInterval, TimeUnit.NANOSECONDS);
    }

    @Override // com.almworks.structure.confluence.helper.NotificationTracker
    public void record(Notification notification) {
        add(notification);
    }

    private void add(Notification notification) {
        ThrottledNotification peek;
        this.myNotificationsQueue.add((DelayQueue<ThrottledNotification>) new ThrottledNotification(notification));
        log.debug("n +{}", notification);
        int size = this.myNotificationsQueue.size() - this.myMaxNotificationQueueSize;
        if (size <= 0) {
            return;
        }
        log.error("Dropping {} notifications: notification queue size exceeded {}", Integer.valueOf(size), Integer.valueOf(this.myMaxNotificationQueueSize));
        while (true) {
            int i = size;
            size--;
            if (i <= 0 || (peek = this.myNotificationsQueue.peek()) == null) {
                return;
            } else {
                this.myNotificationsQueue.remove(peek);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void send() throws InterruptedException {
        log.debug("send {}. waiting for subscriptions", this.myNextSend);
        List<Subscription> activeSubscriptions = this.mySubscriptionManager.getActiveSubscriptions();
        log.debug("subscriptions {}. waiting for notifications", activeSubscriptions);
        ThrottledNotification waitForNotifications = waitForNotifications(activeSubscriptions);
        List emptyList = Collections.emptyList();
        if (waitForNotifications != null) {
            ThrottledNotification accumulateNotifications = accumulateNotifications();
            emptyList = new ArrayList(1 + (accumulateNotifications == null ? 0 : 1) + this.myNotificationsQueue.size());
            emptyList.add(waitForNotifications);
            if (accumulateNotifications != null) {
                emptyList.add(accumulateNotifications);
            }
            this.myNotificationsQueue.drainTo(emptyList);
        }
        Collection<Notification> transform = Collections2.transform(emptyList, GET_NOTIFICATION);
        log.debug("sending {}", transform);
        this.myNotificationSender.send(activeSubscriptions, transform);
        this.myNextSend.set(System.nanoTime() + this.myMinimalSendInterval);
    }

    @Nullable
    private ThrottledNotification waitForNotifications(List<Subscription> list) throws InterruptedException {
        return this.myNotificationSender.hasPendingNotifications(list) ? this.myNotificationsQueue.poll(this.myResendPendingNotificationsInterval, TimeUnit.NANOSECONDS) : this.myNotificationsQueue.take();
    }

    private ThrottledNotification accumulateNotifications() throws InterruptedException {
        return this.myNotificationsQueue.poll(this.myNotificationAccumulationInterval, TimeUnit.NANOSECONDS);
    }
}
