package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.io.IOUtils;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.interceptors.TransactionManagingInterceptor;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor.class */
/**
 * Event processor that reads {@link TrackedEventMessage}s from a {@link StreamableMessageSource} on a
 * dedicated single-thread executor and hands them, in batches, to the handlers of the parent
 * {@link AbstractEventProcessor}.
 * <p>
 * Progress is tracked through a {@link TokenStore}: the token used to open the stream is fetched from the
 * store, and the token of the last message of each batch is written back when the unit of work handling
 * that message prepares to commit. When processing a batch fails, the claim on the token is released, the
 * stream is closed, and processing is retried with a delay that doubles on each consecutive failure (up to
 * {@value #MAX_RETRY_DELAY_SECONDS} seconds).
 */
public class TrackingEventProcessor extends AbstractEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TrackingEventProcessor.class);

    /** Upper bound, in seconds, of the back-off delay between retries after a failed batch. */
    private static final long MAX_RETRY_DELAY_SECONDS = 60L;

    /** Delay, in milliseconds, before a new attempt to fetch the token and open the event stream. */
    private static final long OPEN_STREAM_RETRY_MILLIS = 5000L;

    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    private final TokenStore tokenStore;
    private final TransactionManager transactionManager;
    private final int batchSize;
    private final ExecutorService executorService;

    /** Token of the last message of the batch currently being processed; written only by the processing thread. */
    private volatile TrackingToken lastToken;
    private volatile State state;

    /**
     * Lifecycle of the processor. Transitions are {@code NOT_STARTED -> STARTED -> SHUT_DOWN} or
     * {@code NOT_STARTED -> SHUT_DOWN}; a processor that was shut down is never started again.
     */
    public enum State {
        NOT_STARTED,
        STARTED,
        SHUT_DOWN
    }

    /**
     * Creates a processor with a batch size of 1 and a {@link NoOpMessageMonitor}.
     *
     * @param name                the name of the processor, also used as the name under which tokens are stored
     * @param eventHandlerInvoker the component invoking the event handlers
     * @param messageSource       the source to read tracked event messages from
     * @param tokenStore          the store holding the tracking token of this processor
     * @param transactionManager  the manager of the transactions in which tokens are fetched, stored and released
     */
    public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager) {
        this(name, eventHandlerInvoker, messageSource, tokenStore, transactionManager, NoOpMessageMonitor.INSTANCE);
    }

    /**
     * Creates a processor with the given batch size and a {@link NoOpMessageMonitor}.
     *
     * @param name                the name of the processor, also used as the name under which tokens are stored
     * @param eventHandlerInvoker the component invoking the event handlers
     * @param messageSource       the source to read tracked event messages from
     * @param tokenStore          the store holding the tracking token of this processor
     * @param transactionManager  the manager of the transactions in which tokens are fetched, stored and released
     * @param batchSize           the number of messages read per batch; must be greater than 0
     */
    public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager, int batchSize) {
        this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, messageSource, tokenStore, transactionManager, batchSize, NoOpMessageMonitor.INSTANCE);
    }

    /**
     * Creates a processor with a batch size of 1 and the given message monitor.
     *
     * @param name                the name of the processor, also used as the name under which tokens are stored
     * @param eventHandlerInvoker the component invoking the event handlers
     * @param messageSource       the source to read tracked event messages from
     * @param tokenStore          the store holding the tracking token of this processor
     * @param transactionManager  the manager of the transactions in which tokens are fetched, stored and released
     * @param messageMonitor      the monitor passed on to the parent processor
     */
    public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager, MessageMonitor<? super EventMessage<?>> messageMonitor) {
        this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, messageSource, tokenStore, transactionManager, 1, messageMonitor);
    }

    /**
     * Creates a fully configured processor. The processor does not read any events until {@link #start()}
     * is called.
     *
     * @param name                  the name of the processor, also used as the name under which tokens are stored
     * @param eventHandlerInvoker   the component invoking the event handlers
     * @param rollbackConfiguration the rollback configuration passed on to the parent processor
     * @param errorHandler          the error handler passed on to the parent processor
     * @param messageSource         the source to read tracked event messages from; may not be {@code null}
     * @param tokenStore            the store holding the tracking token; may not be {@code null}
     * @param transactionManager    the manager of the transactions in which tokens are fetched, stored and
     *                              released; may not be {@code null}
     * @param batchSize             the number of messages read per batch; must be greater than 0
     * @param messageMonitor        the monitor passed on to the parent processor
     * @throws NullPointerException if the message source, token store or transaction manager is {@code null}
     */
    public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, RollbackConfiguration rollbackConfiguration, ErrorHandler errorHandler, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager, int batchSize, MessageMonitor<? super EventMessage<?>> messageMonitor) {
        super(name, eventHandlerInvoker, rollbackConfiguration, errorHandler, messageMonitor);
        // Validate everything before any resource (executor, interceptor) is set up.
        Assert.isTrue(batchSize > 0, () -> "batchSize needs to be greater than 0");
        this.messageSource = Objects.requireNonNull(messageSource, "messageSource");
        this.tokenStore = Objects.requireNonNull(tokenStore, "tokenStore");
        // Every token operation of this class runs through the transaction manager, so fail fast on null.
        this.transactionManager = Objects.requireNonNull(transactionManager, "transactionManager");
        this.batchSize = batchSize;
        this.state = State.NOT_STARTED;
        this.executorService = Executors.newSingleThreadExecutor(new AxonThreadFactory("TrackingEventProcessor - " + name));
        registerInterceptor(new TransactionManagingInterceptor(transactionManager));
    }

    /**
     * Starts the processor if it has not been started or shut down before. Registers an interceptor that
     * stores the token of the last message of a batch when the unit of work for that message prepares to
     * commit, and submits the {@link #processingLoop()} to the executor of this processor.
     */
    @Override // org.axonframework.eventhandling.EventProcessor
    public void start() {
        if (this.state == State.NOT_STARTED) {
            this.state = State.STARTED;
            registerInterceptor((unitOfWork, interceptorChain) -> {
                unitOfWork.onPrepareCommit(committingUnitOfWork -> {
                    Object message = committingUnitOfWork.getMessage();
                    // Read the volatile field once so the token compared is the token stored.
                    TrackingToken batchToken = this.lastToken;
                    if (message instanceof TrackedEventMessage && batchToken != null
                            && batchToken.equals(((TrackedEventMessage<?>) message).trackingToken())) {
                        this.tokenStore.storeToken(batchToken, getName(), 0);
                    }
                });
                return interceptorChain.proceed();
            });
            this.executorService.submit(() -> {
                try {
                    processingLoop();
                } catch (Throwable th) {
                    logger.error("Processing loop ended due to uncaught exception. Processor stopping.", th);
                }
            });
        }
    }

    /**
     * Marks the processor as shut down and shuts down its executor. The processing loop observes the state
     * change and ends after its current iteration. Calling this method more than once has no further effect.
     */
    @Override // org.axonframework.eventhandling.EventProcessor
    public void shutDown() {
        if (this.state != State.SHUT_DOWN) {
            this.state = State.SHUT_DOWN;
            this.executorService.shutdown();
        }
    }

    /**
     * Reads and processes batches of events until the processor is shut down.
     * <p>
     * When opening the stream or processing a batch fails, the claim on the token is released, the stream
     * is closed and a new attempt is made after a delay. The delay starts at 1 second, doubles on every
     * consecutive failure up to {@value #MAX_RETRY_DELAY_SECONDS} seconds and is reset after a successfully
     * processed batch. When the loop ends, the stream is closed and the claim on the token is released.
     */
    protected void processingLoop() {
        MessageStream<TrackedEventMessage<?>> eventStream = null;
        long retryDelaySeconds = 1;
        // Clean-up wraps the whole loop: the stream and the token claim must survive across batches and
        // are only given up when the loop ends (or, explicitly, when a batch fails).
        try {
            while (this.state != State.SHUT_DOWN) {
                try {
                    eventStream = ensureEventStreamOpened(eventStream);
                    if (eventStream == null) {
                        // No stream could be opened because the processor is no longer (or not yet)
                        // started; there is nothing to process.
                        break;
                    }
                    processBatch(eventStream);
                    retryDelaySeconds = 1;
                } catch (Exception e) {
                    if (retryDelaySeconds == 1) {
                        logger.warn("Error occurred. Starting retry mode.", e);
                    }
                    logger.warn("Releasing claim on token and preparing for retry in {}s", retryDelaySeconds);
                    releaseToken();
                    IOUtils.closeQuietly(eventStream);
                    // Force a fresh stream (and token fetch) on the next iteration.
                    eventStream = null;
                    try {
                        Thread.sleep(retryDelaySeconds * 1000);
                    } catch (InterruptedException interrupt) {
                        Thread.currentThread().interrupt();
                        logger.warn("Thread interrupted. Preparing to shut down event processor");
                        shutDown();
                    }
                    retryDelaySeconds = Math.min(retryDelaySeconds * 2, MAX_RETRY_DELAY_SECONDS);
                }
            }
        } finally {
            IOUtils.closeQuietly(eventStream);
            releaseToken();
        }
    }

    /**
     * Releases the claim of this processor on its token, inside a transaction. This is a best-effort
     * operation: a failure is logged at debug level and otherwise ignored, so that it never masks the
     * reason the claim is being released.
     */
    private void releaseToken() {
        try {
            this.transactionManager.executeInTransaction(() -> this.tokenStore.releaseClaim(getName(), 0));
        } catch (Exception e) {
            logger.debug("Failed to release the claim on the token of processor [{}]. Ignoring.", getName(), e);
        }
    }

    /**
     * Reads up to {@code batchSize} messages from the given stream and processes them as one batch.
     * <p>
     * Waits at most one second for the first message. When no message arrived, the token is fetched from
     * the store inside a transaction and the method returns. Otherwise, messages that carry the same
     * tracking token as the last message of the batch are added to the batch as well, so that a token is
     * never split over two batches.
     * <p>
     * An interrupt while reading from the stream is not propagated: the interrupt flag is restored and the
     * processor is shut down.
     *
     * @param messageStream the open stream to read the batch from
     * @throws Exception when fetching the token or processing the batch fails
     */
    private void processBatch(MessageStream<TrackedEventMessage<?>> messageStream) throws Exception {
        ArrayList<TrackedEventMessage<?>> batch = new ArrayList<>();
        try {
            if (messageStream.hasNextAvailable(1, TimeUnit.SECONDS)) {
                while (batch.size() < this.batchSize && messageStream.hasNextAvailable()) {
                    batch.add(messageStream.nextAvailable());
                }
            }
            if (batch.isEmpty()) {
                // NOTE(review): presumably fetching the token keeps the claim of this processor alive
                // while the stream is idle - confirm against the TokenStore contract.
                this.transactionManager.executeInTransaction(() -> this.tokenStore.fetchToken(getName(), 0));
                return;
            }
            final TrackingToken batchToken = batch.get(batch.size() - 1).trackingToken();
            this.lastToken = batchToken;
            if (batchToken != null) {
                // Drain all messages sharing the token of the last message in the batch.
                while (messageStream.peek().filter(next -> batchToken.equals(next.trackingToken())).isPresent()) {
                    batch.add(messageStream.nextAvailable());
                }
            }
            process(batch);
        } catch (InterruptedException e) {
            logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", getName()), e);
            Thread.currentThread().interrupt();
            shutDown();
        }
    }

    /**
     * Returns the given stream if it is not {@code null}; otherwise keeps trying to fetch the token and
     * open a new stream for as long as the processor is in the {@code STARTED} state. Each attempt runs in
     * its own transaction, which is rolled back when the attempt fails. Failed attempts are retried after
     * {@value #OPEN_STREAM_RETRY_MILLIS} milliseconds.
     *
     * @param messageStream the currently open stream, or {@code null} if a new stream must be opened
     * @return an open stream, or {@code null} if the processor left the {@code STARTED} state before a
     * stream could be opened
     */
    private MessageStream<TrackedEventMessage<?>> ensureEventStreamOpened(MessageStream<TrackedEventMessage<?>> messageStream) {
        MessageStream<TrackedEventMessage<?>> eventStream = messageStream;
        while (eventStream == null && this.state == State.STARTED) {
            Transaction transaction = this.transactionManager.startTransaction();
            try {
                // NOTE(review): "openStream2" looks like a decompiler-generated name - confirm the actual
                // method name on StreamableMessageSource before compiling against the library.
                eventStream = this.messageSource.openStream2(this.tokenStore.fetchToken(getName(), 0));
                transaction.commit();
            } catch (UnableToClaimTokenException e) {
                transaction.rollback();
                pauseBeforeReopeningStream();
            } catch (Exception e) {
                logger.warn("Unexpected exception while attemting to retrieve token and open stream. Retrying in 5 seconds.", e);
                transaction.rollback();
                if (eventStream == null) {
                    // Only wait when another attempt will actually be made.
                    pauseBeforeReopeningStream();
                }
            }
        }
        return eventStream;
    }

    /**
     * Sleeps for {@value #OPEN_STREAM_RETRY_MILLIS} milliseconds before a new attempt to open the stream.
     * When interrupted, restores the interrupt flag and shuts the processor down, so that the retry loop
     * ends instead of spinning on a thread whose sleeps fail immediately.
     */
    private void pauseBeforeReopeningStream() {
        try {
            Thread.sleep(OPEN_STREAM_RETRY_MILLIS);
        } catch (InterruptedException interrupt) {
            logger.info("Thread interrupted while waiting for new attempt to claim token");
            Thread.currentThread().interrupt();
            shutDown();
        }
    }

    /**
     * Returns the current lifecycle state of this processor.
     *
     * @return the current state
     */
    protected State getState() {
        return this.state;
    }
}
