2013-10-09 14 views
5

私のプレーフレームワークベースのWebアプリケーションでは、さまざまなデータベーステーブルのすべての行をCSV形式またはJSON形式でダウンロードできます。テーブルは比較的大きく(100k +行)、Play 2.2でチャンクを使用して結果をストリームバックしようとしています。Play 2.2の低速チャンク応答

しかし、問題はprintlnステートメントが行がChunks.Outオブジェクトに書き込まれることを示しますが、クライアント側には表示されません。私が行を返送することを制限した場合、それはうまくいくでしょう。しかし、すべての行を返送しようとするとタイムアウトが発生するか、サーバがメモリ不足になった場合には、大きくなります。

私はEbean ORMを使用しており、テーブルにはインデックスが作成されており、psqlからのクエリには時間がかかりません。誰が何が問題なのか考えている人はいますか?

多くのご支援をいただきありがとうございます。ここで

は、コントローラのいずれかのコードです:

@SecureSocial.UserAwareAction 
public static Result showEpex() { 

    User user = getUser(); 
    if(user == null || user.getRole() == null) 
     return ok(views.html.profile.render(user, Application.NOT_CONFIRMED_MSG)); 

    DynamicForm form = DynamicForm.form().bindFromRequest(); 
    final UserRequest req = UserRequest.getRequest(form); 

    if(req.getFormat().equalsIgnoreCase("html")) { 
     Page<EpexEntry> page = EpexEntry.page(req.getStart(), req.getFinish(), req.getPage()); 
     return ok(views.html.epex.render(page, req)); 
    } 

    // otherwise chunk result and send back 
    final ResultStreamer<EpexEntry> streamer = new ResultStreamer<EpexEntry>(); 
    Chunks<String> chunks = new StringChunks() { 
      @Override 
      public void onReady(play.mvc.Results.Chunks.Out<String> out) { 

       Page<EpexEntry> page = EpexEntry.page(req.getStart(), req.getFinish(), 0); 
       ResultStreamer<EpexEntry> streamer = new ResultStreamer<EpexEntry>(); 
       streamer.stream(out, page, req); 
      } 
    }; 
    return ok(chunks).as("text/plain"); 
} 

ストリーマ:

public class ResultStreamer<T extends Entry> { 

private static ALogger logger = Logger.of(ResultStreamer.class); 

public void stream(Out<String> out, Page<T> page, UserRequest req) { 

    if(req.getFormat().equalsIgnoreCase("json")) { 
     JsonContext context = Ebean.createJsonContext(); 
     out.write("[\n"); 
     for(T e: page.getList()) 
      out.write(context.toJsonString(e) + ", "); 
     while(page.hasNext()) { 
      page = page.next(); 
      for(T e: page.getList()) 
       out.write(context.toJsonString(e) + ", "); 
     } 
     out.write("]\n"); 
     out.close(); 
    } else if(req.getFormat().equalsIgnoreCase("csv")) { 
     for(T e: page.getList()) 
      out.write(e.toCsv(CSV_SEPARATOR) + "\n"); 
     while(page.hasNext()) { 
      page = page.next(); 
      for(T e: page.getList()) 
       out.write(e.toCsv(CSV_SEPARATOR) + "\n"); 
     } 
     out.close(); 
    }else { 
     out.write("Invalid format! Only CSV, JSON and HTML can be generated!"); 
     out.close(); 
    } 
} 


public static final String CSV_SEPARATOR = ";"; 
} 

とモデル:ブラウザのほとんど

@Entity 
@Table(name="epex") 
public class EpexEntry extends Model implements Entry { 

    @Id 
    @Column(columnDefinition = "pg-uuid") 
    private UUID id; 
    private DateTime start; 
    private DateTime finish; 
    private String contract; 
    private String market; 
    private Double low; 
    private Double high; 
    private Double last; 
    @Column(name="weight_avg") 
    private Double weightAverage; 
    private Double index; 
    private Double buyVol; 
    private Double sellVol; 

