/*
 * Decompiled with CFR 0.152.
 */
package org.activiti.cloud.services.notifications.graphql.events.consumer;

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.activiti.cloud.common.messaging.functional.FunctionBinding;
import org.activiti.cloud.services.notifications.graphql.events.RoutingKeyResolver;
import org.activiti.cloud.services.notifications.graphql.events.SpELTemplateRoutingKeyResolver;
import org.activiti.cloud.services.notifications.graphql.events.consumer.EngineEventsConsumerChannels;
import org.activiti.cloud.services.notifications.graphql.events.consumer.EngineEventsConsumerMessageHandler;
import org.activiti.cloud.services.notifications.graphql.events.consumer.EngineEventsConsumerProperties;
import org.activiti.cloud.services.notifications.graphql.events.model.EngineEvent;
import org.activiti.cloud.services.notifications.graphql.events.transformer.EngineEventsTransformer;
import org.activiti.cloud.services.notifications.graphql.events.transformer.Transformer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.annotation.PropertySources;
import org.springframework.integration.dsl.GatewayEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Auto-configuration for the GraphQL notifications engine events consumer.
 *
 * <p>It is guarded by the
 * {@code spring.activiti.cloud.services.notifications.graphql.events.enabled} property
 * (the condition also matches when the property is missing), binds
 * {@link EngineEventsConsumerProperties}, and loads defaults from
 * {@code classpath:META-INF/graphql-events.properties} with an optional
 * {@code classpath:graphql-events.properties} on top.
 */
@AutoConfiguration
@EnableConfigurationProperties(EngineEventsConsumerProperties.class)
@ConditionalOnProperty(
    name = "spring.activiti.cloud.services.notifications.graphql.events.enabled",
    matchIfMissing = true
)
@PropertySources({
    @PropertySource("classpath:META-INF/graphql-events.properties"),
    @PropertySource(value = "classpath:graphql-events.properties", ignoreResourceNotFound = true)
})
public class EngineEventsConsumerAutoConfiguration {

    /**
     * Default bean definitions for consuming engine events: the functional input binding,
     * the direct channel it forwards to, the integration flow exposed as a reactive
     * publisher, and the shared {@link Flux} built on top of that publisher.
     */
    @Configuration
    public static class DefaultEngineEventsConsumerConfiguration implements EngineEventsConsumerChannels {

        private static final Logger logger = LoggerFactory.getLogger(DefaultEngineEventsConsumerConfiguration.class);

        /** Bean name of the scheduler on which the engine events flux is run. */
        public static final String ENGINE_EVENTS_FLUX_SCHEDULER = "engineEventsScheduler";

        /** Timeout value passed to the gateway endpoint for both send and request timeouts. */
        private static final long GATEWAY_TIMEOUT = -1L;

        private final EngineEventsConsumerProperties properties;

        /**
         * @param properties bound consumer properties used to configure the transformer
         */
        @Autowired
        public DefaultEngineEventsConsumerConfiguration(EngineEventsConsumerProperties properties) {
            this.properties = properties;
        }

        /**
         * @return a {@link SpELTemplateRoutingKeyResolver}, unless another
         *         {@link RoutingKeyResolver} bean is already defined
         */
        @Bean
        @ConditionalOnMissingBean
        public RoutingKeyResolver routingKeyResolver() {
            return new SpELTemplateRoutingKeyResolver();
        }

        /**
         * Builds the default transformer from the configured attribute keys (a comma-separated
         * property, split on {@code ","} without trimming) and the configured event type key.
         *
         * @return an {@link EngineEventsTransformer}, unless another {@link Transformer} bean exists
         */
        @Bean
        @ConditionalOnMissingBean
        public Transformer engineEventsTransformer() {
            List<String> attributeKeys = Arrays.asList(
                this.properties.getProcessEngineEventAttributeKeys().split(",")
            );
            return new EngineEventsTransformer(attributeKeys, this.properties.getProcessEngineEventTypeKey());
        }

        /**
         * @param engineEventsTransformer transformer handed to the message handler
         * @return the message handler used as the transforming step of the publisher flow
         */
        @Bean
        @ConditionalOnMissingBean
        public EngineEventsConsumerMessageHandler engineEventsMessageHandler(Transformer engineEventsTransformer) {
            return new EngineEventsConsumerMessageHandler(engineEventsTransformer);
        }

