package io.gravitee.connector.kafka;

import io.gravitee.connector.api.AbstractConnection;
import io.gravitee.connector.api.response.StatusResponse;
import io.gravitee.connector.kafka.json.JsonRecordFormatter;
import io.gravitee.connector.kafka.response.RecordResponse;
import io.gravitee.gateway.api.buffer.Buffer;
import io.gravitee.gateway.api.stream.WriteStream;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/connector/kafka/ReadDataConnection.class */
public class ReadDataConnection extends AbstractConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReadDataConnection.class);
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private final int partition;
    private final int offset;
    private final int timeout;
    private static final int DEFAULT_POLLING_TIMEOUT = 5000;

    public ReadDataConnection(KafkaConsumer<String, String> kafkaConsumer, String str, int i, int i2, int i3) {
        this.consumer = kafkaConsumer;
        this.topic = str;
        this.partition = i;
        this.offset = i2;
        this.timeout = i3 == -1 ? DEFAULT_POLLING_TIMEOUT : i3;
    }

    public WriteStream<Buffer> write(Buffer buffer) {
        return this;
    }

    public void end() {
        Future subscribe;
        if (this.partition != -1) {
            TopicPartition topicPartition = new TopicPartition(this.topic, this.partition);
            Future assign = this.consumer.assign(topicPartition);
            subscribe = this.offset != -1 ? assign.compose(r7 -> {
                return this.consumer.seek(topicPartition, this.offset);
            }) : assign.compose(r5 -> {
                return this.consumer.seekToBeginning(topicPartition);
            });
        } else {
            subscribe = this.consumer.subscribe(this.topic);
        }
        subscribe.onSuccess(new Handler<Void>() { // from class: io.gravitee.connector.kafka.ReadDataConnection.1
            public void handle(Void r52) {
                ReadDataConnection.this.consumer.poll(Duration.ofMillis(ReadDataConnection.this.timeout)).onSuccess(kafkaConsumerRecords -> {
                    if (kafkaConsumerRecords.isEmpty()) {
                        ReadDataConnection.this.responseHandler.handle(new StatusResponse(404));
                    } else {
                        Buffer buffer = Buffer.buffer(JsonRecordFormatter.toString(kafkaConsumerRecords, true));
                        RecordResponse recordResponse = new RecordResponse(200);
                        recordResponse.headers().set("Content-Length", Integer.toString(buffer.length()));
                        ReadDataConnection.this.responseHandler.handle(recordResponse);
                        recordResponse.bodyHandler().handle(buffer);
                        recordResponse.endHandler().handle((Object) null);
                    }
                    ReadDataConnection.this.consumer.close();
                }).onFailure(th -> {
                    ReadDataConnection.LOGGER.error("Kafka consume unable to poll a given partition", th.getCause());
                    ReadDataConnection.this.responseHandler.handle(new StatusResponse(500));
                    ReadDataConnection.this.consumer.close();
                });
            }
        }).onFailure(th -> {
            LOGGER.error("Kafka consume unable to seek for a given partition", th.getCause());
            this.responseHandler.handle(new StatusResponse(500));
            this.consumer.close();
        });
    }
}