    private static final String START_COL = "start"; 
    private static final String FINISH_COL = "finish"; 
    private static final String CONTRACT_COL = "contract"; 
    private static final String MARKET_COL = "market"; 
    private static final String ORDER_BY = MARKET_COL + "," + CONTRACT_COL + "," + START_COL; 

    public static final int PAGE_SIZE = 100; 

    public static final String HOURLY_CONTRACT = "hourly"; 
    public static final String MIN15_CONTRACT = "15min"; 

    public static final String FRANCE_MARKET = "france"; 
    public static final String GER_AUS_MARKET = "germany/austria"; 
    public static final String SWISS_MARKET = "switzerland"; 

    public static Finder<UUID, EpexEntry> find = 
      new Finder(UUID.class, EpexEntry.class); 

    public EpexEntry() { 
    } 

    public EpexEntry(UUID id, DateTime start, DateTime finish, String contract, 
      String market, Double low, Double high, Double last, 
      Double weightAverage, Double index, Double buyVol, Double sellVol) { 
     this.id = id; 
     this.start = start; 
     this.finish = finish; 
     this.contract = contract; 
     this.market = market; 
     this.low = low; 
     this.high = high; 
     this.last = last; 
     this.weightAverage = weightAverage; 
     this.index = index; 
     this.buyVol = buyVol; 
     this.sellVol = sellVol; 
    } 

    public static Page<EpexEntry> page(DateTime from, DateTime to, int page) { 

     if(from == null && to == null) 
      return find.order(ORDER_BY).findPagingList(PAGE_SIZE).getPage(page); 
     ExpressionList<EpexEntry> exp = find.where(); 
     if(from != null) 
      exp = exp.ge(START_COL, from); 
     if(to != null) 
      exp = exp.le(FINISH_COL, to.plusHours(24)); 
     return exp.order(ORDER_BY).findPagingList(PAGE_SIZE).getPage(page); 
    } 

