/*
 * Decompiled with CFR 0.152.
 */
package org.activiti.cloud.services.notifications.graphql.ws.transport;

import com.codahale.metrics.annotation.Timed;
import graphql.ExecutionResult;
import java.security.Principal;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.activiti.cloud.services.notifications.graphql.ws.api.GraphQLMessage;
import org.activiti.cloud.services.notifications.graphql.ws.api.GraphQLMessageType;
import org.activiti.cloud.services.notifications.graphql.ws.transport.GraphQLBrokerChannelSubscriber;
import org.activiti.cloud.services.notifications.graphql.ws.transport.GraphQLBrokerSubscriptionRegistry;
import org.activiti.cloud.services.notifications.graphql.ws.transport.GraphQLSubscriptionExecutor;
import org.activiti.cloud.services.notifications.graphql.ws.util.QueryParameters;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;

public class GraphQLBrokerMessageHandler
extends AbstractBrokerMessageHandler {
    public static final String BROKER_NOT_AVAILABLE = "Broker Not Available.";
    private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
    private MessageHeaderInitializer headerInitializer;
    private TaskScheduler taskScheduler;
    private long[] heartbeatValue;
    private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
    private ScheduledFuture<?> heartbeatFuture;
    private final GraphQLSubscriptionExecutor graphQLSubscriptionExecutor;
    private final GraphQLBrokerSubscriptionRegistry graphQLsubscriptionRegistry;
    private long bufferTimeSpanMs = 1000L;
    private int bufferCount = 50;

    public GraphQLBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, SubscribableChannel brokerChannel, GraphQLSubscriptionExecutor graphQLSubscriptionExecutor) {
        super(inboundChannel, outboundChannel, brokerChannel);
        this.graphQLSubscriptionExecutor = graphQLSubscriptionExecutor;
        this.graphQLsubscriptionRegistry = new GraphQLBrokerSubscriptionRegistry();
        this.setPreservePublishOrder(true);
    }

    public GraphQLBrokerSubscriptionRegistry getGraphQLsubscriptionRegistry() {
        return this.graphQLsubscriptionRegistry;
    }

    public long getBufferTimeSpanMs() {
        return this.bufferTimeSpanMs;
    }

    public GraphQLBrokerMessageHandler setBufferTimeSpanMs(long bufferTimeSpanMs) {
        this.bufferTimeSpanMs = bufferTimeSpanMs;
        return this;
    }

    public int getBufferCount() {
        return this.bufferCount;
    }

    public GraphQLBrokerMessageHandler setBufferCount(int bufferCount) {
        this.bufferCount = bufferCount;
        return this;
    }

    @EventListener
    public void on(BrokerAvailabilityEvent event) {
        this.brokerAvailable.set(event.isBrokerAvailable());
    }

    public boolean isBrokerAvailable() {
        return this.brokerAvailable.get();
    }

    protected void startInternal() {
        if (this.getTaskScheduler() != null) {
            long interval = this.initHeartbeatTaskDelay();
            if (interval > 0L) {
                this.heartbeatFuture = this.taskScheduler.scheduleWithFixedDelay((Runnable)new HeartbeatTask(), interval);
            }
        } else {
            Assert.isTrue((this.getHeartbeatValue() == null || this.getHeartbeatValue()[0] == 0L && this.getHeartbeatValue()[1] == 0L ? 1 : 0) != 0, (String)"Heartbeat values configured but no TaskScheduler provided");
        }
        this.publishBrokerAvailableEvent();
    }

    protected void stopInternal() {
        this.publishBrokerUnavailableEvent();
        try {
            if (this.heartbeatFuture != null) {
                this.heartbeatFuture.cancel(true);
            }
        }
        catch (Throwable ex) {
            this.logger.error((Object)"Error in shutdown of TCP client", ex);
        }
    }

    private long initHeartbeatTaskDelay() {
        if (this.getHeartbeatValue() == null) {
            return 0L;
        }
        if (this.getHeartbeatValue()[0] > 0L && this.getHeartbeatValue()[1] > 0L) {
            return Math.min(this.getHeartbeatValue()[0], this.getHeartbeatValue()[1]);
        }
        return this.getHeartbeatValue()[0] > 0L ? this.getHeartbeatValue()[0] : this.getHeartbeatValue()[1];
    }

    protected void handleMessageInternal(Message<?> message) {
        MessageHeaders headers = message.getHeaders();
        SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType((Map)headers);
        String destination = SimpMessageHeaderAccessor.getDestination((Map)headers);
        String sessionId = SimpMessageHeaderAccessor.getSessionId((Map)headers);
        Principal user = SimpMessageHeaderAccessor.getUser((Map)headers);
        this.updateSessionReadTime(sessionId);
        if (!this.checkDestinationPrefix(destination)) {
            return;
        }
        if (SimpMessageType.MESSAGE.equals((Object)messageType) && message.getPayload() instanceof GraphQLMessage) {
            this.logMessage(message);
            Message<?> graphQLMessage = message;
            GraphQLMessageType graphQLMessagePayloadType = ((GraphQLMessage)graphQLMessage.getPayload()).getType();
            switch (graphQLMessagePayloadType) {
                case CONNECTION_INIT: {
                    if (!this.isBrokerAvailable()) {
                        this.sendErrorMessageToClient(BROKER_NOT_AVAILABLE, GraphQLMessageType.CONNECTION_ERROR, message);
                        return;
                    }
                    long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat((Map)headers);
                    long[] serverHeartbeat = this.getHeartbeatValue();
                    this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
                    this.handleConnectionInitMessage(graphQLMessage);
                    break;
                }
                case START: {
                    if (!this.isBrokerAvailable()) {
                        this.sendErrorMessageToClient(BROKER_NOT_AVAILABLE, GraphQLMessageType.ERROR, message);
                        return;
                    }
                    this.handleStartSubscription(graphQLMessage);
                    break;
                }
                case STOP: {
                    this.handleStopSubscription(graphQLMessage);
                    break;
                }
                case CONNECTION_TERMINATE: {
                    this.handleConnectionTerminate(graphQLMessage);
                    break;
                }
            }
        }
    }

    @Timed
    protected final void handleConnectionInitMessage(Message<GraphQLMessage> message) {
        GraphQLMessage operationPayload = (GraphQLMessage)message.getPayload();
        GraphQLMessage connection_ack = new GraphQLMessage(operationPayload.getId(), GraphQLMessageType.CONNECTION_ACK);
        MessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.getMutableAccessor(message);
        Message responseMessage = MessageBuilder.createMessage((Object)connection_ack, (MessageHeaders)headerAccessor.getMessageHeaders());
        this.getClientOutboundChannel().send(responseMessage);
    }

    @Timed
    protected final void handleStartSubscription(Message<GraphQLMessage> message) {
        this.logger.info((Object)("handleStartSubscription for message " + String.valueOf(message)));
        MessageHeaders headers = message.getHeaders();
        String sessionId = SimpMessageHeaderAccessor.getSessionId((Map)headers);
        GraphQLMessage operationPayload = (GraphQLMessage)message.getPayload();
        QueryParameters parameters = null;
        try {
            parameters = QueryParameters.from(operationPayload.getPayload());
        }
        catch (Exception e) {
            this.sendErrorMessageToClient(e.getMessage(), GraphQLMessageType.ERROR, message);
            return;
        }
        ExecutionResult executionResult = this.graphQLSubscriptionExecutor.execute(parameters.getQuery(), parameters.getVariables());
        if (executionResult.getErrors().isEmpty()) {
            if (executionResult.getData() == null) {
                this.sendErrorMessageToClient("Server error!", GraphQLMessageType.ERROR, message);
            } else if (executionResult.getData() instanceof Publisher) {
                Optional.of((Publisher)executionResult.getData()).ifPresent(data -> {
                    MessageChannel outboundChannel = this.getClientOutboundChannelForSession(sessionId);
                    GraphQLBrokerChannelSubscriber subscriber = new GraphQLBrokerChannelSubscriber(message, operationPayload.getId(), outboundChannel, this.bufferTimeSpanMs, this.bufferCount);
                    this.graphQLsubscriptionRegistry.subscribe(sessionId, operationPayload.getId(), subscriber, () -> data.subscribe((Subscriber)subscriber));
                });
            } else {
                this.handleQueryOrMutation(operationPayload.getId(), executionResult, message);
            }
        } else {
            Map<String, List> payload = Collections.singletonMap("errors", executionResult.getErrors());
            GraphQLMessage startSubscriptionMessage = (GraphQLMessage)message.getPayload();
            GraphQLMessage startSubscriptionErrors = new GraphQLMessage(startSubscriptionMessage.getId(), GraphQLMessageType.ERROR, payload);
            Message errorMessage = MessageBuilder.createMessage((Object)startSubscriptionErrors, (MessageHeaders)headers);
            this.getClientOutboundChannel().send(errorMessage);
        }
    }

    private void handleQueryOrMutation(String id, ExecutionResult result, Message<GraphQLMessage> message) {
        Map<String, Object> payload = Collections.singletonMap("data", result.getData());
        MessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.getMutableAccessor(message);
        GraphQLMessage operationData = new GraphQLMessage(id, GraphQLMessageType.DATA, payload);
        Message dataMessage = MessageBuilder.createMessage((Object)operationData, (MessageHeaders)headerAccessor.getMessageHeaders());
        this.getClientOutboundChannel().send(dataMessage);
        GraphQLMessage completeData = new GraphQLMessage(id, GraphQLMessageType.COMPLETE);
        Message completeMessage = MessageBuilder.createMessage((Object)completeData, (MessageHeaders)headerAccessor.getMessageHeaders());
        this.getClientOutboundChannel().send(completeMessage);
    }

    @Timed
    protected final void handleStopSubscription(Message<GraphQLMessage> message) {
        this.logger.info((Object)("handleStopSubscription for message " + String.valueOf(message)));
        MessageHeaders headers = message.getHeaders();
        String sessionId = SimpMessageHeaderAccessor.getSessionId((Map)headers);
        GraphQLMessage operationPayload = (GraphQLMessage)message.getPayload();
        this.graphQLsubscriptionRegistry.unsubscribe(sessionId, operationPayload.getId(), subscriber -> subscriber.onComplete());
    }

    @Timed
    protected final void handleConnectionTerminate(Message<GraphQLMessage> message) {
        this.logger.info((Object)("handleConnectionTerminate for message " + String.valueOf(message)));
        MessageHeaders headers = message.getHeaders();
        String sessionId = SimpMessageHeaderAccessor.getSessionId((Map)headers);
        this.graphQLsubscriptionRegistry.unsubscribe(sessionId, subscriber -> subscriber.cancel());
    }

    private void sendErrorMessageToClient(String errorText, GraphQLMessageType type, Message<?> inputMessage) {
        Map<String, List<String>> payload = Collections.singletonMap("errors", Collections.singletonList(errorText));
        GraphQLMessage inputOperation = (GraphQLMessage)inputMessage.getPayload();
        GraphQLMessage connectionError = new GraphQLMessage(inputOperation.getId(), type, payload);
        MessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.getMutableAccessor(inputMessage);
        Message errorMessage = MessageBuilder.createMessage((Object)connectionError, (MessageHeaders)headerAccessor.getMessageHeaders());
        this.getClientOutboundChannel().send(errorMessage);
    }

    private void updateSessionReadTime(String sessionId) {
        SessionInfo info;
        if (sessionId != null && (info = this.sessions.get(sessionId)) != null) {
            info.setLastReadTime(System.currentTimeMillis());
        }
    }

    private void logMessage(Message<?> message) {
        if (this.logger.isDebugEnabled()) {
            SimpMessageHeaderAccessor accessor = (SimpMessageHeaderAccessor)MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
            accessor = accessor != null ? accessor : SimpMessageHeaderAccessor.wrap(message);
            this.logger.debug((Object)("Processing " + accessor.getShortLogMessage(message.getPayload())));
        }
    }

    public GraphQLBrokerMessageHandler setTaskScheduler(TaskScheduler taskScheduler) {
        Assert.notNull((Object)taskScheduler, (String)"TaskScheduler must not be null");
        this.taskScheduler = taskScheduler;
        if (this.heartbeatValue == null) {
            this.heartbeatValue = new long[]{5000L, 5000L};
        }
        return this;
    }

    public TaskScheduler getTaskScheduler() {
        return this.taskScheduler;
    }

    public GraphQLBrokerMessageHandler setHeartbeatValue(long[] heartbeat) {
        if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0L || heartbeat[1] < 0L) {
            throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat));
        }
        this.heartbeatValue = heartbeat;
        return this;
    }

    public long[] getHeartbeatValue() {
        return this.heartbeatValue;
    }

    public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
        this.headerInitializer = headerInitializer;
    }

    public MessageHeaderInitializer getHeaderInitializer() {
        return this.headerInitializer;
    }

    private void initHeaders(SimpMessageHeaderAccessor accessor) {
        if (this.getHeaderInitializer() != null) {
            this.getHeaderInitializer().initHeaders((MessageHeaderAccessor)accessor);
        }
    }

    private void handleDisconnect(String sessionId, Principal user, Message<?> origMessage) {
        this.sessions.remove(sessionId);
        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create((SimpMessageType)SimpMessageType.MESSAGE);
        accessor.setSessionId(sessionId);
        accessor.setUser(user);
        if (origMessage != null) {
            accessor.setHeader("simpDisconnectMessage", origMessage);
        }
        this.initHeaders(accessor);
        GraphQLMessage disconnect = new GraphQLMessage(null, GraphQLMessageType.CONNECTION_ERROR);
        Message message = MessageBuilder.createMessage((Object)disconnect, (MessageHeaders)accessor.getMessageHeaders());
        this.getClientOutboundChannel().send(message);
    }

    private class HeartbeatTask
    implements Runnable {
        private HeartbeatTask() {
        }

        @Override
        public void run() {
            long now = System.currentTimeMillis();
            for (SessionInfo info : GraphQLBrokerMessageHandler.this.sessions.values()) {
                if (info.getReadInterval() > 0L && now - info.getLastReadTime() > info.getReadInterval()) {
                    GraphQLBrokerMessageHandler.this.handleDisconnect(info.getSessiondId(), info.getUser(), null);
                }
                if (info.getWriteInterval() <= 0L || now - info.getLastWriteTime() <= info.getWriteInterval()) continue;
                SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create((SimpMessageType)SimpMessageType.HEARTBEAT);
                accessor.setSessionId(info.getSessiondId());
                accessor.setUser(info.getUser());
                GraphQLBrokerMessageHandler.this.initHeaders(accessor);
                MessageHeaders headers = accessor.getMessageHeaders();
                GraphQLMessage heartbeat = new GraphQLMessage(null, GraphQLMessageType.KA);
                GraphQLBrokerMessageHandler.this.getClientOutboundChannel().send(MessageBuilder.createMessage((Object)heartbeat, (MessageHeaders)headers));
            }
        }
    }

    protected static class SessionInfo {
        private static final long HEARTBEAT_MULTIPLIER = 3L;
        private final String sessiondId;
        private final Principal user;
        private final long readInterval;
        private final long writeInterval;
        private volatile long lastReadTime;
        private volatile long lastWriteTime;

        public SessionInfo(String sessiondId, Principal user, long[] clientHeartbeat, long[] serverHeartbeat) {
            this.sessiondId = sessiondId;
            this.user = user;
            if (clientHeartbeat != null && serverHeartbeat != null) {
                this.readInterval = clientHeartbeat[0] > 0L && serverHeartbeat[1] > 0L ? Math.max(clientHeartbeat[0], serverHeartbeat[1]) * 3L : 0L;
                this.writeInterval = clientHeartbeat[1] > 0L && serverHeartbeat[0] > 0L ? Math.max(clientHeartbeat[1], serverHeartbeat[0]) : 0L;
            } else {
                this.readInterval = 0L;
                this.writeInterval = 0L;
            }
            this.lastReadTime = this.lastWriteTime = System.currentTimeMillis();
        }

        public String getSessiondId() {
            return this.sessiondId;
        }

        public Principal getUser() {
            return this.user;
        }

        public long getReadInterval() {
            return this.readInterval;
        }

        public long getWriteInterval() {
            return this.writeInterval;
        }

        public long getLastReadTime() {
            return this.lastReadTime;
        }

        public void setLastReadTime(long lastReadTime) {
            this.lastReadTime = lastReadTime;
        }

        public long getLastWriteTime() {
            return this.lastWriteTime;
        }

        public void setLastWriteTime(long lastWriteTime) {
            this.lastWriteTime = lastWriteTime;
        }
    }
}

