私はApache Kafkaを使って作業しています: ディレクトリにいくつかのcsvファイルがあります。これはミニバッチファイルで、各ファイルは約25-30MBです。私が必要とするのは、ファイルを解析してカフカに置くだけです。Apache Kafkaコネクタタスクをどのように閉じることができますか?
私が見ることができるように、カフカにはコネクターのような興味深いものがあります。
私はSource-ConnectorとSourceTaskを作成できますが、1つのことを理解できません: ファイルを処理すると、どのようにタスクを停止または削除できますか?例えば
私はダミーのコネクタがあります。
public class DummySourceConnector extends SourceConnector {
private static final Logger logger = LogManager.getLogger();
@Override
public String version() {
logger.info("version");
return "1";
}
@Override
public ConfigDef config() {
logger.info("config");
return null;
}
@Override
public Class<? extends Task> taskClass() {
return DummySourceTask.class;
}
@Override
public void start(Map<String, String> props) {
logger.info("start {}", props);
}
@Override
public void stop() {
logger.info("stop");
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
logger.info("taskConfigs {}", maxTasks);
return ImmutableList.of(ImmutableMap.of("key", "value"));
}
とタスク:
public class DummySourceTask extends SourceTask {
private static final Logger logger = LogManager.getLogger();
private long offset = 0;
@Override
public String version() {
logger.info("version");
return "1";
}
@Override
public void start(Map<String, String> props) {
logger.info("start {}", props);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
Thread.sleep(3000);
final String value = "Offset " + offset++ + " Timestamp " + Instant.now().toString();
logger.info("poll value {}", value);
return ImmutableList.of(new SourceRecord(
ImmutableMap.of("partition", 0),
ImmutableMap.of("offset", offset),
"topic-dummy",
SchemaBuilder.STRING_SCHEMA,
value
));
}
public void stop() {
logger.info("stop");
}
しかし、それはすべて完了だとき、私は自分の仕事を閉じることができますどのように? または、このタスクの別のアイデアを手伝ってもらえますか?
あなたの助けを頼りにしてください!
こんにちは! あなたのご協力ありがとうございます! これはまさに私が望むものではありませんが、spooldirコネクタは面白いです。 いいえ、私が望むときに私の仕事を止めたい、状況を想像させてください。 - 私の仕事はファイルを一行ずつ読み込み、ファイルの最後には私たちは仕事を止めることができません。 'stop'メソッドはコネクタだけで呼び出します(例えば、再調整の場合)。 – aarexer
ああ、いくつかのイベントに基づいてタスク自体からタスクを停止したいと考えています。定義上、タスクの調整はコネクターの仕事であるため、私たちは通常、タスクを自分自身でやり直すことができないようにしたくないからです。代わりに次のファイルにタスクをフィードすることはできますか? – dawsaw
はい、私の知るとおり、タスクの調整はコネクタの仕事です。 新しいファイルを供給していることが良い判断である可能性があります... Thanx、それは良い答えです! – aarexer