/*
 * Decompiled with CFR 0.152.
 */
package org.activiti.cloud.services.messages.tests;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.activiti.api.process.model.builders.MessageEventPayloadBuilder;
import org.activiti.api.process.model.events.BPMNMessageEvent;
import org.activiti.api.process.model.events.MessageDefinitionEvent;
import org.activiti.api.process.model.events.MessageSubscriptionEvent;
import org.activiti.api.process.model.payloads.MessageEventPayload;
import org.activiti.cloud.services.messages.core.aggregator.MessageConnectorAggregator;
import org.activiti.cloud.services.messages.core.config.MessageAggregatorProperties;
import org.activiti.cloud.services.messages.core.controlbus.ControlBusGateway;
import org.activiti.cloud.services.messages.core.correlation.Correlations;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.MapAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.transaction.PlatformTransactionManager;

@SpringBootTest(webEnvironment=SpringBootTest.WebEnvironment.NONE, properties={"spring.application.name=rb", "activiti.cloud.application.name=default-app", "spring.cloud.stream.bindings.messageConnectorInput.content-type=application/json", "spring.cloud.stream.bindings.messageConnectorOutput.content-type=application/json"})
@DirtiesContext
@Import(value={TestConfigurationContext.class})
public abstract class AbstractMessagesCoreIntegrationTests {
    /** Logger used when an output payload cannot be converted back to a {@code MessageEventPayload}. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessagesCoreIntegrationTests.class);
    /** Timeout budget for long-running tests; matches the value of the {@code @Timeout} annotations on the concurrency tests. */
    protected static final int TEST_TIMEOUT = 30;
    /** JSON mapper for serializing outgoing payloads and parsing received ones; unknown JSON properties are ignored. */
    protected ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    /** Used to publish test messages to the {@code messageConnectorInput} binding. */
    @Autowired
    protected StreamBridge streamBridge;
    /** Test binder output from which {@code poll} and {@code peek} receive messages. */
    @Autowired
    private OutputDestination outputDestination;
    /** Store from which the tests remove message groups before (and sometimes after) a scenario. */
    @Autowired
    protected MessageGroupStore messageGroupStore;
    /** Aggregator whose message store is inspected through {@code messageGroup}. */
    @Autowired
    protected MessageConnectorAggregator aggregatingMessageHandler;
    /** Queue bridged from {@code errorChannel} by the nested test configuration. */
    @Autowired
    protected QueueChannel errorQueue;
    /** Queue bridged from {@code discardChannel} by the nested test configuration. */
    @Autowired
    protected QueueChannel discardQueue;
    /** Gateway used to send control expressions such as {@code @aggregator.stop()}. */
    @Autowired
    protected ControlBusGateway controlBus;
    // Not referenced in this class; presumably available for subclasses -- confirm.
    @Autowired
    protected PlatformTransactionManager transactionManager;
    /** Aggregator configuration properties checked by the configuration tests. */
    @Autowired
    protected MessageAggregatorProperties messageAggregatorProperties;
    /** Resolves the binding destination written into the {@code messageEventOutputDestination} header. */
    @Autowired
    private BindingServiceProperties bindingServiceProperties;
    /** Value written into the {@code appName} header of generated messages. */
    @Value(value="${activiti.cloud.application.name}")
    protected String activitiCloudApplicationName;
    /** Value written into the {@code serviceFullName} header of generated messages. */
    @Value(value="${spring.application.name}")
    protected String springApplicationName;

    /**
     * Checks that the aggregator properties list {@code kafka_consumer} among the input headers to remove.
     */
    @Test
    public void shouldConfigureInputHeadersToRemove() {
        Object[] headersToRemove = messageAggregatorProperties.getInputHeadersToRemove();

        Assertions.assertThat(headersToRemove).contains("kafka_consumer");
    }

    /**
     * Checks that the configured header-channels time-to-live expression contains the
     * {@code headerChannelsTTL} header lookup with its 60000 fallback.
     */
    @Test
    public void shouldConfigureHeaderChannelsTimeToLiveExpression() {
        String ttlExpression = messageAggregatorProperties.getHeaderChannelsTimeToLiveExpression();

        Assertions.assertThat(ttlExpression).contains("headers['headerChannelsTTL']?:60000");
    }

