2017-01-16 3 views
5

私は1つのバイト配列で表現する必要があるヘッダとデータを持っています。また、私は、バイト配列内のヘッダをパックするための特定のフォーマットと、バイト配列内のデータをパックするための別のフォーマットを持っています。私はこれらの2つを持って、それから1つの最終的なバイト配列を作る必要があります。効率的な方法でByteBufferを使用して、1バイト配列のヘッダーとデータレイアウトをパックしますか?

以下は、C++での定義方法であり、Javaで行う必要があるレイアウトです。

// below is my header offsets layout 

// addressedCenter must be the first byte 
static constexpr uint32_t addressedCenter  = 0; 
static constexpr uint32_t version    = addressedCenter + 1; 
static constexpr uint32_t numberOfRecords  = version + 1; 
static constexpr uint32_t bufferUsed   = numberOfRecords + sizeof(uint32_t); 
static constexpr uint32_t location    = bufferUsed + sizeof(uint32_t); 
static constexpr uint32_t locationFrom   = location + sizeof(CustomerAddress); 
static constexpr uint32_t locationOrigin  = locationFrom + sizeof(CustomerAddress); 
static constexpr uint32_t partition   = locationOrigin + sizeof(CustomerAddress); 
static constexpr uint32_t copy     = partition + 1; 

// this is the full size of the header 
static constexpr uint32_t headerOffset = copy + 1; 

そしてCustomerAddressuint64_tのtypedefで、それはこのように作られている -

typedef uint64_t CustomerAddress; 

void client_data(uint8_t datacenter, 
       uint16_t clientId, 
       uint8_t dataId, 
       uint32_t dataCounter, 
       CustomerAddress& customerAddress) 
{ 
    customerAddress = (uint64_t(datacenter) << 56) 
        + (uint64_t(clientId) << 40) 
        + (uint64_t(dataId) << 32) 
        + dataCounter; 
} 

そして、以下は私のデータレイアウトである -

// below is my data layout - 
// 
// key type - 1 byte 
// key len - 1 byte 
// key (variable size = key_len) 
// timestamp (sizeof uint64_t) 
// data size (sizeof uint16_t) 
// data (variable size = data size) 

問題文: -

今、pの一部私は必要なフィールドを渡すことができるように、Javaのある特定のクラスの全体的なものを表現しようとしています。それでは、最初にヘッダを持ち、次にデータを持っている最後のByte Arrayを私に作れます:

以下は私のDataFrameクラスです:あなたは私のDataFrameクラスで見ることができるように、私はを割り当てています

public static void main(String[] args) throws IOException { 
    // header layout 
    byte addressedCenter = 0; 
    byte version = 1; 

    long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120); 
    long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130); 
    long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140); 

    byte partition = 3; 
    byte copy = 0; 

    // this map will have key as the actual key and value as the actual data, both in byte array 
    // for now I am storing only two entries in this map 
    Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>(); 
    for (int i = 1; i <= 2; i++) { 
     keyDataHolder.put(generateKey(), getMyData()); 
    } 

    DataFrame records = 
     new DataFrame(addressedCenter, version, keyDataHolder, location, locationFrom, 
      locationOrigin, partition, copy); 

    // this will give me final packed byte array 
    // which will have header and data in it. 
    byte[] packedArray = records.serialize(); 
    } 

    private static long packCustomerAddress(byte datacenter, short clientId, byte dataId, 
     int dataCounter) { 
    return ((long) (datacenter) << 56) | ((long) clientId << 40) | ((long) dataId << 32) 
     | ((long) dataCounter); 
    } 

:以下

public final class DataFrame { 
    private final byte addressedCenter; 
    private final byte version; 
    private final Map<byte[], byte[]> keyDataHolder; 
    private final long location; 
    private final long locationFrom; 
    private final long locationOrigin; 
    private final byte partition; 
    private final byte copy; 

    public DataFrame(byte addressedCenter, byte version, 
     Map<byte[], byte[]> keyDataHolder, long location, long locationFrom, 
     long locationOrigin, byte partition, byte copy) { 
    this.addressedCenter = addressedCenter; 
    this.version = version; 
    this.keyDataHolder = keyDataHolder; 
    this.location = location; 
    this.locationFrom = locationFrom; 
    this.locationOrigin = locationOrigin; 
    this.partition = partition; 
    this.copy = copy; 
    } 

