kafka consumer.commitSync

在做kafka消费的时候,consumer的配置一直是周期性自动提交消费成功。
但问题来了,如果poll到的数据没有被成功消费,那么逻辑上应该被标记为未被消费,等待队列的第二次尝试拉取消费(虽然这样也不对,但是就这样硬着头皮说下去先)。那么这里应该在每次数据集合records被poll拉取之后,在对每个record进行业务处理完成成功之后,对broker进行一次提交(提交下一次poll出去数据的offset位置),这里提交我就用到

public void commitSync(Map<TopicPartition,OffsetAndMetadata> offsets)

具体实现:

for (TopicPartition partition : records.partitions()) {
         List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
         // 数据处理
         for (ConsumerRecord<String, String> record : partitionRecords)
         {
               System.out.println(record.offset() + ": " + record.value());
         }
         // 取得当前读取到的最后一条记录的offset
         long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
         // 同步提交offset,记得要 + 1
         consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
添加新评论