    /**
     * Deploys a start message, then submits {@code count} MESSAGE_SENT events through an executor and
     * expects one output message per sent event while the group keeps only the start message.
     *
     * <p>The executor is shut down in a {@code finally} block so that a failing assertion does not
     * leave its worker thread behind.
     */
    @Test
    @Timeout(value = TEST_TIMEOUT)
    public void shouldProcessMessageEventsConcurrently() throws InterruptedException, JsonProcessingException {
        String messageEventName = "start";
        int count = 100;
        Message<MessageEventPayload> startMessage = this.startMessageDeployedEvent(messageEventName);
        String correlationId = this.correlationId(startMessage);
        this.removeMessageGroup(correlationId);
        this.send(startMessage);
        Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch sent = new CountDownLatch(count);
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // Queue every send behind the start latch, then release them all at once.
            IntStream.range(0, count).forEach(i -> this.sendAsync(this.messageSentEvent(messageEventName), start, sent, exec));
            start.countDown();
            try {
                // Best-effort wait: any sends still pending are covered by the per-poll timeout below.
                sent.await(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Every sent event must produce exactly one output message.
            IntStream.range(0, count).mapToObj(i -> Try.call(() -> this.poll(TimeUnit.SECONDS.toMillis(1L)))).forEach(out -> Assertions.assertThat((Object)out).isNotNull());
        }
        finally {
            exec.shutdownNow();
        }
        // Only the start message stays in the group and nothing else is left on the output.
        Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1);
        Assertions.assertThat(this.peek()).isNull();
    }

