2016-10-04 3 views
1

私はApache Kafkaを使って作業しています: ディレクトリにいくつかのcsvファイルがあります。これはミニバッチファイルで、各ファイルは約25-30MBです。私が必要とするのは、ファイルを解析してカフカに置くだけです。Apache Kafkaコネクタタスクをどのように閉じることができますか?

私が見ることができるように、カフカにはコネクターのような興味深いものがあります。

私はSource-ConnectorとSourceTaskを作成できますが、1つのことを理解できません: ファイルを処理すると、どのようにタスクを停止または削除できますか?例えば

私はダミーのコネクタがあります。

public class DummySourceConnector extends SourceConnector { 
private static final Logger logger = LogManager.getLogger(); 

@Override 
public String version() { 
    logger.info("version"); 

    return "1"; 
} 

@Override 
public ConfigDef config() { 
    logger.info("config"); 

    return null; 
} 

@Override 
public Class<? extends Task> taskClass() { 
    return DummySourceTask.class; 
} 

@Override 
public void start(Map<String, String> props) { 
    logger.info("start {}", props); 
} 

@Override 
public void stop() { 
    logger.info("stop"); 
} 

@Override 
public List<Map<String, String>> taskConfigs(int maxTasks) { 
    logger.info("taskConfigs {}", maxTasks); 

    return ImmutableList.of(ImmutableMap.of("key", "value")); 
} 

とタスク:

public class DummySourceTask extends SourceTask { 
private static final Logger logger = LogManager.getLogger(); 

private long offset = 0; 

@Override 
public String version() { 
    logger.info("version"); 

    return "1"; 
} 

@Override 
public void start(Map<String, String> props) { 
    logger.info("start {}", props); 
} 


@Override 
public List<SourceRecord> poll() throws InterruptedException { 
    Thread.sleep(3000); 

    final String value = "Offset " + offset++ + " Timestamp " + Instant.now().toString(); 

    logger.info("poll value {}", value); 

    return ImmutableList.of(new SourceRecord(
      ImmutableMap.of("partition", 0), 
      ImmutableMap.of("offset", offset), 
      "topic-dummy", 
      SchemaBuilder.STRING_SCHEMA, 
      value 
    )); 
} 

public void stop() { 
    logger.info("stop"); 
} 

しかし、それはすべて完了だとき、私は自分の仕事を閉じることができますどのように? または、このタスクの別のアイデアを手伝ってもらえますか?

あなたの助けを頼りにしてください!

答えて

1

まず、既存のコネクタhereをご覧ください。私はspooldirコネクタがあなたに役立つように感じる。コードを書かなくてもダウンロードしてインストールすることも可能です。

第2に、正しく理解していれば、タスクを停止したいと考えています。私はthis discussionがあなたの望むものだと信じています。

+0

こんにちは! あなたのご協力ありがとうございます! これはまさに私が望むものではありませんが、spooldirコネクタは面白いです。 いいえ、私が望むときに私の仕事を止めたい、状況を想像させてください。 - 私の仕事はファイルを一行ずつ読み込み、ファイルの最後には私たちは仕事を止めることができません。 'stop'メソッドはコネクタだけで呼び出します(例えば、再調整の場合)。 – aarexer

+0

ああ、いくつかのイベントに基づいてタスク自体からタスクを停止したいと考えています。定義上、タスクの調整はコネクターの仕事であるため、私たちは通常、タスクを自分自身でやり直すことができないようにしたくないからです。代わりに次のファイルにタスクをフィードすることはできますか? – dawsaw

+0

はい、私の知るとおり、タスクの調整はコネクタの仕事です。 新しいファイルを供給していることが良い判断である可能性があります... Thanx、それは良い答えです! – aarexer

関連する問題