2016-01-09 14 views
16

レコードをバッチでプッシュ(INSERT)するJavaクライアントがCassandraクラスタにあります。バッチ内の要素はすべて同じ行キーを持つため、すべてが同じノードに配置されます。また、トランザクションをアトミックにする必要はないので、未記録のバッチを使用しています。カッサンドラのバッチ制限はどのくらいですか?

各バッチ内のINSERTコマンドの数は、さまざまな要因によって異なりますが、5〜50000の間の値にすることができます。まず、1つのバッチにある数のコマンドを入力して送信します。これはcom.datastax.driver.core.exceptions.InvalidQueryException: Batch too largeを投げた。次に、バッチごとに1000 INSERTのキャップを使用してから300に下がりました。私は、この限界がどこから来ているのかを突き止めるだけで無作為に推測していることに気付きました。

私の質問は、この制限は何ですか?それを変更することはできますか?どのように多くの要素をバッチに配置できるのかを知るにはどうすればよいですか?私のバッチが「満杯」になったら?

答えて

17

キャップを増やさず、複数のリクエストに分割することをおすすめします。巨大な単一の要求にすべてを入れることは、コーディネーターに大きな負の影響を与えます。すべてのパーティションを1つのパーティションにまとめることで、レイテンシを減らすことによって、一部のバッチでスループットを向上させることができますが、パフォーマンスを向上させるためにバッチを使用することは決してありません。したがって、異なるバッチサイズを使用して最大のスループットを得るために最適化しようとすると、ユースケース/スキーマ/ノードに大きく左右され、特定のテストが必要になります。

はそれを増やすが、あなたが実際に助け、あなたはスループットをしている傷つけていないことを確認するためにテストしてくださいにあなたのcassandra.yaml

# Fail any batch exceeding this value. 50kb (10x warn threshold) by default. 
batch_size_fail_threshold_in_kb: 50 

オプションがあります。

+0

これは私が探していたものです。ありがとうございます。クライアントのバッチのサイズを監視する最良の方法は何ですか? –

+2

使用しているドライバによって異なりますが、Javaドライバでは、バッチ内の個々のステートメントごとにgetValues()を使用して、残りの()メソッドを使用してサイズを取得できるByteBuffersの配列を返します。バッファーを個別にまとめ、それらを合計しますが、一般的にはそうすることをお勧めしません。あなたは超大規模なバッチを作成すべきではありません。それは、あなたがその限界に近づいていないと感じるところで十分です。 –

+0

ここにはたくさんのものがあります。 C *は行の代わりに列で設計し、C *はパーティションあたり2Bの列を示しますが、経験的にはスイートスポットは100MBです。100MBのパーティションでも、バッチのデフォルトサイズが50KBであれば、100MB/50KB = 3125リクエストで100MBのパーティションを取得できます。 – user1870400

2

仕事は、あなたがのようなものを見つけることができるでしょう記録します:[マッチ]を

ERROR 19時54分13秒バッチですサイズが103.072KiB、指定したしきい値が50,000KiB、53.072KiBを超えています。ここではどのようにバッチバッチの機能例であり、Javaで同様の問題を越え

0

蘭を(batch_size_fail_threshold_in_kbを参照してください):

import com.datastax.driver.core.BatchStatement; 
import com.datastax.driver.core.PreparedStatement; 
import com.datastax.driver.core.Session; 
import com.google.common.collect.Lists; 

import java.util.Collection; 
import java.util.stream.Collectors; 

public class CassandraBatchOfBatchesExample { 

    private final PreparedStatement statement; 
    private final Session session; 
    private final int batchSize; 

    public CassandraBatchOfBatchesExample(Session session, int batchSize) { 
     this.session = session; 
     this.batchSize = batchSize; 
     statement = session.prepare("INSERT_INTO some_table JSON ?"); 
    } 

    public void execute(Collection<String> jsons) { 
     Lists.partition(jsons 
       .stream() 
       .map(statement::bind) 
       .collect(Collectors.toList() 
      ), batchSize).stream() 
      .map(statements -> new BatchStatement().addAll(statements)) 
      .forEach(session::execute); 
    } 
} 

batchSize変数が挿入されている個々のレコードのサイズに基づいて変更する必要があります。

関連する問題