В Кафке как получить точное смещение по времени производства



Мне нужно, чтобы послание Кафки воспроизводилось час за часом в течение дня. Каждый час я буду запускать задание, чтобы использовать сообщение, созданное 1 час назад. например, если текущее время 20: 12, я буду использовать сообщение между 19: 00: 00 и 19:59: 59. Это означает, что мне нужно получить смещение начала по времени 19: 00: 00 и смещение конца по времени 19:59:59. Я использовал SimpleConsumer.getOffsetsBefore, как показано в " 0.8.0 Simpleconsumer Example ". Проблема в том, что возвращаемое смещение не совпадает с заданной меткой времени в качестве параметра. например, когда сделать отметку времени 19: 00: 00, я получаю сообщение, произведенное в 16: 38: 00.

974   5  

5 ответов:

В Кафке в настоящее время нет способа получить смещение, соответствующее определенной временной метке - это сделано специально. Как описано в верхней части статьи журналаДжея Крепса , номер смещения обеспечивает своего рода метку времени для журнала, который отделен от времени настенных часов. С помощью смещения как вашего понятия времени, то вы можете знать, если любые две системы находятся в согласованном состоянии просто купить зная, какое смещение они прочитали до. Никогда не бывает путаницы с разными часами время на разных серверах, високосные годы, дневное время экономии света, часовые пояса и т. д. Это довольно мило...

Сейчас... тем не менее, если вы знаете, что ваш сервер вышел из строя в какой-то момент X, то практически говоря, вы действительно хотели бы знать соответствующее смещение. Ты можешь подойти ближе. Файлы журналов на машинах kafka называются в соответствии со временем, когда они начали писать, и существует инструмент kafka (который я не могу найти прямо сейчас), который позволяет вам узнать, какие смещения связаны с этими файлы. Если вы хотите знать точную метку времени, то вы должны кодировать метку времени в сообщениях, которые вы отправляете Кафке.

Ниже kafka consumer api метод getOffsetsByTimes() может быть использован для этого , он доступен от версии 0.10.0 или выше. Смотрите JavaDoc .

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}

Как отмечают другие ответы, более старые версии Кафки имели только приблизительный способ отображения времен на смещения. Однако, начиная с Kafka 0.10.0 (выпущен в мае 2016 года), Kafka поддерживает индекс времени для каждой темы. Это позволит вам эффективно переходить от таймов к точным смещениям. Для доступа к этой информации можно использовать метод KafkaConsumer#offsetsForTimes.

Есть более подробная информация о том, как индекс времени реализуется на обсуждении дизайна KIP-33 страница.

Kafka 1.10 поддерживает временные метки, хотя все равно будет немного сложно использовать его для того, чтобы сделать то, что вы хотите сделать. Но если вы знаете, но с какой временной отметки вы хотите читать, и до тех пор, пока вы не захотите читать, то вы можете просто опрашивать сообщения до этого времени и прекратить потребление.

Показать вам код:

public static Map<TopicPartition, OffsetAndTimestamp> getOffsetAndTimestampAtTime(String kafkaServer, String topic, long time) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    kafkaParams.put(GROUP_ID_CONFIG, "consumerGroupId");
    kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(AUTO_OFFSET_RESET_CONFIG, "latest");
    kafkaParams.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams);

    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

    List<TopicPartition> topicPartitions = partitionInfos
            .stream()
            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
            .collect(Collectors.toList());

    Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> time));

    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(topicPartitionToTimestampMap);
    consumer.close();
    return result;
}

Comments

    Ничего не найдено.