package io.gravitee.connector.kafka;

import io.gravitee.common.utils.UUID;
import io.gravitee.connector.api.AbstractConnection;
import io.gravitee.connector.api.response.StatusResponse;
import io.gravitee.gateway.api.ExecutionContext;
import io.gravitee.gateway.api.buffer.Buffer;
import io.gravitee.gateway.api.proxy.ProxyRequest;
import io.gravitee.gateway.api.stream.WriteStream;
import io.vertx.core.Handler;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;

/* loaded from: input_file:io/gravitee/connector/kafka/InsertDataConnection.class */
public class InsertDataConnection extends AbstractConnection {
    public static final String CONTEXT_ATTRIBUTE_KAFKA_RECORD_KEY = "gravitee.attribute.kafka.key";
    private final ExecutionContext context;
    private final Buffer buffer = Buffer.buffer();
    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final int partition;
    private final ProxyRequest request;

    public InsertDataConnection(ExecutionContext executionContext, KafkaProducer<String, String> kafkaProducer, String str, int i, ProxyRequest proxyRequest) {
        this.context = executionContext;
        this.producer = kafkaProducer;
        this.topic = str;
        this.partition = i;
        this.request = proxyRequest;
    }

    public WriteStream<Buffer> write(Buffer buffer) {
        this.buffer.appendBuffer(buffer);
        return this;
    }

    public void end() {
        KafkaProducerRecord<String, String> create = KafkaProducerRecord.create(this.topic, getKey(this.context), this.buffer.toString(), this.partition != -1 ? Integer.valueOf(this.partition) : null);
        setHeaders(create);
        this.producer.send(create).onFailure(new Handler<Throwable>() { // from class: io.gravitee.connector.kafka.InsertDataConnection.2
            public void handle(Throwable th) {
                InsertDataConnection.this.responseHandler.handle(new StatusResponse(500));
                InsertDataConnection.this.producer.close();
            }
        }).onSuccess(new Handler<RecordMetadata>() { // from class: io.gravitee.connector.kafka.InsertDataConnection.1
            public void handle(RecordMetadata recordMetadata) {
                StatusResponse statusResponse = new StatusResponse(201);
                statusResponse.headers().set("x-gravitee-kafka-topic", recordMetadata.getTopic());
                statusResponse.headers().set("x-gravitee-kafka-partition", Integer.toString(recordMetadata.getPartition()));
                statusResponse.headers().set("x-gravitee-kafka-offset", Long.toString(recordMetadata.getOffset()));
                InsertDataConnection.this.responseHandler.handle(statusResponse);
                InsertDataConnection.this.producer.close();
            }
        });
    }

    private void setHeaders(KafkaProducerRecord<String, String> kafkaProducerRecord) {
        this.request.headers().forEach(entry -> {
            kafkaProducerRecord.addHeader((String) entry.getKey(), (String) entry.getValue());
        });
    }

    private String getKey(ExecutionContext executionContext) {
        String str = (String) executionContext.getAttribute(CONTEXT_ATTRIBUTE_KAFKA_RECORD_KEY);
        if (str == null) {
            str = UUID.random().toString();
            executionContext.setAttribute(CONTEXT_ATTRIBUTE_KAFKA_RECORD_KEY, str);
        }
        return str;
    }
}