    /**
     * Submits {@code count} MESSAGE_SENT events through an executor before the start message is deployed,
     * expects them all to be buffered in the group, then deploys the start message and expects one output
     * message per buffered event while the group keeps only the start message.
     *
     * <p>The executor is shut down in a {@code finally} block so that a failing assertion does not
     * leave its worker thread behind.
     */
    @Test
    @Timeout(value = TEST_TIMEOUT)
    public void shouldProcessMessageEventsConcurrentlyInReversedOrder() throws InterruptedException, JsonProcessingException {
        String messageEventName = "start";
        int count = 100;
        Message<MessageEventPayload> startMessage = this.startMessageDeployedEvent(messageEventName);
        String correlationId = this.correlationId(startMessage);
        this.removeMessageGroup(correlationId);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch sent = new CountDownLatch(count);
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // Queue every send behind the start latch, then release them all at once.
            IntStream.range(0, count).forEach(i -> this.sendAsync(this.messageSentEvent(messageEventName), start, sent, exec));
            start.countDown();
            try {
                // Unbounded wait; the @Timeout on this test is the upper limit.
                sent.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Without a deployed start message every sent event is buffered in the group.
            Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(count);
            this.send(startMessage);
            // Deploying the start message must release one output message per buffered event.
            IntStream.range(0, count).mapToObj(i -> Try.call(() -> this.poll(TimeUnit.SECONDS.toMillis(3L)))).forEach(out -> Assertions.assertThat((Object)out).isNotNull());
        }
        finally {
            exec.shutdownNow();
        }
        Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1);
        Assertions.assertThat(this.peek()).isNull();
    }

    /**
     * Deploys the start message first and then sends two MESSAGE_SENT events one after another.
     * Each sent event is expected to produce exactly one output message carrying its own variables,
     * while the message group keeps only the start message.
     */
    @Test
    public void testStartMessageBeforeSent() throws Exception {
        String messageName = "start1";
        Message<MessageEventPayload> startMessage = this.startMessageDeployedEvent(messageName);
        String correlationId = this.correlationId(startMessage);
        // Start from a clean group so earlier tests cannot influence the counts below.
        this.removeMessageGroup(correlationId);
        this.send(startMessage);
        Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1);
        // First sent event: the helper uses the third argument as business key and as the "key" variable.
        this.send(this.messageSentEvent(messageName, null, "sent1"));
        Message out = this.poll(TimeUnit.SECONDS.toMillis(0L));
        // No second output message is expected.
        Assertions.assertThat(this.peek()).isNull();
        ((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting(new String[]{"name", "variables"}).contains(new Object[]{"start1", Collections.singletonMap("key", "sent1")});
        // The group still holds a single message named after the start message.
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("name").containsOnly(new Object[]{messageName});
        // Second sent event: same expectations with the "sent2" variables.
        this.send(this.messageSentEvent(messageName, null, "sent2"));
        out = this.poll(TimeUnit.SECONDS.toMillis(0L));
        Assertions.assertThat(this.peek()).isNull();
        ((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting(new String[]{"name", "variables"}).contains(new Object[]{"start1", Collections.singletonMap("key", "sent2")});
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("name").containsOnly(new Object[]{messageName});
    }

    /**
     * Sends a MESSAGE_SENT event before the start message is deployed, then deploys the start message.
     * The earlier sent event is expected to be delivered once the start message arrives, and a later
     * sent event is expected to be delivered immediately; the group keeps only the start message.
     */
    @Test
    public void testStartMessageAfterSent() throws Exception {
        String messageName = "start2";
        Message<MessageEventPayload> messageSentEvent = this.messageSentEvent(messageName, null, "sent1");
        String correlationId = this.correlationId(messageSentEvent);
        // Start from a clean group so earlier tests cannot influence the counts below.
        this.messageGroupStore.removeMessageGroup((Object)correlationId);
        // Sent event arrives first, the start message second.
        this.send(messageSentEvent);
        this.send(this.startMessageDeployedEvent(messageName));
        Message out = this.poll(TimeUnit.SECONDS.toMillis(0L));
        // No second output message is expected.
        Assertions.assertThat(this.peek()).isNull();
        ((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting(new String[]{"name", "businessKey", "variables"}).contains(new Object[]{messageName, "sent1", Collections.singletonMap("key", "sent1")});
        // Only the start message is left in the group.
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("name").containsOnly(new Object[]{"start2"});
        // A sent event that arrives after the start message is delivered as well.
        this.send(this.messageSentEvent(messageName, null, "sent2"));
        out = this.poll(TimeUnit.SECONDS.toMillis(0L));
        Assertions.assertThat(this.peek()).isNull();
        ((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting(new String[]{"name", "businessKey", "variables"}).contains(new Object[]{messageName, "sent2", Collections.singletonMap("key", "sent2")});
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("name").containsOnly(new Object[]{"start2"});
    }

    /**
     * Sends one MESSAGE_SENT event followed by two MESSAGE_WAITING events for the same name and
     * correlation key, then alternates MESSAGE_RECEIVED and MESSAGE_SENT events. The assertions
     * track which message is delivered and which messages stay in the group after every step.
     */
    @Test
    public void testSentMessagesWithBuffer() throws Exception {
        String messageName = "message";
        String correlationKey = "1";
        Message<MessageEventPayload> messageSentEvent = this.messageSentEvent(messageName, correlationKey, "sent1");
        String correlationId = this.correlationId(messageSentEvent);
        // Start from a clean group so earlier tests cannot influence the counts below.
        this.messageGroupStore.removeMessageGroup((Object)correlationId);
        this.send(messageSentEvent);
        this.send(this.messageWaitingEvent(messageName, correlationKey, "waiting1"));
        this.send(this.messageWaitingEvent(messageName, correlationKey, "waiting2"));
        // The sent event is delivered once; both waiting events remain in the group.
        Message out = this.poll(TimeUnit.SECONDS.toMillis(0L));
        ((MapAssert)((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting("variables").asInstanceOf(InstanceOfAssertFactories.MAP)).containsEntry((Object)"key", (Object)"sent1");
        Assertions.assertThat(this.peek()).isNull();
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(2)).extracting(Message::getPayload).asList().extracting("variables").containsOnly(new Object[]{Collections.singletonMap("key", "waiting1"), Collections.singletonMap("key", "waiting2")});
        // A received event produces no output and leaves only the second waiting event.
        this.send(this.messageReceivedEvent(messageName, correlationKey));
        Assertions.assertThat(this.peek()).isNull();
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("variables").containsOnly(new Object[]{Collections.singletonMap("key", "waiting2")});
        // The next sent event is delivered while the second waiting event stays in the group.
        this.send(this.messageSentEvent(messageName, correlationKey, "sent2"));
        out = this.poll(TimeUnit.SECONDS.toMillis(1L));
        ((MapAssert)((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting("variables").asInstanceOf(InstanceOfAssertFactories.MAP)).containsEntry((Object)"key", (Object)"sent2");
        Assertions.assertThat(this.peek()).isNull();
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("variables").containsOnly(new Object[]{Collections.singletonMap("key", "waiting2")});
        // The final received event empties the group without producing output.
        this.send(this.messageReceivedEvent(messageName, correlationKey));
        Assertions.assertThat(this.peek()).isNull();
        Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).isEmpty();
    }

    /**
     * Sends a MESSAGE_SENT event followed by a matching MESSAGE_WAITING event and checks that the single
     * output message carries the expected name, correlation key and variables.
     */
    @Test
    public void testReceiveMessagePayload() throws Exception {
        final String name = "message";
        final String key = "1";
        final String business = "businessKey";
        Message<MessageEventPayload> sentEvent = messageSentEvent(name, key, business);
        String groupId = correlationId(sentEvent);
        messageGroupStore.removeMessageGroup(groupId);

        send(sentEvent);
        send(messageWaitingEvent(name, key, business));

        Message<?> delivered = poll(0L);
        Assertions.assertThat(delivered)
            .isNotNull()
            .extracting(Message::getPayload)
            .extracting("name", "correlationKey", "variables")
            .contains(name, key, Collections.singletonMap("key", business));
        Assertions.assertThat(this.peek()).isNull();

        // Leave no state behind for other tests that use the same name and correlation key.
        messageGroupStore.removeMessageGroup(groupId);
    }

    /**
     * Deploys a start message, sends a MESSAGE_SENT event without a correlation key and checks that the
     * single output message carries the expected name, business key and variables.
     */
    @Test
    public void testStartMessagePayload() throws Exception {
        final String name = "message";
        final String business = "businessKey";
        Message<MessageEventPayload> deployedEvent = startMessageDeployedEvent(name);
        String groupId = correlationId(deployedEvent);
        messageGroupStore.removeMessageGroup(groupId);

        send(deployedEvent);
        send(messageSentEvent(name, null, business));

        Message<?> delivered = poll(0L);
        Assertions.assertThat(delivered)
            .isNotNull()
            .extracting(Message::getPayload)
            .extracting("name", "businessKey", "variables")
            .contains(name, business, Collections.singletonMap("key", business));
        Assertions.assertThat(this.peek()).isNull();

        // Leave no state behind for other tests that use the same message name.
        messageGroupStore.removeMessageGroup(groupId);
    }

    /**
     * Sends two MESSAGE_SENT events before any MESSAGE_WAITING event and then alternates waiting and
     * received events. The assertions track which sent message is delivered for each waiting event and
     * which messages stay in the group after every step.
     */
    @Test
    public void testSentMessagesWithBufferInDifferentOrder() throws Exception {
        String messageName = "message";
        String correlationKey = "1";
        String correlationId = this.correlationId(this.messageWaitingEvent(messageName, correlationKey));
        // Start from a clean group so earlier tests cannot influence the counts below.
        this.messageGroupStore.removeMessageGroup((Object)correlationId);
        this.send(this.messageSentEvent(messageName, correlationKey, "sent1"));
        this.send(this.messageSentEvent(messageName, correlationKey, "sent2"));
        this.send(this.messageWaitingEvent(messageName, correlationKey, "waiting1"));
        // The first waiting event releases the first sent message only.
        Message out = this.poll(TimeUnit.SECONDS.toMillis(0L));
        ((MapAssert)((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting("variables").asInstanceOf(InstanceOfAssertFactories.MAP)).containsEntry((Object)"key", (Object)"sent1");
        Assertions.assertThat(this.peek()).isNull();
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(2)).extracting(Message::getPayload).asList().extracting("variables").contains(new Object[]{Collections.singletonMap("key", "sent2"), Collections.singletonMap("key", "waiting1")});
        // A received event produces no output and leaves only the second sent message.
        this.send(this.messageReceivedEvent(messageName, correlationKey, "received1"));
        Assertions.assertThat(this.peek()).isNull();
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("variables").containsOnly(new Object[]{Collections.singletonMap("key", "sent2")});
        // The second waiting event releases the second sent message and stays in the group itself.
        this.send(this.messageWaitingEvent(messageName, correlationKey, "waiting2"));
        out = this.poll(TimeUnit.SECONDS.toMillis(0L));
        Assertions.assertThat(this.peek()).isNull();
        ((MapAssert)((ObjectAssert)Assertions.assertThat(out).isNotNull()).extracting(Message::getPayload).extracting("variables").asInstanceOf(InstanceOfAssertFactories.MAP)).containsEntry((Object)"key", (Object)"sent2");
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).hasSize(1)).extracting(Message::getPayload).asList().extracting("variables").containsOnly(new Object[]{Collections.singletonMap("key", "waiting2")});
        // The final received event empties the group without producing output.
        this.send(this.messageReceivedEvent(messageName, correlationKey, "received2"));
        Assertions.assertThat(this.peek()).isNull();
        Assertions.assertThat((Collection)this.messageGroup(correlationId).getMessages()).isEmpty();
    }

    /**
     * Buffers two MESSAGE_WAITING events and checks that a MESSAGE_SUBSCRIPTION_CANCELLED event for the
     * same name and correlation key empties the group without producing output.
     */
    @Test
    public void testSubscriptionCancelled() throws Exception {
        final String name = "message";
        final String key = "1";
        Message<MessageEventPayload> cancelEvent = subscriptionCancelledEvent(name, key);
        String groupId = correlationId(cancelEvent);
        messageGroupStore.removeMessageGroup(groupId);

        send(messageWaitingEvent(name, key));
        send(messageWaitingEvent(name, key));
        Assertions.assertThat(messageGroup(groupId).getMessages()).hasSize(2);

        send(cancelEvent);

        Assertions.assertThat(this.peek()).isNull();
        Assertions.assertThat(messageGroup(groupId).getMessages()).isEmpty();
    }

    /**
     * Sends the same MESSAGE_WAITING message twice and then the same MESSAGE_RECEIVED message twice.
     * For each pair the test expects one message on the discard queue, no output, and the group to
     * reflect a single delivery (one buffered message, then none).
     */
    @Test
    public void testIdempotentMessageInterceptor() throws Exception {
        final String name = "message";
        final String key = "1";
        Message<MessageEventPayload> waiting = messageWaitingEvent(name, key);
        String groupId = correlationId(waiting);
        messageGroupStore.removeMessageGroup(groupId);

        // Same message instance, hence the same messageEventId header, sent twice.
        send(waiting);
        send(waiting);
        Assertions.assertThat(this.peek()).isNull();
        Message<?> discardedWaiting = discardQueue.receive(0L);
        Assertions.assertThat((Object) discardedWaiting).isNotNull();
        Assertions.assertThat(messageGroup(groupId).getMessages()).isNotNull().hasSize(1);

        Message<MessageEventPayload> received = messageReceivedEvent(name, key);
        send(received);
        send(received);
        Assertions.assertThat(this.peek()).isNull();
        Message<?> discardedReceived = discardQueue.receive(0L);
        Assertions.assertThat((Object) discardedReceived).isNotNull();
        Assertions.assertThat(messageGroup(groupId).getMessages()).hasSize(0);
    }

    /**
     * Sends a plain-text message without a {@code messageEventType} header and expects it to end up on
     * the discard queue with its payload intact and without producing output.
     *
     * <p>The discarded message is asserted to be present before its payload is read, so a missing
     * message fails the assertion instead of raising a {@code NullPointerException}. The payload bytes
     * are decoded as UTF-8 explicitly rather than with the platform default charset.
     */
    @Test
    public void testMessageFilterDiscardChannel() throws Exception {
        Message<String> invalidMessage = MessageBuilder.withPayload("message").setHeader("contentType", "text/plain").build();
        this.streamBridge.send("messageConnectorInput", invalidMessage);
        Assertions.assertThat(this.peek()).isNull();
        Message<?> out = this.discardQueue.receive(0L);
        Assertions.assertThat((Object) out).isNotNull();
        String discardedText = new String((byte[]) out.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        Assertions.assertThat(discardedText).isEqualTo("message");
    }

    /**
     * Sends a MESSAGE_SENT message whose payload is plain text and expects the send to fail with a
     * {@code MessageTransformationException}, with nothing on the output or on the error queue.
     */
    @Test
    public void testInvalidMessagePayloadDiscardChannel() throws Exception {
        Message<String> malformed = MessageBuilder.withPayload("payload")
            .setHeader("contentType", "text/plain")
            .setHeader("messageEventType", BPMNMessageEvent.MessageEvents.MESSAGE_SENT.name())
            .build();

        Throwable failure = Assertions.catchThrowable(() -> streamBridge.send("messageConnectorInput", malformed));

        Assertions.assertThat(this.peek()).isNull();
        Message<?> errorMessage = errorQueue.receive(0L);
        Assertions.assertThat((Object) errorMessage).isNull();
        Assertions.assertThat(failure).isInstanceOf(MessageTransformationException.class);
    }

    /**
     * Stops the aggregator through the control bus, expects a send to fail with a
     * {@code MessageDeliveryException}, and starts the aggregator again before asserting.
     */
    @Test
    public void testControlBusStartStopComponents() throws Exception {
        final String name = "test";

        controlBus.send("@aggregator.stop()");
        Throwable failure = Assertions.catchThrowable(() -> send(messageSentEvent(name, null, "error")));
        // Restart before asserting so a failed assertion does not leave the aggregator stopped.
        controlBus.send("@aggregator.start()");

        Assertions.assertThat(failure).isInstanceOf(MessageDeliveryException.class);
    }

    /**
     * Deploys a start message, then sends an event named {@code errorInterceptor}, which the test
     * channel interceptor rejects. Expects a {@code MessageDeliveryException} and the start message
     * group to be unchanged.
     */
    @Test
    public void testTransactionException() throws Exception {
        final String name = "start1";
        Message<MessageEventPayload> deployedEvent = startMessageDeployedEvent(name);
        String groupId = correlationId(deployedEvent);
        removeMessageGroup(groupId);

        send(deployedEvent);
        Assertions.assertThat(messageGroup(groupId).getMessages()).hasSize(1);

        Throwable failure = Assertions.catchThrowable(() -> send(messageSentEvent("errorInterceptor", null, "errorInterceptor")));

        Assertions.assertThat(messageGroup(groupId).getMessages()).hasSize(1);
        Assertions.assertThat(failure).isInstanceOf(MessageDeliveryException.class);
    }

    /**
     * Creates a message builder for the given message name with no correlation key and no business key.
     *
     * @param messageName name of the message event
     * @return builder pre-populated with the payload and the standard headers
     */
    protected MessageBuilder<MessageEventPayload> messageBuilder(String messageName) {
        final String noCorrelationKey = null;
        final String noBusinessKey = null;
        return messageBuilder(messageName, noCorrelationKey, noBusinessKey);
    }

    /**
     * Creates a message builder for the given message name and correlation key with no business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return builder pre-populated with the payload and the standard headers
     */
    protected MessageBuilder<MessageEventPayload> messageBuilder(String messageName, String correlationKey) {
        final String noBusinessKey = null;
        return messageBuilder(messageName, correlationKey, noBusinessKey);
    }

    /**
     * Creates a message builder whose payload is a {@code MessageEventPayload} built from the arguments
     * and whose headers identify the message event, the application and the output destination.
     * The business key is also stored as the single payload variable {@code key}, and every call gets
     * a fresh random {@code messageEventId}.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, may be {@code null}
     * @return builder pre-populated with the payload and the standard headers
     */
    protected MessageBuilder<MessageEventPayload> messageBuilder(String messageName, String correlationKey, String businessKey) {
        MessageEventPayload eventPayload = MessageEventPayloadBuilder.messageEvent(messageName)
            .withCorrelationKey(correlationKey)
            .withBusinessKey(businessKey)
            .withVariables(Collections.singletonMap("key", businessKey))
            .build();

        return MessageBuilder.withPayload(eventPayload)
            .setHeader("messageEventName", messageName)
            .setHeader("messageEventCorrelationKey", correlationKey)
            .setHeader("messageEventId", UUID.randomUUID())
            .setHeader("appName", activitiCloudApplicationName)
            .setHeader("messageEventOutputDestination", bindingServiceProperties.getBindingDestination("messageConnectorInput-out-0"))
            .setHeader("serviceFullName", springApplicationName);
    }

    /**
     * Builds a START_MESSAGE_DEPLOYED event for the given message name with no correlation key.
     *
     * @param messageName name of the message event
     * @return the built message
     */
    protected Message<MessageEventPayload> startMessageDeployedEvent(String messageName) {
        MessageBuilder<MessageEventPayload> builder = messageBuilder(messageName, null);
        String eventType = MessageDefinitionEvent.MessageDefinitionEvents.START_MESSAGE_DEPLOYED.name();
        return builder.setHeader("messageEventType", eventType).build();
    }

    /**
     * Builds a MESSAGE_SENT event for the given message name with no correlation key.
     *
     * @param messageName name of the message event
     * @return the built message
     */
    protected Message<MessageEventPayload> messageSentEvent(String messageName) {
        final String noCorrelationKey = null;
        return messageSentEvent(messageName, noCorrelationKey);
    }

    /**
     * Builds a MESSAGE_SENT event for the given message name and correlation key with no business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageSentEvent(String messageName, String correlationKey) {
        final String noBusinessKey = null;
        return messageSentEvent(messageName, correlationKey, noBusinessKey);
    }

    /**
     * Builds a MESSAGE_SENT event.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageSentEvent(String messageName, String correlationKey, String businessKey) {
        MessageBuilder<MessageEventPayload> builder = messageBuilder(messageName, correlationKey, businessKey);
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_SENT.name();
        return builder.setHeader("messageEventType", eventType).build();
    }

    /**
     * Builds a MESSAGE_WAITING event for the given message name with no correlation key.
     *
     * @param messageName name of the message event
     * @return the built message
     */
    protected Message<MessageEventPayload> messageWaitingEvent(String messageName) {
        final String noCorrelationKey = null;
        return messageWaitingEvent(messageName, noCorrelationKey);
    }

    /**
     * Builds a MESSAGE_WAITING event.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageWaitingEvent(String messageName, String correlationKey, String businessKey) {
        MessageBuilder<MessageEventPayload> builder = messageBuilder(messageName, correlationKey, businessKey);
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_WAITING.name();
        return builder.setHeader("messageEventType", eventType).build();
    }

    /**
     * Builds a MESSAGE_WAITING event for the given message name and correlation key, using the
     * two-argument message builder.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageWaitingEvent(String messageName, String correlationKey) {
        MessageBuilder<MessageEventPayload> builder = messageBuilder(messageName, correlationKey);
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_WAITING.name();
        return builder.setHeader("messageEventType", eventType).build();
    }

    /**
     * Builds a MESSAGE_RECEIVED event for the given message name and correlation key with no business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageReceivedEvent(String messageName, String correlationKey) {
        final String noBusinessKey = null;
        return messageReceivedEvent(messageName, correlationKey, noBusinessKey);
    }

    /**
     * Builds a MESSAGE_RECEIVED event.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageReceivedEvent(String messageName, String correlationKey, String businessKey) {
        MessageBuilder<MessageEventPayload> builder = messageBuilder(messageName, correlationKey, businessKey);
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_RECEIVED.name();
        return builder.setHeader("messageEventType", eventType).build();
    }

    /**
     * Builds a MESSAGE_SUBSCRIPTION_CANCELLED event for the given message name and correlation key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> subscriptionCancelledEvent(String messageName, String correlationKey) {
        MessageBuilder<MessageEventPayload> builder = messageBuilder(messageName, correlationKey);
        String eventType = MessageSubscriptionEvent.MessageSubscriptionEvents.MESSAGE_SUBSCRIPTION_CANCELLED.name();
        return builder.setHeader("messageEventType", eventType).build();
    }

    /**
     * Serializes the message payload to JSON and publishes it, with the original headers, to the
     * {@code messageConnectorInput} binding.
     *
     * @param message message whose payload and headers are sent
     * @throws RuntimeException wrapping the {@code JsonProcessingException} if the payload cannot be serialized
     */
    protected void send(Message<?> message) {
        final String body;
        try {
            body = objectMapper.writeValueAsString(message.getPayload());
        } catch (JsonProcessingException serializationFailure) {
            throw new RuntimeException(serializationFailure);
        }

        Message<String> outbound = MessageBuilder.withPayload(body)
            .copyHeaders(message.getHeaders())
            .build();
        streamBridge.send("messageConnectorInput", outbound);
    }

    /**
     * Receives the next message from the output destination and rebuilds it with its payload converted
     * by {@code messageEventPayload} and its headers copied.
     *
     * @param timeout timeout passed to the output destination's {@code receive}
     * @return the rebuilt message, or {@code null} when nothing was received
     */
    @SuppressWarnings("unchecked")
    protected <T> Message<T> poll(long timeout) {
        Message<?> received = outputDestination.receive(timeout);
        if (received == null) {
            return null;
        }

        Object converted = messageEventPayload(received);
        // The caller chooses T; the payload is either a MessageEventPayload or the raw payload.
        return (Message<T>) MessageBuilder.withPayload(converted)
            .copyHeaders(received.getHeaders())
            .build();
    }

    /**
     * Returns whatever the output destination's no-argument {@code receive()} yields, without any
     * payload conversion. The tests use it to assert that no further output message is available.
     *
     * <p>NOTE(review): {@code receive()} presumably consumes the message it returns, so this is not a
     * true peek -- confirm against the test binder.
     *
     * @return the received message, or {@code null} when none is available
     */
    @SuppressWarnings("unchecked")
    protected <T> Message<T> peek() {
        // The binder's message type is fixed; the explicit cast adapts it to the caller-chosen T.
        return (Message<T>) this.outputDestination.receive();
    }

    /**
     * Looks up a message group in the aggregator's message store.
     *
     * @param groupName group identifier, as returned by {@code correlationId}
     * @return the message group for that identifier
     */
    protected MessageGroup messageGroup(String groupName) {
        MessageGroup group = aggregatingMessageHandler.getMessageStore().getMessageGroup(groupName);
        return group;
    }

    /**
     * Computes the correlation id of a message by delegating to {@code Correlations.getCorrelationId}.
     *
     * @param message message to correlate
     * @return the correlation id, used by the tests as the message group name
     */
    protected String correlationId(Message<?> message) {
        String id = Correlations.getCorrelationId(message);
        return id;
    }

    /**
     * Removes the message group with the given id from the message group store.
     *
     * @param correlationId id of the group to remove
     */
    protected void removeMessageGroup(String correlationId) {
        messageGroupStore.removeMessageGroup(correlationId);
    }

    /**
     * Schedules a send on the given executor. The task waits for {@code start} to be released, sends
     * the message, and always counts down {@code sent}, even when the send throws, so that a caller
     * waiting on {@code sent} is not left blocked by a failed send.
     *
     * @param message message to send
     * @param start latch the task waits on before sending
     * @param sent latch counted down once the send attempt has finished
     * @param exec executor that runs the task
     */
    protected void sendAsync(Message<?> message, CountDownLatch start, CountDownLatch sent, ExecutorService exec) {
        exec.execute(() -> {
            try {
                try {
                    start.await();
                }
                catch (InterruptedException e) {
                    // Restore the flag and still attempt the send, as before.
                    Thread.currentThread().interrupt();
                }
                Try.run(() -> this.send(message));
            }
            finally {
                sent.countDown();
            }
        });
    }

    /**
     * Converts the payload of a received message to a {@code MessageEventPayload} when it is a JSON
     * {@code String} or {@code byte[]}. Payloads of any other type, and payloads that cannot be parsed,
     * are returned unchanged.
     *
     * @param message received message
     * @return the parsed {@code MessageEventPayload}, or the original payload
     */
    private Object messageEventPayload(Message<?> message) {
        Object payload = message.getPayload();
        try {
            if (payload instanceof String) {
                return this.objectMapper.readValue((String) payload, MessageEventPayload.class);
            }
            if (payload instanceof byte[]) {
                return this.objectMapper.readValue((byte[]) payload, MessageEventPayload.class);
            }
        }
        catch (IOException e) {
            // The exception message fills the last placeholder; the trailing Throwable supplies the stack trace.
            LOGGER.warn("The payload {} cannot be converted to MessageEventPayload, so it is returned as is: {}", payload, e.getMessage(), e);
        }
        return payload;
    }

    /**
     * Helpers for calling code that throws checked exceptions from lambdas.
     */
    static class Try {
        Try() {
        }

        /**
         * Invokes the callable, wrapping any checked exception in a {@code RuntimeException}.
         *
         * @param callable code to invoke
         * @return the callable's result
         * @throws RuntimeException the callable's own runtime exception, or a wrapper around its checked exception
         */
        public static <T> T call(Callable<T> callable) throws RuntimeException {
            return Try.call(callable, RuntimeException::new);
        }

        /**
         * Runs the given code and rethrows any exception unchanged, including checked ones, without
         * requiring the caller to declare them.
         *
         * @param runnable code to run
         */
        public static void run(RunnableExceptionWrapper runnable) {
            try {
                runnable.run();
            }
            catch (Exception e) {
                Try.sneakyThrow(e);
            }
        }

        /**
         * Invokes the callable. Runtime exceptions propagate unchanged; checked exceptions are
         * converted by {@code wrapper} and the result is thrown.
         *
         * @param callable code to invoke
         * @param wrapper converts a checked exception into the exception to throw
         * @return the callable's result
         * @throws E the wrapped checked exception
         */
        public static <T, E extends Throwable> T call(Callable<T> callable, ExceptionWrapper<E> wrapper) throws E {
            try {
                return callable.call();
            }
            catch (RuntimeException e) {
                throw e;
            }
            catch (Exception e) {
                throw wrapper.wrap(e);
            }
        }

        /**
         * Throws {@code t} as if it were of the inferred type {@code T}; because of erasure the cast is
         * not checked at run time, so the original exception is thrown as is.
         */
        @SuppressWarnings("unchecked")
        private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
            throw (T) t;
        }

        /** Converts a checked exception into the exception that should be thrown instead. */
        @FunctionalInterface
        public static interface ExceptionWrapper<E> {
            public E wrap(Exception var1);
        }

        /** A {@code Runnable}-like action that may throw a checked exception. */
        @FunctionalInterface
        public static interface RunnableExceptionWrapper {
            public void run() throws Exception;
        }
    }

    @TestConfiguration
    @Import(value={TestChannelBinderConfiguration.class})
    static class TestConfigurationContext {
        TestConfigurationContext() {
        }

        @Bean
        @BridgeFrom(value="errorChannel")
        MessageChannel errorQueue() {
            return (MessageChannel)MessageChannels.queue().getObject();
        }

        @Bean
        @BridgeFrom(value="discardChannel")
        MessageChannel discardQueue() {
            return (MessageChannel)MessageChannels.queue().getObject();
        }

        @Bean
        @GlobalChannelInterceptor
        public ChannelInterceptor channelInterceptor() {
            return new ChannelInterceptor(this){
                {
                    Objects.requireNonNull(this$0);
                }

                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    MessageHeaders headers = message.getHeaders();
                    if (headers.containsKey((Object)"messageEventName") && "errorInterceptor".equals(headers.get((Object)"messageEventName"))) {
                        throw new IllegalArgumentException("transaction failed");
                    }
                    return message;
                }
            };
        }
    }
}

