/*
 * Decompiled with CFR 0.152.
 */
package org.activiti.cloud.common.messaging.config;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.activiti.cloud.common.messaging.ActivitiCloudMessagingProperties;
import org.activiti.cloud.common.messaging.config.ActivitiMessagingDestinationsAutoConfiguration;
import org.activiti.cloud.common.messaging.config.CompletableFutureRetry;
import org.activiti.cloud.common.messaging.config.InputBindingConfiguration;
import org.activiti.cloud.common.messaging.functional.FunctionBinding;
import org.activiti.cloud.common.messaging.functional.InputBinding;
import org.activiti.cloud.common.messaging.functional.OutputBinding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;

@AutoConfiguration(before={InputBindingConfiguration.class}, after={BinderFactoryAutoConfiguration.class, ActivitiMessagingDestinationsAutoConfiguration.class})
@ConditionalOnProperty(value={"activiti.cloud.messaging.function-router.enabled"})
public class FunctionRouterConfiguration {
    private static final Logger log = LoggerFactory.getLogger(FunctionRouterConfiguration.class);
    public static final String FUNCTION_DESTINATION = "spring.cloud.function.destination";
    public static final String FUNCTION_ROUTER_INPUT = "functionRouterInput";

    @Bean
    ApplicationRunner functionRouterConfigurationApplicationRunner(ActivitiCloudMessagingProperties messagingProperties) {
        return args -> log.warn("Function Router has been initialized: {}", (Object)messagingProperties.getFunctionRouter());
    }

    @Bean
    @FunctionBinding(input="functionRouterInput")
    Consumer<Message<?>> functionRouterConsumer(RoutingFunction routingFunction, ActivitiCloudMessagingProperties messagingProperties) {
        ActivitiCloudMessagingProperties.FunctionRouterProperties functionRouter = messagingProperties.getFunctionRouter();
        return message -> Optional.of(message).filter(it -> it.getHeaders().containsKey((Object)FUNCTION_DESTINATION)).map(it -> (String)it.getHeaders().get((Object)FUNCTION_DESTINATION, String.class)).map(messagingProperties.getFunctionRouter().registrations()::get).filter(Predicate.not(Collection::isEmpty)).ifPresentOrElse(registrations -> {
            Function<Message, String> resolveFunctionDefinition = functionMessage -> (String)functionMessage.getHeaders().get((Object)"spring.cloud.function.definition", String.class);
            BiFunction<Message, String, Message> toFunctionRequest = (functionMessage, functionRegistration) -> MessageBuilder.fromMessage((Message)functionMessage).setHeader("spring.cloud.function.definition", functionRegistration).build();
            CompletableFuture[] functions = (CompletableFuture[])registrations.stream().map(functionRegistration -> (Message)toFunctionRequest.apply((Message)message, (String)functionRegistration)).map(functionRequest -> ((CompletableFuture)CompletableFutureRetry.supplyAsyncWithRetry(() -> CompletableFuture.supplyAsync(() -> routingFunction.apply(functionRequest)), functionRouter.getMaxRetries(), functionRouter.getRetryInterval()).thenApply(result -> {
                String functionDefinition = (String)resolveFunctionDefinition.apply((Message)functionRequest);
                log.debug("Function message request {} successfully routed to {}", functionRequest, (Object)functionDefinition);
                return Map.entry(functionDefinition, Optional.ofNullable(result));
            })).exceptionally(error -> {
                String functionDefinition = (String)resolveFunctionDefinition.apply((Message)functionRequest);
                log.error("Error routing message request {} to function registration {}", new Object[]{functionRequest, functionDefinition, error});
                return Map.entry(functionDefinition, Optional.of(error));
            })).toArray(CompletableFuture[]::new);
            CompletionStage completed = CompletableFuture.allOf(functions).thenApply(v -> Stream.of(functions).map(CompletableFuture::join).toList());
            ((CompletableFuture)completed).thenAccept(results -> {
                List<Object> errors = results.stream().map(Map.Entry.class::cast).filter(entry -> ((Optional)Optional.class.cast(entry.getValue())).filter(Exception.class::isInstance).isPresent()).map(entry -> ((Optional)Optional.class.cast(entry.getValue())).get()).toList();
                if (!errors.isEmpty()) {
                    log.debug("Errors handling function route message request {}", errors);
                } else {
                    log.debug("Successfully completed function route message request {}", message);
                }
            });
        }, () -> {
            String destination = (String)message.getHeaders().get((Object)FUNCTION_DESTINATION, String.class);
            List registration = Optional.ofNullable(destination).map(it -> messagingProperties.getFunctionRouter().registrations().get(it)).orElse(List.of());
            log.warn("Unable to route message {} to destination '{}' for function registration '{}'", new Object[]{message, destination, registration});
        });
    }

    @Bean
    MessageRoutingCallback functionRouterMessageRoutingCallback() {
        return new MessageRoutingCallback(this){

            public String routingResult(Message<?> message) {
                return (String)message.getHeaders().get((Object)"spring.cloud.function.definition", String.class);
            }
        };
    }

    @Bean
    public BeanPostProcessor outputBindingChannelPostProcessor(final @Autowired DefaultListableBeanFactory beanFactory, final @Autowired BindingServiceProperties bindingServiceProperties) {
        return new BeanPostProcessor(){

            public Object postProcessAfterInitialization(Object bean, final String beanName) {
                if (bean instanceof DirectChannel) {
                    DirectChannel messageChannel = (DirectChannel)bean;
                    Optional.ofNullable((OutputBinding)beanFactory.findAnnotationOnBean(beanName, OutputBinding.class)).ifPresent(outputBinding -> messageChannel.addInterceptor(new ChannelInterceptor(){

                        public Message<?> preSend(Message<?> message, MessageChannel channel) {
                            return Optional.ofNullable((BindingProperties)bindingServiceProperties.getBindings().get(beanName)).map(binding -> MessageBuilder.fromMessage((Message)message).setHeader(FunctionRouterConfiguration.FUNCTION_DESTINATION, (Object)binding.getDestination()).build()).orElse(message);
                        }
                    }));
                }
                return bean;
            }
        };
    }

    @Configuration
    static class FunctionRouterChannels {
        FunctionRouterChannels() {
        }

        @InputBinding(value={"functionRouterInput"})
        SubscribableChannel functionRouterInput() {
            return (SubscribableChannel)MessageChannels.publishSubscribe((String)FunctionRouterConfiguration.FUNCTION_ROUTER_INPUT).getObject();
        }
    }
}

