/*
 * Decompiled with CFR 0.152.
 */
package org.activiti.cloud.services.notifications.graphql.ws.transport;

import com.codahale.metrics.annotation.Gauge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.IOException;
import java.security.Principal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.activiti.cloud.services.notifications.graphql.ws.api.GraphQLMessage;
import org.activiti.cloud.services.notifications.graphql.ws.api.GraphQLMessageType;
import org.activiti.cloud.services.notifications.graphql.ws.transport.GraphQLSessionConnectEvent;
import org.activiti.cloud.services.notifications.graphql.ws.transport.GraphQLSessionDisconnectEvent;
import org.activiti.cloud.services.notifications.graphql.ws.transport.GraphQLSessionSubscribeEvent;
import org.activiti.cloud.services.notifications.graphql.ws.transport.GraphQLSessionUnsubscribeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.SimpAttributesContextHolder;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.SessionLimitExceededException;
import org.springframework.web.socket.messaging.SubProtocolHandler;

public class GraphQLBrokerSubProtocolHandler
implements SubProtocolHandler,
ApplicationEventPublisherAware {
    private static final int DEFAULT_KA_INTERVAL = 5000;
    private static final String KA_INTERVAL_HEADER = "kaInterval";
    private static final String X_AUTHORIZATION = "X-Authorization";
    private static final String GRAPHQL_MESSAGE_TYPE = "graphQLMessageType";
    public static final String GRAPHQL_WS = "graphql-ws";
    public static final int MINIMUM_WEBSOCKET_MESSAGE_SIZE = 16640;
    private static final Logger logger = LoggerFactory.getLogger(GraphQLBrokerSubProtocolHandler.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, Principal> graphqlAuthentications = new ConcurrentHashMap<String, Principal>();
    private final Stats stats = new Stats();
    private ApplicationEventPublisher eventPublisher;
    private final String destination;
    private ScheduledFuture<?> loggingTask;
    private ScheduledExecutorService taskScheduler = Executors.newSingleThreadScheduledExecutor();
    private long loggingPeriod = 300000L;

    public GraphQLBrokerSubProtocolHandler(String destination) {
        this.destination = destination;
        this.setLoggingPeriod(this.loggingPeriod);
    }

    public List<String> getSupportedProtocols() {
        return Collections.singletonList(GRAPHQL_WS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel) throws Exception {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage)message;
            GraphQLMessage sourceMessage = (GraphQLMessage)this.objectMapper.reader().forType(GraphQLMessage.class).readValue((String)textMessage.getPayload());
            try {
                boolean isConnect;
                SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create((SimpMessageType)SimpMessageType.MESSAGE);
                headerAccessor.setDestination(this.destination);
                headerAccessor.setSessionId(session.getId());
                headerAccessor.setSessionAttributes(session.getAttributes());
                headerAccessor.setUser(this.getUser(session));
                headerAccessor.setLeaveMutable(true);
                Message decodedMessage = MessageBuilder.createMessage((Object)sourceMessage, (MessageHeaders)headerAccessor.getMessageHeaders());
                headerAccessor.setHeader(GRAPHQL_MESSAGE_TYPE, (Object)sourceMessage.getType().toString());
                if (logger.isTraceEnabled()) {
                    logger.trace("From client: " + headerAccessor.getShortLogMessage(message.getPayload()));
                }
                if (isConnect = GraphQLMessageType.CONNECTION_INIT.equals((Object)sourceMessage.getType())) {
                    this.stats.incrementConnectCount();
                    Optional.ofNullable(sourceMessage.getPayload()).ifPresent(map -> map.entrySet().forEach(e -> headerAccessor.setHeader((String)e.getKey(), e.getValue())));
                    Integer kaInterval = Optional.ofNullable(headerAccessor.getHeader(KA_INTERVAL_HEADER)).map(v -> Integer.parseInt(v.toString())).orElse(5000);
                    headerAccessor.setHeader("simpHeartbeat", (Object)new long[]{0L, kaInterval.intValue()});
                } else if (GraphQLMessageType.CONNECTION_TERMINATE.equals((Object)sourceMessage.getType())) {
                    this.stats.incrementDisconnectCount();
                } else if (GraphQLMessageType.START.equals((Object)sourceMessage.getType())) {
                    this.stats.incrementStartCount();
                } else if (GraphQLMessageType.STOP.equals((Object)sourceMessage.getType())) {
                    this.stats.incrementStopCount();
                }
                try {
                    SimpAttributesContextHolder.setAttributesFromMessage((Message)decodedMessage);
                    boolean sent = outputChannel.send(decodedMessage);
                    if (sent) {
                        Principal user;
                        if (isConnect && (user = headerAccessor.getUser()) != null && user != session.getPrincipal()) {
                            this.graphqlAuthentications.put(session.getId(), user);
                        }
                        if (this.eventPublisher != null) {
                            if (isConnect) {
                                this.publishEvent(new GraphQLSessionConnectEvent(this, (Message<GraphQLMessage>)decodedMessage, this.getUser(session)));
                            } else if (GraphQLMessageType.START.equals((Object)sourceMessage.getType())) {
                                this.publishEvent(new GraphQLSessionSubscribeEvent(this, (Message<GraphQLMessage>)decodedMessage, this.getUser(session)));
                            } else if (GraphQLMessageType.STOP.equals((Object)sourceMessage.getType())) {
                                this.publishEvent(new GraphQLSessionUnsubscribeEvent(this, (Message<GraphQLMessage>)decodedMessage, this.getUser(session)));
                            }
                        }
                    }
                }
                finally {
                    SimpAttributesContextHolder.resetAttributes();
                }
            }
            catch (Throwable ex) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Failed to send client message to application via MessageChannel in session " + session.getId() + ". Sending CONNECTION_ERROR to client. The client should reestablish a new connection. Cause: {}:{}", (Object)ex.getMessage(), (Object)ex.getCause().getMessage());
                    logger.debug("Exception stacktrace: ", ex);
                }
                this.sendErrorMessage(session, ex, sourceMessage);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleMessageToClient(WebSocketSession session, Message<?> message) {
        if (!(message.getPayload() instanceof GraphQLMessage)) {
            logger.error("Expected OperationMessage. Ignoring " + String.valueOf(message) + ".");
            return;
        }
        boolean closeWebSocketSession = false;
        try {
            GraphQLMessage operation = (GraphQLMessage)message.getPayload();
            if (GraphQLMessageType.CONNECTION_ACK.equals((Object)operation.getType())) {
                this.stats.incrementConnectedCount();
            }
            byte[] bytes = this.objectMapper.writer().writeValueAsBytes(message.getPayload());
            session.sendMessage((WebSocketMessage)new TextMessage(bytes));
        }
        catch (SessionLimitExceededException ex) {
            throw ex;
        }
        catch (Throwable ex) {
            logger.debug("Failed to send WebSocket message to client in session " + session.getId() + ".", ex);
            closeWebSocketSession = true;
        }
        finally {
            if (closeWebSocketSession) {
                try {
                    session.close(CloseStatus.PROTOCOL_ERROR);
                }
                catch (IOException iOException) {}
            }
        }
    }

    public String resolveSessionId(Message<?> message) {
        return SimpMessageHeaderAccessor.getSessionId((Map)message.getHeaders());
    }

    public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) throws Exception {
        if (session.getTextMessageSizeLimit() < 16640) {
            session.setTextMessageSizeLimit(16640);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) throws Exception {
        this.stats.incrementDisconnectCount();
        Message<GraphQLMessage> message = this.createDisconnectMessage(session);
        try {
            SimpAttributesContextHolder.setAttributesFromMessage(message);
            if (this.eventPublisher != null) {
                Principal user = this.getUser(session);
                this.publishEvent(new GraphQLSessionDisconnectEvent(this, message, session.getId(), closeStatus, user));
            }
            outputChannel.send(message);
        }
        catch (Exception e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to send WebSocket message to client after session {}. The client might have closed the connection. Cause: {}:{}", new Object[]{session.getId(), e.getMessage(), e.getCause().getMessage()});
                logger.debug("Exception stacktrace: ", (Throwable)e);
            }
        }
        finally {
            this.graphqlAuthentications.remove(session.getId());
            SimpAttributesContextHolder.resetAttributes();
        }
    }

    private Message<GraphQLMessage> createDisconnectMessage(WebSocketSession session) {
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create((SimpMessageType)SimpMessageType.MESSAGE);
        headerAccessor.setDestination(this.destination);
        headerAccessor.setSessionId(session.getId());
        headerAccessor.setSessionAttributes(session.getAttributes());
        headerAccessor.setUser(this.getUser(session));
        headerAccessor.setLeaveMutable(false);
        GraphQLMessage operation = new GraphQLMessage(null, GraphQLMessageType.CONNECTION_TERMINATE);
        return MessageBuilder.createMessage((Object)operation, (MessageHeaders)headerAccessor.getMessageHeaders());
    }

    protected void sendErrorMessage(WebSocketSession session, Throwable error, GraphQLMessage message) {
        this.stats.incrementErrorCount();
        GraphQLMessage response = new GraphQLMessage(message.getId(), GraphQLMessageType.CONNECTION_ERROR);
        ObjectWriter writer = this.objectMapper.writer();
        try {
            byte[] bytes = writer.writeValueAsBytes((Object)response);
            session.sendMessage((WebSocketMessage)new TextMessage(bytes));
        }
        catch (Throwable ex) {
            logger.debug("Failed to send ERROR to client", ex);
        }
    }

    private Principal getUser(WebSocketSession session) {
        Principal user = this.graphqlAuthentications.get(session.getId());
        return user != null ? user : session.getPrincipal();
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    private void publishEvent(ApplicationEvent event) {
        block2: {
            try {
                this.eventPublisher.publishEvent(event);
            }
            catch (Throwable ex) {
                if (!logger.isErrorEnabled()) break block2;
                logger.error("Error publishing " + String.valueOf(event), ex);
            }
        }
    }

    @Nullable
    private ScheduledFuture<?> initLoggingTask(long initialDelay) {
        if (this.taskScheduler != null && this.loggingPeriod > 0L && logger.isInfoEnabled()) {
            return this.taskScheduler.scheduleAtFixedRate(() -> logger.info("graphql-ws[" + this.stats.toString() + "]"), initialDelay, this.loggingPeriod, TimeUnit.MILLISECONDS);
        }
        return null;
    }

    public void setLoggingPeriod(long period) {
        if (this.loggingTask != null) {
            this.loggingTask.cancel(true);
        }
        this.loggingPeriod = period;
        this.loggingTask = this.initLoggingTask(0L);
    }

    public long getLoggingPeriod() {
        return this.loggingPeriod;
    }

    private static class Stats {
        private final AtomicInteger connect = new AtomicInteger();
        private final AtomicInteger connected = new AtomicInteger();
        private final AtomicInteger disconnect = new AtomicInteger();
        private final AtomicInteger start = new AtomicInteger();
        private final AtomicInteger stop = new AtomicInteger();
        private final AtomicInteger error = new AtomicInteger();

        private Stats() {
        }

        @Gauge
        public Integer connectCount() {
            return this.connect.get();
        }

        @Gauge
        public Integer connectedCount() {
            return this.connected.get();
        }

        @Gauge
        public Integer disconnectCount() {
            return this.disconnect.get();
        }

        @Gauge
        public Integer startCount() {
            return this.start.get();
        }

        @Gauge
        public Integer stopCount() {
            return this.stop.get();
        }

        @Gauge
        public Integer errorCount() {
            return this.error.get();
        }

        public void incrementConnectCount() {
            this.connect.incrementAndGet();
        }

        public void incrementConnectedCount() {
            this.connected.incrementAndGet();
        }

        public void incrementDisconnectCount() {
            this.disconnect.incrementAndGet();
        }

        public void incrementStartCount() {
            this.start.incrementAndGet();
        }

        public void incrementStopCount() {
            this.stop.incrementAndGet();
        }

        public void incrementErrorCount() {
            this.error.incrementAndGet();
        }

        public String toString() {
            return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + this.connected.get() + ")-START(" + this.start.get() + ")-STOP(" + this.stop.get() + ")-ERROR(" + this.error.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
        }
    }
}

