2016-08-22 20 views
1

ファイル名のリストが与えられています。このファイルにはカンマ区切りのデータが含まれており、ファイル名に基づいて情報を含む列でさらに拡張する必要があります。したがって、私は小さなread_file関数を実装しました。この関数は、最初のクリーニングと追加の列の計算の両方を処理します。 db.from_sequence(files).map(read_file)を使用して、読み込み関数をすべてのファイルにマッピングして、それぞれの辞書のリストを取得します。複数のファイルから複数のファイルからDaskデータフレームへダークバッグ

しかし、辞書のリストではなく、入力ファイルの個々の行をエントリとして格納するようにします。その後、辞書のキーをdaskデータフレーム内の列名にマップする必要があります。

from dask import bag as db 

def read_file(filename): 
    ret = [] 
    with open(filename, 'r') as fp: 
     ... # reading line of file and storing result in dict 
     ret.append({'a': val_a, 'b': val_b, 'c': val_c}) 

    return ret 

from dask import bag as db 
files = ['a.txt', 'b.txt', 'c.txt'] 
my_bag = db.from_sequence(files).map(read_file) 
# a,b,c are the keys of the dictionaries returned by read_file 
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c']) 

このコードを実行するために変更する必要があることを教えてもらえますか?もっと適切なアプローチがありますか?

編集: 私は3つのテストファイルa_20160101.txt,a_20160102.txt,a_20160103.txtを作成しました。それらのすべてには、それぞれ1つの文字列を含む数行しかありません。

asdf 
sadfsadf 
sadf 
fsadff 
asdf 
sadfasd 
fa 
sf 
ads 
f 

は、以前私がread_fileの小さな誤りがあったが、今、読者にマッピングした後my_bag.take(10)を呼び出すと、正常に動作します:

([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],) 

しかしmy_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])、その後 my_df.head(10)はまだdask.async.AssertionError: 3 columns passed, passed data had 10 columns

+0

のシングルフラット化コレクションである私達の袋を連結することができますか? – MRocklin

答えて

0

あなたを提起しますおそらく電話する必要がありますconcat

あなたがあなたのバッグをマップ呼び出した後

['a.txt', 
'b.txt', 
'c.txt'] 

次のようになります:ファイル名の

あなたのバッグは、このようになります。各ファイルは、dictsのリストになっていた

[[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}], 
[{'a': 1, 'b': 2, 'c': 3}], 
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]] 

。今あなたのバッグは、リストのリストのようなものです。

.to_dataframeの方法では、dictsのリストが必要です。だから、辞書の袋を持った後、to_dataframeに渡すと間違っている何dicts

my_bag = db.from_sequence(files).map(read_file).concat() 

[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}, 
{'a': 1, 'b': 2, 'c': 3}, 
{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}] 
+0

と 'concat()'は完全に動作します、ありがとう!別のアプローチを使用する方が良いでしょうか、それともうまく平坦化していますか?もし私が[dask-tutorial](https://github.com/dask/dask-tutorial)を正しく覚えていれば、袋は摂取/変換のためにうまくいくはずです。これは正しいのですか? – sim

+0

これは私にとって妥当なアプローチのようです。 [dask.delayed](http://dask.readthedocs.io/en/latest/delayed.html)を試すこともできます。 [コレクションを扱う](http://dask.readthedocs.io/en/latest/delayed-collections.html)の注記を参照してください。 – MRocklin

関連する問題