    public byte[] serialize() { 
    // All of the data is embedded in a binary array with fixed maximum size 70000 
    ByteBuffer byteBuffer = ByteBuffer.allocate(70000); 
    byteBuffer.order(ByteOrder.BIG_ENDIAN); 

    int numOfRecords = keyDataHolder.size(); 
    int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2; 

    // header layout 
    byteBuffer.put(addressedCenter); // byte 
    byteBuffer.put(version); // byte 
    byteBuffer.putInt(numOfRecords); // int 
    byteBuffer.putInt(bufferUsed); // int 
    byteBuffer.putLong(location); // long 
    byteBuffer.putLong(locationFrom); // long 
    byteBuffer.putLong(locationOrigin); // long 
    byteBuffer.put(partition); // byte 
    byteBuffer.put(copy); // byte 

    // now the data layout 
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) { 
     byte keyType = 0; 
     byte keyLength = (byte) entry.getKey().length; 
     byte[] key = entry.getKey(); 
     byte[] data = entry.getValue(); 
     short dataSize = (short) data.length; 

     ByteBuffer dataBuffer = ByteBuffer.wrap(data); 
     long timestamp = 0; 

     if (dataSize > 10) { 
     timestamp = dataBuffer.getLong(2);    
     }  

     byteBuffer.put(keyType); 
     byteBuffer.put(keyLength); 
     byteBuffer.put(key); 
     byteBuffer.putLong(timestamp); 
     byteBuffer.putShort(dataSize); 
     byteBuffer.put(data); 
    } 
    return byteBuffer.array(); 
    } 

    private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) { 
    int size = 36; 
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) { 
     size += 1 + 1 + 8 + 2; 
     size += entry.getKey().length; 
     size += entry.getValue().length; 
    } 
    return size; 
    } 
} 

そして、私は私の上記DataFrameクラスを使用しています方法ですの定義済みサイズは70000です。ハードコードされた70000の代わりにByteBufferを作成しながら使用しているサイズを割り当てることができるより良い方法はありますか?

また、私のヘッダーとデータを1バイトの配列にパックしているのと比べて、より良い方法がありますか?私はまた、それが複数のスレッドによって呼び出すことができるので、スレッドセーフであることを確認する必要があります。

+1

マルチスレッドコンテキストでは、byteBufferは静的であってはなりません。 –

答えて

1

ByteBufferを作る代わりにハードコードさ70000を使用している間、私は私が使用していますサイズを割り当てることが可能な、より良い方法はありますか?

少なくとも2つの重複しないアプローチがあります。両方を使うことができます。

1つはバッファプールです。ピーク時に必要なバッファの数を調べ、その上に最大値を使用する必要があります。 max + max/2、max + average、max + mode、2 * maxです。

import java.nio.ByteBuffer; 
import java.nio.ByteOrder; 
import java.util.concurrent.CompletionStage; 
import java.util.concurrent.LinkedBlockingDeque; 
import java.util.function.Consumer; 
import java.util.function.Function; 

public class ByteBufferPool { 
    private final int bufferCapacity; 
    private final LinkedBlockingDeque<ByteBuffer> queue; 

    public ByteBufferPool(int limit, int bufferCapacity) { 
     if (limit < 0) throw new IllegalArgumentException("limit must not be negative."); 
     if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative."); 

     this.bufferCapacity = bufferCapacity; 
     this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit); 
    } 

    public ByteBuffer acquire() { 
     ByteBuffer buffer = (queue == null) ? null : queue.pollFirst(); 
     if (buffer == null) { 
      buffer = ByteBuffer.allocate(bufferCapacity); 
     } 
     else { 
      buffer.clear(); 
      buffer.order(ByteOrder.BIG_ENDIAN); 
     } 
     return buffer; 
    } 

    public boolean release(ByteBuffer buffer) { 
     if (buffer == null) throw new IllegalArgumentException("buffer must not be null."); 
     if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity."); 
     if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct."); 
     if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only."); 

     return (queue == null) ? false : queue.offerFirst(buffer); 
    } 

    public void withBuffer(Consumer<ByteBuffer> action) { 
     if (action == null) throw new IllegalArgumentException("action must not be null."); 

     ByteBuffer buffer = acquire(); 
     try { 
      action.accept(buffer); 
     } 
     finally { 
      release(buffer); 
     } 
    } 

    public <T> T withBuffer(Function<ByteBuffer, T> function) { 
     if (function == null) throw new IllegalArgumentException("function must not be null."); 

     ByteBuffer buffer = acquire(); 
     try { 
      return function.apply(buffer); 
     } 
     finally { 
      release(buffer); 
     } 
    } 

    public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) { 
     if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null."); 

     ByteBuffer buffer = acquire(); 
     CompletionStage<T> future = null; 
     try { 
      future = asyncFunction.apply(buffer); 
     } 
     finally { 
      if (future == null) { 
       release(buffer); 
      } 
      else { 
       future = future.whenComplete((result, throwable) -> release(buffer)); 
      } 
     } 
     return future; 
    } 
} 