    @Override 
    public String toCsv(String s) { 
     return id + s + start + s + finish + s + contract + 
       s + market + s + low + s + high + s + 
       last + s + weightAverage + s + 
       index + s + buyVol + s + sellVol; 
    } 

答えて

3

1.結果を表示する前に1〜5kbのデータを待ちます。プレイフレームワークが実際にコマンドcurl http://localhost:9000でデータを送信しているかどうかを確認することができます。

2.最初final ResultStreamer<EpexEntry> streamer = new ResultStreamer<EpexEntry>();

3.を削除し、二回ストリーマを作成 - あなたは大規模なデータセットを取得するためのPageクラスを使用する - これが正しくありません。実際には、1回の大きな初期要求を行い、次に繰り返しごとに1つの要求を行います。これは遅いです。単純なfindIterate()を使用します。

EpexEntry(あなたが必要として、それを変更すること自由に感じ)にこれを追加し

public static QueryIterator<EpexEntry> all() { 
    return find.order(ORDER_BY).findIterate(); 
} 

新しいストリームのメソッドの実装:

public void stream(Out<String> out, QueryIterator<T> iterator, UserRequest req) { 

    if(req.getFormat().equalsIgnoreCase("json")) { 
     JsonContext context = Ebean.createJsonContext(); 
     out.write("[\n"); 
     while (iterator.hasNext()) { 
      out.write(context.toJsonString(iterator.next()) + ", "); 
     } 
     iterator.close(); // its important to close iterator 
     out.write("]\n"); 
     out.close(); 
    } else // csv implementation here 

そして、あなたのonReady方法:

  QueryIterator<EpexEntry> iterator = EpexEntry.all(); 
      ResultStreamer<EpexEntry> streamer = new ResultStreamer<EpexEntry>(); 
      streamer.stream(new BuffOut(out, 10000), iterator, req); // notice buffering here 

もう1つの問題は、Out<String>.write()をあまりにも頻繁に呼び出すことです。 write()のコールは、サーバーがクライアントに即座に新しいチャンクを送信する必要があることを意味します。です。 Out<String>.write()のすべてのコールには大きなオーバーヘッドがあります。

サーバーがチャンクされた結果に応答をラップする必要があるため、オーバーヘッドが表示されます。メッセージごとに6〜7バイトです。Chunked response Format。小さなメッセージを送信するので、オーバーヘッドが重要です。 また、サーバーはあなたの応答をTCPパケットでラップする必要があります。 そして、サーバーはチャンクを送信するために何らかの内部アクションを実行する必要がありますが、これにはいくつかのリソースも必要です。結果として、ダウンロード帯域幅は最適ではありません。

ここに簡単なテストがあります:10000行のテキストTEST0〜TEST9999をチャンクで送信します。私のコンピュータでは平均3秒かかります。しかし、これをバッファリングするには65ミリ秒かかります。また、ダウンロードサイズは136kbと87.5kbです。バッファリングと

例:

コントローラ

public class Application extends Controller { 
    public static Result showEpex() { 
     Chunks<String> chunks = new StringChunks() { 
      @Override 
      public void onReady(play.mvc.Results.Chunks.Out<String> out) { 
       new ResultStreamer().stream(out); 
      } 
     }; 
     return ok(chunks).as("text/plain"); 
    } 
} 

新しいBuffOutクラス。それは

public class BuffOut { 
    private StringBuilder sb; 
    private Out<String> dst; 

    public BuffOut(Out<String> dst, int bufSize) { 
     this.dst = dst; 
     this.sb = new StringBuilder(bufSize); 
    } 

    public void write(String data) { 
     if ((sb.length() + data.length()) > sb.capacity()) { 
      dst.write(sb.toString()); 
      sb.setLength(0); 
     } 
     sb.append(data); 
    } 

    public void close() { 
     if (sb.length() > 0) 
      dst.write(sb.toString()); 
     dst.close(); 
    } 
} 

この実装は、3秒ダウンロード時間と136キロバイトのサイズを持っている

public class ResultStreamer { 
    public void stream(Out<String> out) { 
    for (int i = 0; i < 10000; i++) { 
      out.write("TEST" + i + "\n"); 
     } 
     out.close(); 
    } 
} 

私が知っている、この実装ダム65ミリ秒の時間と87.5キロバイトのサイズをダウンロードし、あなたのための

public class ResultStreamer { 
    public void stream(Out<String> out) { 
     BuffOut out2 = new BuffOut(out, 1000); 
     for (int i = 0; i < 10000; i++) { 
      out2.write("TEST" + i + "\n"); 
     } 
     out2.close(); 
    } 
} 
+0

感謝していますViktorに答えてください。バッファリングはスピードを向上させますが、書き出してからブラウザに表示されるまでの遅延はまだまだ巨大です。単純なprintlnステートメントを追加すると、すべての行がアウトに書き込まれ、それ以上の行がなくなっても閉じられていない場合、ブラウザーでロードが開始されます。行の数が大きすぎると、次のようなタイムアウトエラーが発生します。 – p00ya00

+0

[ERROR] [10/22/2013 13:57:16.285] [application-akka.actor.default-dispatcher-5] [ActorSystem(application )] [5000ミリ秒後に先物がタイムアウトしたため]終了コールバックの実行に失敗しました java.util.concurrent.TimeoutException:先物が[5000ミリ秒]後にタイムアウトしました scala.concurrent.impl.Promise $ DefaultPromise.ready (Promise.scala:96) scala.concurrent.impl.Promise $ DefaultPromise.result(Promise.scala:100) at scala.concurrent.Await $$ anonfun $結果$ 1.apply(package.scala:107) atkka.dispatch.MonitorableThreadFactory $ AkkaForkJoinWorkerThread $$ anon $ – p00ya00

+0

コードにいくつかの 'System.out.println(System.currentTimeMillis())'を挿入して出力を表示してください。 public void stream(Out out、Page page、UserRequest req)の最後の行の直前、直前に 'static result showEpex()'の後に置いてください。 '返信OK(チャンク).as(" text/plain ");'?あなたのチャンクの実行が何らかの理由で終了していない、または時間がかかるので、実行はプレイフレームワークによって終了しました。また、私のコードを実行しようとしましたか?あなたはそれと同じ問題を抱えているかどうか確認してください。 –

関連する問題