2012-06-16 29 views
8

HBaseに出力を書き出すhaddopジョブがあります。私は本当に減速機が必要ではない、私が挿入したい行の種類はマッパーで決定されます。Hadoop - Mapperから直接HBaseに書き込む

これを行うにはどのようにしてTableOutputFormatを使用できますか?私が見てきたすべての例から、減速機はPutを作成するものであり、TableMapperはHBaseテーブルから読み込むためのものであるという前提があります。

私の場合、入力はHDFSです。出力は特定のテーブルに配置されていますが、私はそれを手伝ってくれるTableMapReduceUtilには何も見つかりません。

私にそれを手伝ってくれる例がありますか?

ところで、私はあなただけマッパー出力ペアを作成する必要があり

+0

挿入しようとしているレコードの数はいくつですか? – Gevorg

答えて

1

新しいHadoopのAPIを使用しています。 OutputFormatは、出力キー値を保持する方法のみを指定します。必ずしもキー値がレデューサーから来ているわけではありません。

... extends TableMapper<ImmutableBytesWritable, Put>() { 
    ... 
    ... 
    context.write(<some key>, <some Put or Delete object>); 
} 
7

これは、ファイルからの読み込みの例であり、HBaseのにすべての行を置く: あなたはマッパーでこのような何かをする必要があります。この例は "Hbase:The Definitive Guide"のもので、リポジトリで見つけることができます。

git clone git://github.com/larsgeorge/hbase-book.git 

この書籍では、コードに関するすべての説明を見つけることもできます。しかし、何かが理解できない場合は、お気軽にお問い合わせください。

` public class ImportFromFile { 
    public static final String NAME = "ImportFromFile"; 
    public enum Counters { LINES } 

    static class ImportMapper 
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
     private byte[] family = null; 
     private byte[] qualifier = null; 

     @Override 
     protected void setup(Context context) 
     throws IOException, InterruptedException { 
     String column = context.getConfiguration().get("conf.column"); 
     byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); 
     family = colkey[0]; 
     if (colkey.length > 1) { 
      qualifier = colkey[1]; 
     } 
     } 

     @Override 
     public void map(LongWritable offset, Text line, Context context) 
     throws IOException { 
      try { 
      String lineString = line.toString(); 
      byte[] rowkey = DigestUtils.md5(lineString); 
      Put put = new Put(rowkey); 
      put.add(family, qualifier, Bytes.toBytes(lineString)); 
      context.write(new ImmutableBytesWritable(rowkey), put); 
      context.getCounter(Counters.LINES).increment(1); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     } 
    } 

    private static CommandLine parseArgs(String[] args) throws ParseException { 
     Options options = new Options(); 
     Option o = new Option("t", "table", true, 
     "table to import into (must exist)"); 
     o.setArgName("table-name"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("c", "column", true, 
     "column to store row data into (must exist)"); 
     o.setArgName("family:qualifier"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("i", "input", true, 
     "the directory or file to read from"); 
     o.setArgName("path-in-HDFS"); 
     o.setRequired(true); 
     options.addOption(o); 
     options.addOption("d", "debug", false, "switch on DEBUG log level"); 
     CommandLineParser parser = new PosixParser(); 
     CommandLine cmd = null; 
     try { 
     cmd = parser.parse(options, args); 
     } catch (Exception e) { 
     System.err.println("ERROR: " + e.getMessage() + "\n"); 
     HelpFormatter formatter = new HelpFormatter(); 
     formatter.printHelp(NAME + " ", options, true); 
     System.exit(-1); 
     } 
     return cmd; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = HBaseConfiguration.create(); 
     String[] otherArgs = 
     new GenericOptionsParser(conf, args).getRemainingArgs(); 
     CommandLine cmd = parseArgs(otherArgs); 
     String table = cmd.getOptionValue("t"); 
     String input = cmd.getOptionValue("i"); 
     String column = cmd.getOptionValue("c"); 
     conf.set("conf.column", column); 
     Job job = new Job(conf, "Import from file " + input + " into table " + table); 

      job.setJarByClass(ImportFromFile.class); 
     job.setMapperClass(ImportMapper.class); 
     job.setOutputFormatClass(TableOutputFormat.class); 
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); 
     job.setOutputKeyClass(ImmutableBytesWritable.class); 
     job.setOutputValueClass(Writable.class); 
     job.setNumReduceTasks(0); 
     FileInputFormat.addInputPath(job, new Path(input)); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    }` 
+1

私は以下を取得しています: 'コンテナの起動からの例外:org.apache.hadoop.util.Shell $ ExitCodeException'上記のコードと同様にこの問題に遭遇しましたか?私はHadoop2.4とHbase0.94.18を使用しています – Gevorg

関連する問題