2017-10-25 1 views
1

長時間実行するflinkジョブ用の外部設定を実装しようとしています。私の考えは、JSONでエンコードされた設定をhttpで外部サービスから定期的に(5分ごとに)ポーリングするカスタムソースを作成することです。定期更新用のFlinkソース

N分ごとにアクションを実行するソースを作成するにはどうすればよいですか? この設定をすべてのエグゼキュータに再ブロードキャストできますか?

答えて

0

まず、イベントストリームにあるすべての属性を定義するイベントクラスを作成し、すべてのgetter、setterおよびその他のメソッドを作成する必要があります。このクラスの例は、あなたが、あなたがすることによって行われ、この

public class Qrs_interval_Gen extends RichParallelSourceFunction<qrsIntervalStreamEvent> { 
@Override 
public void run(SourceContext<qrsIntervalStreamEvent> sourceContext) throws Exception { 


    int qrsInterval; 
    int Sensor_id; 
    long currentTime; 
    Random random = new Random(); 

    Integer InputRate = 10; 

    Integer Sleeptime = 1000 * 5/InputRate ; 


    for(int i = 0 ; i <= 100000 ; i++){ 


     // int randomNum = rand.nextInt((max - min) + 1) + min; 
     Sensor_id = 1; 

     qrsInterval = 10 + random.nextInt((20-10)+ 1); 
     // currentTime = System.currentTimeMillis(); 
     currentTime = i; 

     //System.out.println("qrsInterval = " + qrsInterval + ", Sensor_id = "+ Sensor_id); 
     try { 
      Thread.sleep(Sleeptime); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 


     qrsIntervalStreamEvent stream = new qrsIntervalStreamEvent(Sensor_id,currentTime,qrsInterval); 

      sourceContext.collect(stream); 

    } // for loop 


} 

    @Override 
    public void cancel() { 

    } 
} 

ここでは、全体の論理のようなコードに何かを書くことができます/ 5秒後、Xイベントでこれらのイベントを送信したいとしましょう今

public class qrsIntervalStreamEvent { 

    public Integer Sensor_id; 
    public long time; 
    public Integer qrsInterval; 


    public qrsIntervalStreamEvent(Integer sensor_id, long time, Integer qrsInterval) { 
     Sensor_id = sensor_id; 
     this.time = time; 
     this.qrsInterval = qrsInterval; 
    } 


    public Integer getSensor_id() { 
     return Sensor_id; 
    } 

    public void setSensor_id(Integer sensor_id) { 
     Sensor_id = sensor_id; 
    } 

    public long getTime() { 
     return time; 
    } 

    public void setTime(long time) { 
     this.time = time; 
    } 

    public Integer getQrsInterval() { 
     return qrsInterval; 
    } 

    public void setQrsInterval(Integer qrsInterval) { 
     this.qrsInterval = qrsInterval; 
    } 

    @Override 
    public boolean equals(Object o) { 
     if (this == o) return true; 
     if (!(o instanceof qrsIntervalStreamEvent)) return false; 

     qrsIntervalStreamEvent that = (qrsIntervalStreamEvent) o; 

     if (getTime() != that.getTime()) return false; 
     if (getSensor_id() != null ? !getSensor_id().equals(that.getSensor_id()) : that.getSensor_id() != null) 
      return false; 
     return getQrsInterval() != null ? getQrsInterval().equals(that.getQrsInterval()) : that.getQrsInterval() == null; 
    } 

    @Override 
    public int hashCode() { 
     int result = getSensor_id() != null ? getSensor_id().hashCode() : 0; 
     result = 31 * result + (int) (getTime()^(getTime() >>> 32)); 
     result = 31 * result + (getQrsInterval() != null ? getQrsInterval().hashCode() : 0); 
     return result; 
    } 


    @Override 
    public String toString() { 
     return "StreamEvent{" + 
       "Sensor_id=" + Sensor_id + 
       ", time=" + time + 
       ", qrsInterval=" + qrsInterval + 
       '}'; 
    } 


} //class 

になります

xイベント/秒を送信する場合、スリープ時間はその逆になります。例では、10のイベント/ 5秒を送信するため

同様に、第10イベント/

スリープ時間= 1000/10 = 100ミリ秒を送信するために、睡眠時間があろう

スリープ時間= 1000 * 5月10日= 500ミリ秒

それはあなたが何か質問がある場合は私に知らせて、お役に立てば幸いです

関連する問題