2012-04-09 13 views
0

私はamazon s3で膨大なcsvファイル(100MB +)を持っていて、それらをチャンクで読み込み、ルビーCSVライブラリを使用して処理します。Rubyのバッファリングされた/ RingBuffer IO + Amazon S3ノンブロッキングチャンクの読み込み

buffer = TheRightIOClass.new 
bytes_received = 0 
RightAws::S3Interface.new(<access_key>, <access_secret>).retrieve_object(bucket, key) do  |chunk| 
    bytes_received += buffer.write(chunk) 
    if bytes_received >= 1*MEGABYTE 
    bytes_received = 0 
    csv(buffer).each do |row| 
     process_csv_record(row) 
    end 
    end 
end 

def csv(io) 
    @csv ||= CSV.new(io, headers: true) 
end 

は、私は右のセットアップはここにあるべきかわからないし、何TheRightIOClassがある:私はハードのcsv処理のための右のIOオブジェクトを作成する時間を持っています。私はStringIOでファイル全体をメモリにロードしたくありません。これを行うには、rubyにbufferedioまたはringbufferがありますか? 誰かがスレッド(プロセスなし)とパイプを使って良い解決策を持っていれば、私はそれを見たいと思っています。

答えて

2

StringIOを使用して、賢明なエラー処理を行うことで、処理する前に行全体がチャンク内にあることを保証できます。この例のpackerクラスは、ディスクまたはデータベースにフラッシュするまで、解析された行をメモリに蓄積するだけです。

packer = Packer.new 
object = AWS::S3.new.buckets[bucket].objects[path] 
io = StringIO.new 
csv = ::CSV.new(io, headers: true) 
object.read do |chunk| 
    #Append the most recent chunk and rewind the IO 
    io << chunk 
    io.rewind 
    last_offset = 0 
    begin 
    while row = csv.shift do 
     #Store the parsed row unless we're at the end of a chunk 
     unless io.eof? 
     last_offset = io.pos 
     packer << row.to_hash 
     end 
    end 
    rescue ArgumentError, ::CSV::MalformedCSVError => e 
    #Only rescue malformed UTF-8 and CSV errors if we're at the end of chunk 
    raise e unless io.eof? 
    end 
    #Seek to our last offset, create a new StringIO with that partial row & advance the cursor 
    io.seek(last_offset) 
    io.reopen(io.read) 
    io.read 
    #Flush our accumulated rows to disk every 1 Meg 
    packer.flush if packer.bytes > 1*MEGABYTES 
end 
#Read the last row 
io.rewind 
packer << csv.shift.to_hash 
packer 
関連する問題