        /**
         * Functional consumer bound to the {@code graphQLEngineEventsConsumerSource} input; every
         * received message is sent on to the {@code engineEventsPublisherInput} channel.
         *
         * @param engineEventsPublisherInput channel feeding the publisher flow
         * @return consumer forwarding incoming messages to that channel
         */
        @Bean
        @FunctionBinding(input = "graphQLEngineEventsConsumerSource")
        public Consumer<Message<List<EngineEvent>>> engineEventsGraphQlSourceConsumer(MessageChannel engineEventsPublisherInput) {
            return engineEventsPublisherInput::send;
        }

        /**
         * @return a direct channel named {@code engineEventsPublisherInput}
         */
        @Bean
        MessageChannel engineEventsPublisherInput() {
            return MessageChannels.direct("engineEventsPublisherInput").getObject();
        }

        /**
         * Defines the integration flow that reads from the input channel, logs at DEBUG level,
         * passes each message through a gateway sub-flow applying the message handler, and is
         * exposed as a reactive publisher.
         *
         * @param engineEventsMessageHandler service used by the gateway sub-flow's transform step
         * @param engineEventsPublisherInput channel the flow starts from
         * @return reactive publisher of the flow's output messages
         */
        @Bean
        @ConditionalOnMissingBean
        public Publisher<Message<List<EngineEvent>>> engineEventsPublisher(EngineEventsConsumerMessageHandler engineEventsMessageHandler, MessageChannel engineEventsPublisherInput) {
            // Typed as Object on purpose so the transform(Object service) overload is selected.
            Object transformerService = engineEventsMessageHandler;
            IntegrationFlow transformingSubFlow = flow -> flow.transform(transformerService);

            Consumer<GatewayEndpointSpec> gatewayOptions = spec ->
                spec
                    .sendTimeout(GATEWAY_TIMEOUT)
                    .requestTimeout(Long.valueOf(GATEWAY_TIMEOUT))
                    .requiresReply(false);

            return IntegrationFlow
                .from(engineEventsPublisherInput)
                .log(LoggingHandler.Level.DEBUG)
                .gateway(transformingSubFlow, gatewayOptions)
                .toReactivePublisher();
        }

        /**
         * Wraps the publisher in a {@link Flux} that logs errors and replaces them with an empty
         * completion, is shared through {@code publish().autoConnect()}, is run on the given
         * scheduler via a parallel/sequential hop, and applies {@code onBackpressureLatest()}.
         *
         * @param engineEventsPublisher upstream publisher of engine event messages
         * @param engineEventsScheduler scheduler the parallel rails run on
         * @return the engine events flux
         */
        @Bean
        @ConditionalOnMissingBean
        public Flux<Message<List<EngineEvent>>> engineEventsFlux(Publisher<Message<List<EngineEvent>>> engineEventsPublisher, Scheduler engineEventsScheduler) {
            Flux<Message<List<EngineEvent>>> errorTolerant = Flux
                .from(engineEventsPublisher)
                .doOnError(failure ->
                    logger.error("Error while publishing engine events: {}", failure.getMessage(), failure)
                )
                .onErrorResume(failure -> Mono.empty());

            Flux<Message<List<EngineEvent>>> shared = errorTolerant.publish().autoConnect();

            return shared
                .parallel()
                .runOn(engineEventsScheduler)
                .sequential()
                .onBackpressureLatest();
        }

        /**
         * @return {@link Schedulers#boundedElastic()}, unless a bean named
         *         {@code engineEventsScheduler} is already defined
         */
        @Bean
        @ConditionalOnMissingBean(name = ENGINE_EVENTS_FLUX_SCHEDULER)
        public Scheduler engineEventsScheduler() {
            return Schedulers.boundedElastic();
        }

        /**
         * Subscribes a logging-only consumer to the engine events flux when the bean is
         * initialized: headers at DEBUG, errors at ERROR, completion at WARN.
         *
         * @param engineEventsFlux flux to subscribe to
         * @return initializing bean performing the subscription
         */
        @Bean
        InitializingBean engineEventsFluxConsumer(Flux<Message<List<EngineEvent>>> engineEventsFlux) {
            return () -> {
                logger.info("Subscribing engineEventsFlux consumer");
                engineEventsFlux.subscribe(
                    received -> logger.debug("Received engine events {}", received.getHeaders()),
                    failure -> logger.error("Error while receiving engine events", failure),
                    () -> logger.warn("Completing engineEventsFlux consumer")
                );
            };
        }
    }
}

