Extend Kafka Connect SinkTask and start consuming from given offsets

apache-kafka apache-kafka-connect


I want to extend SinkTask to create my own sink connector.

If I save the offsets during flush(), what is the correct way to resume reading from those saved offsets the next time I start the sink connector?

I tried using the SinkTaskContext of the overridden initialize(SinkTaskContext context) to assign my own offsets:

@Override
public void initialize(SinkTaskContext context) {
  HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
  ... // load my saved offsets into offsetMap
  context.offset(offsetMap);
}

But this doesn't work, because the partitions have not been assigned yet at that point, and I was getting an exception.

Should I instead save the context from initialize() into a member variable and then use it to assign the offsets inside the overridden open(Collection<TopicPartition> partitions) method, the same way I was doing inside initialize()? E.g. (with the fields wired up as sketched after the snippet):

@Override
public void open(Collection<TopicPartition> partitions) {
  HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>();
  for (TopicPartition tp : partitions) { // for each assigned partition
    Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition());
    if (offset == null) { offset = 0L; } // no saved offset: start from the beginning
    offsetMapNew.put(tp, offset);
  }
  mySavedTaskContext.offset(offsetMapNew); // sync offsets?
}
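For reference, a minimal sketch of how the fields above could be wired up; mySavedTaskContext and myOffsetMap are my own names, and loadSavedOffsets() is a placeholder for reading back whatever I persisted during flush(). (SinkTask also keeps the context in its protected context field, so the extra field is mostly for clarity.)

private SinkTaskContext mySavedTaskContext;
private Map<String, Long> myOffsetMap; // "topic-partition" -> last saved offset

@Override
public void initialize(SinkTaskContext context) {
  super.initialize(context);        // SinkTask stores it in its protected 'context' field
  mySavedTaskContext = context;
  myOffsetMap = loadSavedOffsets(); // placeholder: read offsets persisted during flush()
}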
Author: Lucas Martinez. Posted: 17.07.2016 10:35

Answers (1)



Solution

Resetting offsets during open() should be the correct approach, but due to a still-unresolved bug it is not currently handled properly.

The workaround for now is to handle resetting offsets in put(). This may be a bit counterintuitive, but since you're managing your own offsets you can safely ignore data: on the first put() call, load your saved offsets and reset to them; all subsequent data will then come from the offsets you specified. This is how the HDFS connector currently implements its exactly-once delivery (it is a good example of how to get exactly-once semantics, though unfortunately the code is relatively complex). In fact, since the HDFS connector was what drove the offset-management functionality in Kafka Connect, the fact that it doesn't do the reset on rebalance is precisely how this case was missed in the implementation.
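A minimal sketch of that workaround, assuming a hypothetical loadSavedOffset() helper backed by whatever store you write offsets to during flush() (the class and field names here are illustrative, not part of the Connect API):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class MySinkTask extends SinkTask {
    private boolean offsetsReset = false;

    @Override
    public void put(Collection<SinkRecord> records) {
        if (!offsetsReset) {
            // First call after (re)start: look up our own saved offsets for the
            // partitions we were assigned and ask the framework to rewind.
            Map<TopicPartition, Long> saved = new HashMap<>();
            for (TopicPartition tp : context.assignment()) {
                Long offset = loadSavedOffset(tp); // hypothetical lookup in your own store
                saved.put(tp, offset == null ? 0L : offset);
            }
            context.offset(saved);
            offsetsReset = true;
            return; // drop this batch; it will be redelivered starting at the saved offsets
        }
        for (SinkRecord record : records) {
            // write the record to the sink system and track its offset...
        }
    }

    // Hypothetical helper; replace with however you persist offsets.
    private Long loadSavedOffset(TopicPartition tp) {
        return null;
    }

    @Override public String version() { return "0.1"; }
    @Override public void start(Map<String, String> props) { }
    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { }
    @Override public void stop() { }
}

In a real connector you would also clear offsetsReset in open(), so the rewind is repeated after every rebalance rather than only at startup.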

Author: Ewen Cheslack-Postava. Posted: 23.07.2016 08:56