В Кафке как получить точное смещение по времени производства
Мне нужно, чтобы послание Кафки воспроизводилось час за часом в течение дня. Каждый час я буду запускать задание, чтобы использовать сообщение, созданное 1 час назад. например, если текущее время 20: 12, я буду использовать сообщение между 19: 00: 00 и 19:59: 59. Это означает, что мне нужно получить смещение начала по времени 19: 00: 00 и смещение конца по времени 19:59:59. Я использовал SimpleConsumer.getOffsetsBefore, как показано в " 0.8.0 Simpleconsumer Example ". Проблема в том, что возвращаемое смещение не совпадает с заданной меткой времени в качестве параметра. например, когда сделать отметку времени 19: 00: 00, я получаю сообщение, произведенное в 16: 38: 00.
5 ответов:
В Кафке в настоящее время нет способа получить смещение, соответствующее определенной временной метке - это сделано специально. Как описано в верхней части статьи журналаДжея Крепса , номер смещения обеспечивает своего рода метку времени для журнала, который отделен от времени настенных часов. С помощью смещения как вашего понятия времени, то вы можете знать, если любые две системы находятся в согласованном состоянии просто купить зная, какое смещение они прочитали до. Никогда не бывает путаницы с разными часами время на разных серверах, високосные годы, дневное время экономии света, часовые пояса и т. д. Это довольно мило...
Сейчас... тем не менее, если вы знаете, что ваш сервер вышел из строя в какой-то момент X, то практически говоря, вы действительно хотели бы знать соответствующее смещение. Ты можешь подойти ближе. Файлы журналов на машинах kafka называются в соответствии со временем, когда они начали писать, и существует инструмент kafka (который я не могу найти прямо сейчас), который позволяет вам узнать, какие смещения связаны с этими файлы. Если вы хотите знать точную метку времени, то вы должны кодировать метку времени в сообщениях, которые вы отправляете Кафке.
Ниже kafka consumer api метод
getOffsetsByTimes()может быть использован для этого , он доступен от версии 0.10.0 или выше. Смотрите JavaDoc ./** * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. * * This is a blocking call. The consumer does not have to be assigned the partitions. * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null * will be returned for that partition. * * Notice that this method may block indefinitely if the partition does not exist. * * @param timestampsToSearch the mapping from partition to the timestamp to look up. * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. * @throws IllegalArgumentException if the target timestamp is negative. */ @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { // we explicitly exclude the earliest and latest offset here so the timestamp in the returned // OffsetAndTimestamp is always positive. if (entry.getValue() < 0) throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". The target time cannot be negative."); } return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs); }
Как отмечают другие ответы, более старые версии Кафки имели только приблизительный способ отображения времен на смещения. Однако, начиная с Kafka 0.10.0 (выпущен в мае 2016 года), Kafka поддерживает индекс времени для каждой темы. Это позволит вам эффективно переходить от таймов к точным смещениям. Для доступа к этой информации можно использовать метод KafkaConsumer#offsetsForTimes.
Есть более подробная информация о том, как индекс времени реализуется на обсуждении дизайна KIP-33 страница.
Kafka 1.10 поддерживает временные метки, хотя все равно будет немного сложно использовать его для того, чтобы сделать то, что вы хотите сделать. Но если вы знаете, но с какой временной отметки вы хотите читать, и до тех пор, пока вы не захотите читать, то вы можете просто опрашивать сообщения до этого времени и прекратить потребление.
Показать вам код:
public static Map<TopicPartition, OffsetAndTimestamp> getOffsetAndTimestampAtTime(String kafkaServer, String topic, long time) { Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServers); kafkaParams.put(GROUP_ID_CONFIG, "consumerGroupId"); kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); kafkaParams.put(AUTO_OFFSET_RESET_CONFIG, "latest"); kafkaParams.put(ENABLE_AUTO_COMMIT_CONFIG, false); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams); List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); List<TopicPartition> topicPartitions = partitionInfos .stream() .map(pi -> new TopicPartition(pi.topic(), pi.partition())) .collect(Collectors.toList()); Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream() .collect(Collectors.toMap(tp -> tp, tp -> time)); Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(topicPartitionToTimestampMap); consumer.close(); return result; }
Comments