package io.gravitee.connector.kafka;

import io.gravitee.common.http.HttpMethod;
import io.gravitee.common.utils.UUID;
import io.gravitee.connector.api.AbstractConnector;
import io.gravitee.connector.api.Connection;
import io.gravitee.connector.api.response.StatusResponse;
import io.gravitee.connector.kafka.configuration.ClientDnsLookup;
import io.gravitee.connector.kafka.direct.DirectResponseConnection;
import io.gravitee.connector.kafka.endpoint.KafkaEndpoint;
import io.gravitee.connector.kafka.ws.PartitionBasedWebsocketConnection;
import io.gravitee.connector.kafka.ws.TopicBasedWebsocketConnection;
import io.gravitee.connector.kafka.ws.WebsocketConnection;
import io.gravitee.gateway.api.ExecutionContext;
import io.gravitee.gateway.api.handler.Handler;
import io.gravitee.gateway.api.proxy.ProxyRequest;
import io.gravitee.gateway.api.proxy.ws.WebSocketProxyRequest;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/connector/kafka/KafkaConnector.class */
/**
 * Connector that serves gateway requests from/to a Kafka cluster.
 *
 * <ul>
 *   <li>WebSocket upgrade requests are bound to a Kafka consumer whose records are pushed through
 *       the socket (whole topic, or a single partition when a partition is supplied).</li>
 *   <li>{@code POST}/{@code PUT} requests are handed to an {@code InsertDataConnection} backed by a
 *       Kafka producer.</li>
 *   <li>{@code GET} requests are handed to a {@code ReadDataConnection} backed by a Kafka consumer.</li>
 *   <li>Any other HTTP method is answered directly with a 400 status.</li>
 * </ul>
 *
 * <p>Topic, partition, offset, timeout and group id are each resolved, in order of precedence, from
 * an execution-context attribute, a request header, then a query parameter.
 */
