/*
 * Decompiled with CFR 0.152.
 */
package org.activiti.cloud.services.job.executor;

import org.activiti.cloud.services.job.executor.JobMessageBuilderFactory;
import org.activiti.cloud.services.job.executor.JobMessageFailedEvent;
import org.activiti.cloud.services.job.executor.JobMessageProducer;
import org.activiti.cloud.services.job.executor.JobMessageSentEvent;
import org.activiti.engine.runtime.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.MessageDispatchingException;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/**
 * {@link JobMessageProducer} that sends a job's id as a message through a
 * {@link StreamBridge} once the surrounding transaction has committed.
 *
 * <p>The outcome of each send is reported through the
 * {@link ApplicationEventPublisher}: a {@link JobMessageSentEvent} on success,
 * a {@link JobMessageFailedEvent} otherwise.
 */
public class DefaultJobMessageProducer
implements JobMessageProducer {
    private static final Logger logger = LoggerFactory.getLogger(DefaultJobMessageProducer.class);
    /** Name of the message header that carries the destination. */
    private static final String ROUTING_KEY = "routingKey";
    private final StreamBridge streamBridge;
    private final ApplicationEventPublisher eventPublisher;
    private final JobMessageBuilderFactory jobMessageBuilderFactory;

    /**
     * Creates a producer backed by the given collaborators.
     *
     * @param streamBridge             bridge used to send the job message to its destination
     * @param eventPublisher           publisher used to emit sent/failed events
     * @param jobMessageBuilderFactory factory that creates the message builder for a job
     */
    public DefaultJobMessageProducer(StreamBridge streamBridge, ApplicationEventPublisher eventPublisher, JobMessageBuilderFactory jobMessageBuilderFactory) {
        this.streamBridge = streamBridge;
        this.eventPublisher = eventPublisher;
        this.jobMessageBuilderFactory = jobMessageBuilderFactory;
    }

    /**
     * Builds a message whose payload is the job id and registers a transaction
     * synchronization that sends it after commit. Nothing is sent from this
     * method itself.
     *
     * @param destination name of the destination to send to; must not be empty
     * @param job         job whose id becomes the message payload; its id must not be empty
     * @throws IllegalStateException    if transaction synchronization is not active
     * @throws IllegalArgumentException if the job id or the destination is empty
     */
    @Override
    public void sendMessage(@NonNull String destination, @NonNull Job job) {
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            throw new IllegalStateException("requires active transaction synchronization");
        }
        Assert.hasLength(job.getId(), "job id must not be empty");
        Assert.hasLength(destination, "destination must not be empty");

        Message<String> message = this.jobMessageBuilderFactory
            .create(job)
            .withPayload(job.getId())
            .setHeader(ROUTING_KEY, destination)
            .build();

        // Defer the actual send to afterCommit() of the registered synchronization.
        TransactionSynchronizationManager.registerSynchronization(
            new JobMessageTransactionSynchronization(message, destination));
    }

    /**
     * Transaction synchronization that sends one job message to one destination
     * in {@link #afterCommit()} and publishes an event describing the outcome.
     */
    class JobMessageTransactionSynchronization
    implements TransactionSynchronization {
        private final String destination;
        private final Message<String> message;

        /**
         * @param message     message to send after commit
         * @param destination destination name handed to the stream bridge
         */
        public JobMessageTransactionSynchronization(Message<String> message, String destination) {
            this.destination = destination;
            this.message = message;
        }

        /**
         * Sends the message through the stream bridge. A {@code false} result from
         * the bridge is treated as a failure. Any exception raised inside the
         * {@code try} block is logged and reported as a {@link JobMessageFailedEvent}
         * instead of being rethrown.
         */
        @Override
        public void afterCommit() {
            logger.debug("Sending job message '{}' via stream bridge to: {}", this.message, this.destination);
            try {
                boolean sent = DefaultJobMessageProducer.this.streamBridge.send(this.destination, this.message);
                if (!sent) {
                    throw new MessageDispatchingException(this.message);
                }
                DefaultJobMessageProducer.this.eventPublisher.publishEvent(new JobMessageSentEvent(this.message, this.destination));
            }
            catch (Exception cause) {
                // Trailing throwable argument makes SLF4J log the stack trace as well as the message.
                logger.error("Sending job message {} failed due to error: {}", this.message, cause.getMessage(), cause);
                DefaultJobMessageProducer.this.eventPublisher.publishEvent(new JobMessageFailedEvent(this.message, cause, this.destination));
            }
        }
    }
}

