
MapReduce job taking too long to complete

I have written a MapReduce job that processes log files. The input is currently about 52 GB, and the job takes roughly an hour to process it. By default only one reduce task is created, and that reduce task occasionally fails with a timeout error. Below are the counters from a successful run of the job. How can I improve the performance?

File System Counters 
      FILE: Number of bytes read=876100387 
      FILE: Number of bytes written=1767603407 
      FILE: Number of read operations=0 
      FILE: Number of large read operations=0 
      FILE: Number of write operations=0 
      HDFS: Number of bytes read=52222279591 
      HDFS: Number of bytes written=707429882 
      HDFS: Number of read operations=351 
      HDFS: Number of large read operations=0 
      HDFS: Number of write operations=2 
    Job Counters 
      Failed reduce tasks=1 
      Launched map tasks=116 
      Launched reduce tasks=2 
      Other local map tasks=116 
      Total time spent by all maps in occupied slots (ms)=9118125 
      Total time spent by all reduces in occupied slots (ms)=7083783 
      Total time spent by all map tasks (ms)=3039375 
      Total time spent by all reduce tasks (ms)=2361261 
      Total vcore-seconds taken by all map tasks=3039375 
      Total vcore-seconds taken by all reduce tasks=2361261 
      Total megabyte-seconds taken by all map tasks=25676640000 
      Total megabyte-seconds taken by all reduce tasks=20552415744 
    Map-Reduce Framework 
      Map input records=49452982 
      Map output records=5730971 
      Map output bytes=864140911 
      Map output materialized bytes=876101077 
      Input split bytes=13922 
      Combine input records=0 
      Combine output records=0 
      Reduce input groups=1082133 
      Reduce shuffle bytes=876101077 
      Reduce input records=5730971 
      Reduce output records=5730971 
      Spilled Records=11461942 
      Shuffled Maps =116 
      Failed Shuffles=0 
      Merged Map outputs=116 
      GC time elapsed (ms)=190633 
      CPU time spent (ms)=4536110 
      Physical memory (bytes) snapshot=340458307584 
      Virtual memory (bytes) snapshot=1082745069568 
      Total committed heap usage (bytes)=378565820416 
    Shuffle Errors 
      BAD_ID=0 
      CONNECTION=0 
      IO_ERROR=0 
      WRONG_LENGTH=0 
      WRONG_MAP=0 
      WRONG_REDUCE=0 
    File Input Format Counters 
      Bytes Read=52222265669 
    File Output Format Counters 
      Bytes Written=707429882 
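
For reference, the driver is not shown in the question. A minimal sketch of where the reducer count would be raised might look like the following; ExchgLogsDriver is a hypothetical name, and the reducer class and the custom input format that yields List<Text> values are omitted because they do not appear in the post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ExchgLogsDriver {                            // hypothetical driver class
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "exchg-logs");
        job.setJarByClass(ExchgLogsDriver.class);
        job.setMapperClass(ExchgLogsMapper.class);
        job.setPartitionerClass(ActualKeyPartitioner.class);
        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(CompositeWritable.class); // must match what the mapper emits
        job.setNumReduceTasks(8);                         // default is 1; spreads the ~876 MB shuffle
        // job.setReducerClass(...), output key/value classes, and the custom
        // input format are omitted here; they are not shown in the post.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}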

When I increase the number of reducers, I get the following ClassCastException. I suspect the problem comes from the partitioner class.

java.lang.Exception: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text 
    at com.emaar.bigdata.exchg.logs.ActualKeyPartitioner.getPartition(ActualKeyPartitioner.java:1) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:716) 
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) 
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:56) 
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:1) 
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 

My partitioner class:

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> { 

    HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>(); 
    Text newKey = new Text(); 

    @Override 
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) { 

     try { 
      // Execute the default partitioner over the first part of the key 
      newKey.set(key.getSubject()); 
      return hashPartitioner.getPartition(newKey, value, numReduceTasks); 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return (int) (Math.random() * numReduceTasks); // random value in the range [0, numReduceTasks)
     } 
    } 
} 
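
A side note on the exception above: it is thrown from a synthetic bridge method, which is why the stack trace points at line 1 of ActualKeyPartitioner. The class is declared Partitioner<CompositeKey, Text>, while the mapper emits CompositeWritable values, so the framework's cast of the value to Text fails as soon as more than one reducer makes the custom partitioner run (with a single reducer it is never invoked). A minimal sketch with the value type widened to match, assuming CompositeWritable is the map output value class and key.getSubject() returns a String or Text:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Value type parameter widened from Text to CompositeWritable so that the
// cast in the generated bridge method no longer fails.
public class ActualKeyPartitioner extends Partitioner<CompositeKey, CompositeWritable> {

    private final HashPartitioner<Text, CompositeWritable> hashPartitioner =
            new HashPartitioner<Text, CompositeWritable>();
    private final Text newKey = new Text();

    @Override
    public int getPartition(CompositeKey key, CompositeWritable value, int numReduceTasks) {
        // Hash only the subject part of the composite key, so every record
        // for a given subject lands on the same reducer.
        newKey.set(key.getSubject());
        return hashPartitioner.getPartition(newKey, value, numReduceTasks);
    }
}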

Mapper code:

import java.io.IOException; 
import java.util.List; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.Mapper; 

public class ExchgLogsMapper extends Mapper<LongWritable, List<Text>, CompositeKey, Writable> { 
    String recepientAddresses = ""; 
    public static final String DELIVER = "DELIVER"; 
    public static final String RESOLVED = "Resolved"; 
    public static final String JUNK = "Junk E-mail"; 
    public static final String SEMICOLON = ";"; 
    public static final String FW1 = "FW: "; 
    public static final String FW2 = "Fw: "; 
    public static final String FW3 = "FWD: "; 
    public static final String FW4 = "Fwd: "; 
    public static final String FW5 = "fwd: "; 
    public static final String RE1 = "RE: "; 
    public static final String RE2 = "Re: "; 
    public static final String RE3 = "re: "; 


    Text mailType = new Text("NEW"); 
    Text fwType = new Text("FW"); 
    Text reType = new Text("RE"); 
    Text recepientAddr = new Text(); 

    @Override 
    public void map(LongWritable key, List<Text> values, Context context) throws IOException, InterruptedException { 
     String subj = null; 
     int lstSize=values.size() ; 
     if ((lstSize >= 26)) { 
      if (values.get(8).toString().equals(DELIVER)) { 
       if (!(ExclusionList.exclusions.contains(values.get(18).toString()))) { 
        if (!(JUNK.equals((values.get(12).toString())))) { 
         subj = values.get(17).toString(); 
         recepientAddresses = values.get(11).toString(); 
         String[] recepientAddressArr = recepientAddresses.split(SEMICOLON); 
         if (subj.startsWith(FW1) || subj.startsWith(FW2) || subj.startsWith(FW3) 
           || subj.startsWith(FW4) || subj.startsWith(FW5)) { 
          mailType = fwType; 
          subj = subj.substring(4); 
         } else if (subj.startsWith(RE1) || subj.startsWith(RE2) || subj.startsWith(RE3)) { 
          mailType = reType; 
          subj = subj.substring(4); 
         } 
         for (int i = 0; i < recepientAddressArr.length; i++) { 
          CompositeKey ckey = new CompositeKey(subj, values.get(0).toString()); 
          recepientAddr.set(recepientAddressArr[i]); 
          CompositeWritable out = new CompositeWritable(mailType, recepientAddr, values.get(18), 
            values.get(0)); 
          context.write(ckey, out); 
//       System.err.println(out); 

         } 
        } 
       } 
      } 
     } 
    } 
} 
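
One detail in the mapper as posted: mailType is an instance field that gets switched to fwType or reType but is never reset, so a plain subject processed after a forwarded one would still be tagged FW. If that is unintended, a minimal fix is to reset it at the top of map(); newType here is a hypothetical extra field alongside fwType and reType:

    Text newType = new Text("NEW");

    @Override
    public void map(LongWritable key, List<Text> values, Context context)
            throws IOException, InterruptedException {
        mailType = newType; // reset per record so a previous FW/RE does not carry over
        // ... rest of the method unchanged ...
    }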
What is stopping you from increasing the number of reducers? –

If I increase the number of reducers, I get a ClassCastException. I think the problem comes from the partitioner class. I have edited the question with the details. –

Is key.getSubject() a Text? Instead of creating a HashPartitioner instance, you could try: return (key.getSubject().hashCode() & Integer.MAX_VALUE) % numReduceTasks; this is what the HashPartitioner implementation does. –
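
For reference, the change this comment suggests would look roughly like the following, assuming key.getSubject() returns an object with a stable hashCode:

    @Override
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) {
        // Same arithmetic HashPartitioner uses internally: clear the sign
        // bit, then take the remainder modulo the number of reducers.
        return (key.getSubject().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }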

Answer


There were a few sysouts in the reducer code, inside a loop, that were writing a lot of log output. After removing them, the reducer finishes in 2 to 3 minutes.
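
The reducer itself is not shown in the post. As an illustration of the pattern, an unconditional System.err.println inside the reduce loop pays formatting and I/O cost for every one of the ~5.7 million input records, while a leveled logger guarded by a check costs almost nothing when the level is disabled; ExchgLogsReducer is a hypothetical name:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Inside the reducer class:
private static final Log LOG = LogFactory.getLog(ExchgLogsReducer.class);

// Inside reduce(), instead of an unconditional println per record:
for (CompositeWritable val : values) {
    if (LOG.isDebugEnabled()) {      // skipped entirely unless DEBUG is enabled
        LOG.debug("processing " + val);
    }
    // ... aggregation work ...
}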