public class KafkaConnector extends AbstractConnector<Connection, ProxyRequest> {
    /** Prefix of every execution-context attribute that can override a Kafka client setting. */
    static final String KAFKA_CONTEXT_ATTRIBUTE = "gravitee.attribute.kafka.";
    static final String CONTEXT_ATTRIBUTE_KAFKA_OFFSET = "gravitee.attribute.kafka.offset";
    static final String CONTEXT_ATTRIBUTE_KAFKA_PARTITION = "gravitee.attribute.kafka.partition";
    static final String CONTEXT_ATTRIBUTE_KAFKA_TOPIC = "gravitee.attribute.kafka.topic";
    static final String CONTEXT_ATTRIBUTE_KAFKA_TIMEOUT = "gravitee.attribute.kafka.timeout";
    static final String CONTEXT_ATTRIBUTE_KAFKA_CLIENT_ID = "gravitee.attribute.kafka.client.id";
    static final String CONTEXT_ATTRIBUTE_KAFKA_GROUP_ID = "gravitee.attribute.kafka.group.id";
    static final String KAFKA_TOPIC_HEADER = "x-gravitee-kafka-topic";
    private static final String KAFKA_TOPIC_QUERY_PARAMETER = "topic";
    static final String KAFKA_PARTITION_HEADER = "x-gravitee-kafka-partition";
    private static final String KAFKA_PARTITION_QUERY_PARAMETER = "partition";
    static final String KAFKA_OFFSET_HEADER = "x-gravitee-kafka-offset";
    private static final String KAFKA_OFFSET_QUERY_PARAMETER = "offset";
    static final String KAFKA_GROUP_HEADER = "x-gravitee-kafka-groupid";
    private static final String KAFKA_GROUP_QUERY_PARAMETER = "groupid";
    static final String KAFKA_TIMEOUT_HEADER = "x-gravitee-kafka-timeout";
    private static final String KAFKA_TIMEOUT_QUERY_PARAMETER = "timeout";

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnector.class);

    private final KafkaEndpoint endpoint;

    /** Open websocket requests keyed by request id, kept so they can be closed in {@link #doStop()}. */
    private final Map<String, WebSocketProxyRequest> wsRequests = new ConcurrentHashMap<>();

    /**
     * @param kafkaEndpoint endpoint providing the bootstrap target and the producer/consumer/common
     *     configuration used to create Kafka clients
     */
    public KafkaConnector(KafkaEndpoint kafkaEndpoint) {
        this.endpoint = kafkaEndpoint;
    }

    /**
     * Resolves the target topic, records it as the endpoint in the request metrics and dispatches
     * to the websocket or plain HTTP handling.
     *
     * @param executionContext current execution context
     * @param proxyRequest request to forward
     * @param handler receives the connection created for this request
     */
    public void request(ExecutionContext executionContext, ProxyRequest proxyRequest, Handler<Connection> handler) {
        final String topic = extractTopic(executionContext, proxyRequest);
        proxyRequest.metrics().setEndpoint(topic);
        if (isWebSocket(proxyRequest)) {
            handleWebsocketRequest(executionContext, proxyRequest, topic, handler);
        } else {
            handleRequest(executionContext, proxyRequest, topic, handler);
        }
    }

    /**
     * Upgrades the request to a websocket and streams records from {@code topic} through it.
     *
     * <p>The consumer is closed when the upgrade fails, when listening fails, or (after
     * unsubscribing) when the websocket is closed.
     */
    private void handleWebsocketRequest(ExecutionContext executionContext, ProxyRequest proxyRequest, String topic, Handler<Connection> handler) {
        final WebSocketProxyRequest wsRequest = (WebSocketProxyRequest) proxyRequest;
        final int partition = readIntValue(extractPartition(executionContext, proxyRequest));
        final long offset = readLongValue(extractOffset(executionContext, proxyRequest));
        final KafkaConsumer<String, String> consumer = createConsumer(executionContext);
        final String requestId = executionContext.request().id() != null ? executionContext.request().id() : UUID.random().toString();

        LOGGER.debug("Keeping reference to websocket request [{}]", requestId);
        wsRequests.put(requestId, wsRequest);

        wsRequest.upgrade().whenComplete((upgraded, upgradeError) -> {
            if (upgradeError != null) {
                LOGGER.error("Unable to upgrade websocket request [{}]", requestId, upgradeError);
                wsRequest.close();
                consumer.close();
                wsRequests.remove(requestId);
                return;
            }

            // A partition of -1 means "not provided / not parseable": consume the whole topic.
            final WebsocketConnection connection = partition == -1
                ? new TopicBasedWebsocketConnection(consumer, wsRequest, topic)
                : new PartitionBasedWebsocketConnection(consumer, wsRequest, topic, partition, offset);

            wsRequest.closeHandler(ignored -> {
                wsRequests.remove(requestId);
                consumer.unsubscribe().onComplete(unsubscribed -> {
                    consumer.close();
                });
            });

            handler.handle(connection);

            connection.listen().onFailure(listenError -> {
                // Log the failure itself: its cause may be null, which would drop the stack trace.
                LOGGER.error("Unexpected error while listening for a given topic for websocket", listenError);
                consumer.close();
            });
        });
    }

    /**
     * Stops the connector and closes every websocket request still referenced.
     *
     * @throws Exception if the parent stop fails
     */
    protected void doStop() throws Exception {
        super.doStop();
        wsRequests.forEach((requestId, wsRequest) -> {
            wsRequest.close();
        });
    }

    /**
     * Handles a plain HTTP request: {@code POST}/{@code PUT} produce to {@code topic}, {@code GET}
     * reads from it, any other method gets a direct 400 response.
     */
    private void handleRequest(ExecutionContext executionContext, ProxyRequest proxyRequest, String topic, Handler<Connection> handler) {
        final int partition = readIntValue(extractPartition(executionContext, proxyRequest));
        final HttpMethod method = proxyRequest.method();

        if (method == HttpMethod.POST || method == HttpMethod.PUT) {
            handler.handle(new InsertDataConnection(executionContext, createProducer(executionContext), topic, partition, proxyRequest));
        } else if (method == HttpMethod.GET) {
            // NOTE(review): the offset is parsed as an int here (long on the websocket path), so
            // offsets above Integer.MAX_VALUE fall back to -1 — confirm against ReadDataConnection.
            final int offset = readIntValue(extractOffset(executionContext, proxyRequest));
            final int timeout = readIntValue(extractTimeout(executionContext, proxyRequest));
            handler.handle(new ReadDataConnection(createConsumer(executionContext), topic, partition, offset, timeout));
        } else {
            final DirectResponseConnection directResponse = new DirectResponseConnection();
            handler.handle(directResponse);
            directResponse.sendResponse(new StatusResponse(400));
        }
    }

    /**
     * @param str textual value, may be {@code null}
     * @return the parsed int, or {@code -1} when {@code str} is {@code null} or not a valid int
     */
    private static int readIntValue(String str) {
        try {
            return Integer.parseInt(str);
        } catch (NumberFormatException ignored) {
            // Also raised for null input; -1 is the "not provided" marker.
            return -1;
        }
    }

    /**
     * @param str textual value, may be {@code null}
     * @return the parsed long, or {@code -1} when {@code str} is {@code null} or not a valid long
     */
    private static long readLongValue(String str) {
        try {
            return Long.parseLong(str);
        } catch (NumberFormatException ignored) {
            // Also raised for null input; -1 is the "not provided" marker.
            return -1L;
        }
    }

    /**
     * Creates a Kafka producer from the endpoint's producer configuration, overridden by matching
     * execution-context attributes and completed with a client id.
     */
    private KafkaProducer<String, String> createProducer(ExecutionContext executionContext) {
        // NOTE(review): the map returned by getKafkaConfig() is mutated below; this assumes a fresh
        // map is returned on each call — confirm.
        final Map<String, String> config = this.endpoint.getProducerConfig().getKafkaConfig();
        applyCommonSettings(config, executionContext);
        config.put("client.id", getClientId(executionContext));
        removeEmptyEntries(config);
        return create(cfg -> KafkaProducer.<String, String>create(Vertx.currentContext().owner(), cfg), config);
    }

    /**
     * Replaces the value of every entry already present in {@code map} by the execution-context
     * attribute named {@code gravitee.attribute.kafka.<key>}, when such an attribute exists.
     * Non-String attributes are converted with {@code toString()}.
     */
    private void overrideWithContextAttributes(Map<String, String> map, ExecutionContext executionContext) {
        for (Map.Entry<String, String> entry : map.entrySet()) {
            final Object attribute = executionContext.getAttribute(KAFKA_CONTEXT_ATTRIBUTE + entry.getKey());
            if (attribute != null) {
                entry.setValue(attribute.toString());
            }
        }
    }

    /**
     * Creates a Kafka consumer from the endpoint's consumer configuration, overridden by matching
     * execution-context attributes and completed with a group id and a client id.
     */
    private KafkaConsumer<String, String> createConsumer(ExecutionContext executionContext) {
        // NOTE(review): the map returned by getKafkaConfig() is mutated below; this assumes a fresh
        // map is returned on each call — confirm.
        final Map<String, String> config = this.endpoint.getConsumerConfig().getKafkaConfig();
        applyCommonSettings(config, executionContext);
        config.put("group.id", getGroupId(executionContext));
        config.put("client.id", getClientId(executionContext));
        removeEmptyEntries(config);
        return create(cfg -> KafkaConsumer.<String, String>create(Vertx.currentContext().owner(), cfg), config);
    }

    /** Sets bootstrap servers and DNS lookup, then applies the execution-context overrides. */
    private void applyCommonSettings(Map<String, String> config, ExecutionContext executionContext) {
        config.put("bootstrap.servers", this.endpoint.target());
        config.put("client.dns.lookup", ClientDnsLookup.getOrDefault(this.endpoint.getCommonConfig().getClientDnsLookup()).toString());
        overrideWithContextAttributes(config, executionContext);
    }

    /** Drops every entry whose value is {@code null} or empty. */
    private static void removeEmptyEntries(Map<String, String> config) {
        config.entrySet().removeIf(entry -> isNullOrEmpty(entry.getValue()));
    }

    /**
     * Runs {@code function} with the thread context class loader set to {@code null}, restoring the
     * previous class loader afterwards whether or not the function throws.
     */
    private <T> T create(Function<Map<String, String>, T> function, Map<String, String> map) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        try {
            // Presumably forces the Kafka client to resolve classes with its own class loader — confirm.
            thread.setContextClassLoader(null);
            return function.apply(map);
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    /** @return {@code true} for a {@code GET} carrying {@code Connection: Upgrade} and {@code Upgrade: websocket} */
    private boolean isWebSocket(ProxyRequest proxyRequest) {
        return proxyRequest.method() == HttpMethod.GET
            && HttpHeaderValues.UPGRADE.contentEqualsIgnoreCase(proxyRequest.headers().get("Connection"))
            && HttpHeaderValues.WEBSOCKET.contentEqualsIgnoreCase(proxyRequest.headers().get("Upgrade"));
    }

    /** @return the client id from the execution context, or a random UUID when the attribute is absent */
    private String getClientId(ExecutionContext executionContext) {
        final String clientId = attributeAsString(executionContext, CONTEXT_ATTRIBUTE_KAFKA_CLIENT_ID);
        return clientId != null ? clientId : UUID.random().toString();
    }

    /**
     * @return the consumer group id from, in order: the execution-context attribute, the incoming
     *     request header, the incoming request query parameter, or a random UUID
     */
    private String getGroupId(ExecutionContext executionContext) {
        String groupId = attributeAsString(executionContext, CONTEXT_ATTRIBUTE_KAFKA_GROUP_ID);
        if (isNullOrEmpty(groupId)) {
            groupId = executionContext.request().headers().get(KAFKA_GROUP_HEADER);
        }
        if (isNullOrEmpty(groupId)) {
            groupId = (String) executionContext.request().parameters().getFirst(KAFKA_GROUP_QUERY_PARAMETER);
        }
        if (isNullOrEmpty(groupId)) {
            groupId = UUID.random().toString();
        }
        return groupId;
    }

    /**
     * @return the topic from the context attribute, header or query parameter; when none is set,
     *     the last path segment of the request URI (trailing slashes ignored)
     */
    private String extractTopic(ExecutionContext executionContext, ProxyRequest proxyRequest) {
        final String topic = resolveValue(executionContext, proxyRequest, CONTEXT_ATTRIBUTE_KAFKA_TOPIC, KAFKA_TOPIC_HEADER, KAFKA_TOPIC_QUERY_PARAMETER);
        return isNullOrEmpty(topic) ? lastPathSegment(proxyRequest.uri()) : topic;
    }

    /** @return the raw partition value, or {@code null}/empty when not provided */
    private String extractPartition(ExecutionContext executionContext, ProxyRequest proxyRequest) {
        return resolveValue(executionContext, proxyRequest, CONTEXT_ATTRIBUTE_KAFKA_PARTITION, KAFKA_PARTITION_HEADER, KAFKA_PARTITION_QUERY_PARAMETER);
    }

    /** @return the raw offset value, or {@code null}/empty when not provided */
    private String extractOffset(ExecutionContext executionContext, ProxyRequest proxyRequest) {
        return resolveValue(executionContext, proxyRequest, CONTEXT_ATTRIBUTE_KAFKA_OFFSET, KAFKA_OFFSET_HEADER, KAFKA_OFFSET_QUERY_PARAMETER);
    }

    /** @return the raw timeout value, or {@code null}/empty when not provided */
    private String extractTimeout(ExecutionContext executionContext, ProxyRequest proxyRequest) {
        return resolveValue(executionContext, proxyRequest, CONTEXT_ATTRIBUTE_KAFKA_TIMEOUT, KAFKA_TIMEOUT_HEADER, KAFKA_TIMEOUT_QUERY_PARAMETER);
    }

    /**
     * Resolves a request setting: first non-empty of the context attribute and the header,
     * otherwise whatever the query parameter holds (possibly {@code null} or empty).
     */
    private static String resolveValue(ExecutionContext executionContext, ProxyRequest proxyRequest, String attributeName, String headerName, String parameterName) {
        String value = attributeAsString(executionContext, attributeName);
        if (isNullOrEmpty(value)) {
            value = proxyRequest.headers().get(headerName);
        }
        if (isNullOrEmpty(value)) {
            value = (String) proxyRequest.parameters().getFirst(parameterName);
        }
        return value;
    }

    /** Reads a context attribute as text; non-String values are converted with {@code toString()}. */
    private static String attributeAsString(ExecutionContext executionContext, String attributeName) {
        final Object attribute = executionContext.getAttribute(attributeName);
        return attribute == null ? null : attribute.toString();
    }

    /**
     * Returns the last non-empty path segment of {@code uri}, ignoring trailing slashes:
     * {@code "/a/topic"} and {@code "/a/topic/"} both yield {@code "topic"}; a URI made only of
     * slashes (or empty) yields {@code ""}.
     */
    private static String lastPathSegment(String uri) {
        int end = uri.length();
        while (end > 0 && uri.charAt(end - 1) == '/') {
            end--;
        }
        // lastIndexOf returns -1 when no slash precedes the segment, so the segment starts at 0.
        final int start = uri.lastIndexOf('/', end - 1) + 1;
        return uri.substring(start, end);
    }

    private static boolean isNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }
}