acquirereleaseが獲得を分離し、ポイントを解放可能にしながら、withBuffer方法は、プールの直接的使用を可能にします。もう1つは、シリアル化インターフェースを分離することである。 put,putIntおよびputLongで、バイトカウントクラスと実際のバイトバッファリングクラスを実装できます。そのようなインタフェースにメソッドを追加して、不要なバイトの生成を避けるためにシリアライザがバイトまたはバッファをカウントしているかどうかを知る必要があります。また、実際にシリアル化せずに文字列のサイズを計算する際に、 。あなたのコードで

public interface ByteSerializer { 
    ByteSerializer put(byte value); 

    ByteSerializer putInt(int value); 

    ByteSerializer putLong(long value); 

    boolean isSerializing(); 

    ByteSerializer add(int bytes); 

    int position(); 
} 

 

public class ByteCountSerializer implements ByteSerializer { 
    private int count = 0; 

    @Override 
    public ByteSerializer put(byte value) { 
     count += 1; 
     return this; 
    } 

    @Override 
    public ByteSerializer putInt(int value) { 
     count += 4; 
     return this; 
    } 

    @Override 
    public ByteSerializer putLong(long value) { 
     count += 8; 
     return this; 
    } 

    @Override 
    public boolean isSerializing() { 
     return false; 
    } 

    @Override 
    public ByteSerializer add(int bytes) { 
     if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative."); 

     count += bytes; 
     return this; 
    } 

    @Override 
    public int position() { 
     return count; 
    } 
} 

 

import java.nio.ByteBuffer; 

public class ByteBufferSerializer implements ByteSerializer { 
    private final ByteBuffer buffer; 

    public ByteBufferSerializer(int bufferCapacity) { 
     if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative."); 

     this.buffer = ByteBuffer.allocate(bufferCapacity); 
    } 

    @Override 
    public ByteSerializer put(byte value) { 
     buffer.put(value); 
     return this; 
    } 

    @Override 
    public ByteSerializer putInt(int value) { 
     buffer.putInt(value); 
     return this; 
    } 

    @Override 
    public ByteSerializer putLong(long value) { 
     buffer.putLong(value); 
     return this; 
    } 

    @Override 
    public boolean isSerializing() { 
     return true; 
    } 

    @Override 
    public ByteSerializer add(int bytes) { 
     if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative."); 

     for (int b = 0; b < bytes; b++) { 
      buffer.put((byte)0); 
     } 
     return this; 
     // or throw new UnsupportedOperationException(); 
    } 

    @Override 
    public int position() { 
     return buffer.position(); 
    } 

    public ByteBuffer buffer() { 
     return buffer; 
    } 
} 

、あなたはこれらの線に沿って何か(テストされていない)んだろう:

ByteCountSerializer counter = new ByteCountSerializer(); 
dataFrame.serialize(counter); 
ByteBufferSerializer serializer = new ByteByfferSerializer(counter.position()); 
dataFrame.serialize(serializer); 
ByteBuffer buffer = serializer.buffer(); 
// ... write buffer, ?, profit ... 

あなたをメソッドは、ByteSerializerを受け入れるようにリファクタリングする必要があり、データを生成する場合は、isSerializingをチェックしてサイズを計算するか実際にバイトを書き込むべきかを確認する必要があります。

私は練習として両方のアプローチを組み合わせたままにしています。これは主にどのように決定するかによって大きく左右されるためです。

たとえば、ByteBufferSerializerはプールを直接使用し、任意の容量(たとえば70000)を維持することができます。容量はByteBufferです(ただし、必要な容量の代わりに、必要な容量を設定し、バッファの制限をacquireから返す前に設定するか、reset()メソッドを追加する限り、直接ByteBufferSerializerをプールすることができます)。

また、私のヘッダーとデータを1バイトの配列でパックしているのと比べて、より良い方法がありますか?

はい。特定のメソッドの長さがチェックされた後、またはその内容がコピーされた瞬間に破棄されるバイト配列を返す代わりに、バイトバッファリングのインスタンスを渡します。

また、複数のスレッドで呼び出すことができるため、スレッドセーフであることを確認する必要があります。

それぞれのバッファが1つのスレッドのみで使用され、適切な同期化が行われている限り、心配する必要はありません。

適切な同期化とは、プールマネージャーがそのメソッドでセマンティクスを取得して解放すること、およびプールからフェッチしてプールに戻す間に複数のスレッドによってバッファーが使用される場合、停止セマンティクスを停止するスレッドに追加することを意味します。バッファの使用を開始するスレッドで取得のセマンティクスを追加します。たとえば、バッファをCompletableFutureに渡している場合は、これについて心配する必要はありません。または、Exchangerのスレッド間、またはBlockingQueueの適切な実装を使用して明示的に通信している必要があります。 java.util.concurrentのパッケージの説明から

java.util.concurrent内のすべてのクラスのメソッドとそのサブパッケージは、より高いレベルの同期にこれらの保証を延長します。具体的には:前の任意の同時収集にオブジェクトを配置するスレッドで

  • アクションが起こる前に、別のスレッドでコレクションからその要素のアクセスまたは除去後にアクション。 Executor前のRunnableの提出にスレッドで

  • アクションが起こる前に、その実行が開始されます。同様にCallablesについては、ExecutorServiceに提出されている。 Futureで表される非同期計算によって取ら

  • アクションが起こる前に、別のスレッドでFuture.get()介して結果の検索に続いアクション。

  • アクション前に「解放する」シンクロナイザ方法などLock.unlockSemaphore.release、およびCountDownLatch.countDownとして起こる、前と同じシンクロナイザーオブジェクト上アクションメソッドなどLock.lockSemaphore.acquireCondition.await「取得」の成功に続いて、そしてCountDownLatch.await別のスレッドで正常Exchangerを介してオブジェクトを交換するスレッドの各ペアについて

  • 、各スレッドにおける前exchange()のアクションが起こる前に、別のスレッドに対応exchange()に続くもの。

  • アクションCyclicBarrier.awaitPhaser.awaitAdvance(だけでなく、その変種)を呼び出す前に起こり、前遮断作用によって行わ遮断作用によって実行されるアクション、およびアクションが起こる-前から正常復帰へアクション以降の他のスレッドに対応するawait

+0

あなたの提案をありがとう。私が理解しやすいように、答えの最初の2つの提案の例を私に教えてもらえますか?今私はこれがどのように機能するのか混乱しています。 – john

+0

私は例を追加しました。 – acelent

+0

私は非常に正直である。私はあなたの例からいくつかの部分を把握することができた。しかし私はあなたの提案を自分のコードでどのように使うのか理解できません。あなたはこれを私のための練習として残していると言いました。私は私と私がどのように統合するのかよく分かりません。私はいつもバイトバッファーの非常に低いレベルのAPIを扱い、バイトを扱っています。私はこれらのすべてのものが似ていることを学ばなければなりません。あなたが私に例を提供するのを助けることができれば、これはどのように私のものと統合されるでしょうか、それは大きな助けになるでしょう。 – john

0

ByteArrayOutputStreamを中心にDataOutputStreamを経由する別の方法がありますが、必要な場所にパフォーマンスチューニングを集中させる必要があります。これはその1つではありません。ここでは効率性は問題ではありません。ネットワークI/Oは数桁のオーダーで支配的です。

ByteArrayOutputStreamを使用するもう1つの理由は、バッファサイズをあらかじめ推測する必要がないことです。必要に応じてサイズが大きくなります。

スレッドセーフな状態に保つには、ローカル変数のみを使用してください。

+0

ネットワークI/Oの効率があまり高くない場合でも、何百万ものアクティブな接続を処理する必要があるサーバーを実装している場合はどうなりますか?バッファの割り当て(およびコピー)は、Javaおよび.NETで実装されたそのようなサーバーでのパフォーマンスの原因の1つです。 – acelent

+0

@acelentそして、あなたは多くのCPUパワーを必要とします。しかし、7000バイトの事前割り振りを除いて、OPのコードについて特に非効率的なものは何もありません。 – EJP

関連する問題