2017-12-19 14 views
2

私はMapReduceのコードから奇妙な出力を取得していますに影響を与えません。削減機能は、最終的な出力

入力:

aa bb 
aa cc 
bb aa 
cc dd 
dd bb 
xx aa 
ss rr 

出力:

aa [email protected] 
aa [email protected] 
aa [email protected] 
aa [email protected] 
bb [email protected] 
bb [email protected] 
bb [email protected] 
cc [email protected] 
cc [email protected] 
dd [email protected] 
dd [email protected] 
rr [email protected] 
ss [email protected] 
xx [email protected] 

コード:

package org.mapreduce.userscore; 

import java.io.*; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 


public class UserScore { 

public static class ScoreWritable implements Writable { 
      private IntWritable N; 
      private IntWritable M; 

      //Default Constructor 
      public ScoreWritable() { 
       this.N = new IntWritable(); 
       this.M = new IntWritable(); 
      } 

      //Custom constructor 
      public ScoreWritable(IntWritable N, IntWritable M){ 
       this.N = N; 
       this.M = M; 
      } 

      //Setter method to set the values of ScoreWritable objects 
      public void set(IntWritable NN,IntWritable MM) { 
       this.N = NN; 
       this.M = MM; 
      } 

      //to get the first object from Score Record 
      public IntWritable getN() { 
       return N; 
      } 

      //to get the second object from Score Record 
      public IntWritable getM() { 
       return M; 
      } 

      @Override 
      //overriding default readFields method. 
      //It de-serializes the byte stream data 
      public void readFields(DataInput in) throws IOException { 
       N.readFields(in); 
       M.readFields(in); 
      } 

      @Override 
      //It serializes object data into byte stream data 
      public void write(DataOutput out) throws IOException { 
       N.write(out); 
       M.write(out); 
      } 

      //@Override 
      //public boolean equals(Object o) { 
       //if (o instanceof ScoreWritable) { 
       //ScoreWritable other = (ScoreWritable) o; 
       //return N.equals(other.N) && M.equals(other.M); 
       //} 
       //return false; 
      //} 

      @Override 
      public int hashCode() { 
       return N.hashCode(); 
      } 

} 

public static class Map extends Mapper<LongWritable, Text, Text, ScoreWritable> { 
    private Text user = new Text(); 
    private ScoreWritable score = new ScoreWritable(); 
    private IntWritable NN = new IntWritable(); 
    private IntWritable MM = new IntWritable(); 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     int iterator = 1; 
     String line = value.toString(); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     while (tokenizer.hasMoreTokens()) { 
      user.set(tokenizer.nextToken()); 
      if (iterator == 1) { 
       NN = new IntWritable(1); 
       MM = new IntWritable(0); 
       iterator += 1; 
      } else { 
       NN = new IntWritable(0); 
       MM = new IntWritable(1); 
      } 
      score.set(NN,MM); 
      context.write(user, score); 
     } 
    } 
} 

public static class Reduce extends Reducer<Text, ScoreWritable, Text, IntWritable> { 
    private IntWritable resultf = new IntWritable(); 
    public void reduce(Text key, Iterable<ScoreWritable> values, Context context) throws IOException, InterruptedException { 
     //int result = ((values.getN().get()) * (values.getM()).get()); 
     resultf.set(result); 
     context.write(key, resultf = new IntWritable(2)); 
    } 
} 

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 

    //Create a new Jar and set the driver class(this class) as the main class of jar: 
    Job job = new Job(conf, "userscore"); 
    job.setJarByClass(UserScore.class); 

    //Set the map and reduce classes in the job: 
    job.setMapperClass(Map.class); 
    job.setReducerClass(Reduce.class); 
    job.setCombinerClass(Reduce.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    //job.setMapOutputKeyClass(Text.class); 
    //job.setMapOutputValueClass(ScoreWritable.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    job.setNumReduceTasks(4); 

    //Set the input and the output path from the arguments 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    //Run the job and wait for its completion 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 

} 

テキストファイルから読み込むMapreduceコードを作成しようとしています。テキストファイルには、各行に1組の文字列があり、これらの文字列はソーシャルネットワーク内のユーザー名を表し、最初の文字列は2番目のユーザーに従います。フォロワーの総数を計算し、各ユーザーのユーザー名に続いて、これらの2つの数値を乗算して、各ユーザーのスコアの一種を作成しようとしています。

考えられるのは、値の書き込み可能なカスタムクラス(ScoreWritable)を作成し、ユーザー名をテキストキーとして、値をScoreWritableクラスとして送信することです。 Reduceの出力を変更して定数 "2"を出力したことに気付いた場合は、チェックしてみてください。しかし、出力は上記のようになります。

私は間違っていますか?

私は仮想マシンでClouderaイメージを使用してjarファイルをコンパイルして実行しています。

+0

はあなたがデバッグ出力は何ですか値2?そしてあなたの出力をどのように出力しますか?UserScoreの列がtoString()呼び出しの結果と同じであると思われます – gtosto

+0

@gtosto出力は4つのテキストファイルで構成されています。これらのファイルのいずれかからのテキストが含まれ、残りの3つは空です。私は4つの作業を減らしたので、出力はテキストファイルとして4つの部分に分けられるべきだと思っていました。私がマッパーから受け取ったのと同じ値を出力しようとすると、私はあなたが上に示したのと同じ出力を得ます。 – BlueTile

答えて

0

TextOutputFormatは、カスタムScoreWritableの印刷方法(テキスト)を知らず、実際にはScoreWritableインスタンスの文字列表現を出力するだけです。私が知っている 最速の回避策は、例

public String toString() { 
    return "" + N.get() + "\t" + M.get(); 
} 

それとも、独自のカスタムOUTPUTFORMATを書くことができためScoreWritableののtoString()メソッドをオーバーライドすることです。例えばhereを参照してください

が、これはだから私は、コードを動作させるために管理

+0

試してみましたが、このエラーが表示されます: "エラー:java.io.IOException:型がマップの値と一致しません:予期されるorg.apache.hadoop.io.IntWritable、受信org.mapreduce.userscore.UserScore $ ScoreWritable"なぜそれはIntWritableを期待していますか?あなたの提案した回避策には、2行目に特別な( ")が付いていましたか? – BlueTile

+0

愚かな質問を申し訳ありません。マッパーとレデューサーの設定を確認しましたか? – gtosto

+0

マップの内容を意味していますか?私はいくつか疑念を抱いています:カスタムクラスでは、equals()メソッドをオーバーライドしていませんでした。そして、hashCode()メソッドが正しいかどうかわかりません(Iterable ?iterator またはScoreWritable?) – BlueTile

0

に役立ちます願っています。 ToStringメソッドを使用して減速機の変数の()

  • 間違った使い方を提案して@gtostoため

    1. カスタムクラス(私は推測する)内のデータの流れを管理する、感謝:あなたが見ることができるようにいくつかの問題がありました。
    2. Reducerで間違った反復方法。

    MapperとReducerの間のネットワークフローを最適化するために別のCombinerクラスも追加しました。

    これは、最終的なコードである:(コメント付き)

    package org.mapreduce.userscore; 
    
    import java.io.*; 
    import java.util.*; 
    
    import org.apache.hadoop.fs.Path; 
    import org.apache.hadoop.conf.*; 
    import org.apache.hadoop.io.*; 
    import org.apache.hadoop.mapreduce.*; 
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
    
    
    public class UserScore { 
        // Defining a custum class that contains two IntWritabe values 
        // this custom class will be used to hold the Value part of the key-value pairs between the mapper and reducers 
    
    public static class ScoreWritable implements Writable { 
          private IntWritable N; 
          private IntWritable M; 
    
          //Default Constructor 
          public ScoreWritable() { 
           this.N = new IntWritable(); 
           this.M = new IntWritable(); 
          } 
    
          //Custom constructor 
          public ScoreWritable(IntWritable N, IntWritable M){ 
           this.N = N; 
           this.M = M; 
          } 
    
          //Setter method to set the values of ScoreWritable objects 
          public void set(IntWritable NN,IntWritable MM) { 
           this.N = NN; 
           this.M = MM; 
          } 
    
          //to get the first object from Score Record 
          public IntWritable getN() { 
           return N; 
          } 
    
          //to get the second object from Score Record 
          public IntWritable getM() { 
           return M; 
          } 
    
          @Override 
          //overriding default readFields method. 
          //It de-serializes the byte stream data 
          public void readFields(DataInput in) throws IOException { 
           N.readFields(in); 
           M.readFields(in); 
          } 
    
          @Override 
          //It serializes object data into byte stream data 
          public void write(DataOutput out) throws IOException { 
           N.write(out); 
           M.write(out); 
          } 
    
          @Override 
          //OrganizING the data stream in this custom class 
          public String toString() { 
           return "" + N.get() + "\t" + M.get(); 
          } 
    
    
          @Override 
          public int hashCode() { 
           return N.hashCode(); 
          } 
    
    } 
    
    public static class Map extends Mapper<LongWritable, Text, Text, ScoreWritable> { 
        private Text user = new Text(); 
        private ScoreWritable score = new ScoreWritable(); //variabe sscore will hold the pair (N,M) for eatch user 
        private IntWritable NN = new IntWritable(); 
        private IntWritable MM = new IntWritable(); 
    
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
         int iterator = 1; 
         // tokenizing: variable tokenizer will hold the first username then the second username in each ine of the input text file 
         String line = value.toString(); 
         StringTokenizer tokenizer = new StringTokenizer(line); 
         while (tokenizer.hasMoreTokens()) { 
          user.set(tokenizer.nextToken()); 
          if (iterator == 1) {       // here variabe tokenizer holds the first username 
           NN = new IntWritable(1);     // saying that this user (username1) is folowing ssomeone 
           MM = new IntWritable(0); 
           iterator += 1; 
          } else {          // here variabe tokenizer will hold the second username 
           NN = new IntWritable(0); 
           MM = new IntWritable(1);     // saying that this user (username2) is being followed by someone 
          } 
          score.set(NN,MM);       // giving eiter (1,0) or (0,1) to variable score 
          context.write(user, score);     // assigning variable score for each user in each line 
         } // emitting [Ali, (1,0)] or [Ali, (0,1)] means that Ali is following someone or being followed by someone, respectively. 
        }  // next: the Reducer will go through all the values for each key, sum the total internal values of the key. 
    } 
    
        public static class Combine extends Reducer<Text, ScoreWritable, Text, ScoreWritable> { 
         private IntWritable resultf = new IntWritable(); 
         private IntWritable NNN = new IntWritable(); 
         private IntWritable MMM = new IntWritable(); 
         public void reduce(Text key, Iterable<ScoreWritable> values, Reducer<Text, ScoreWritable, Text, ScoreWritable>.Context context) 
           throws IOException, InterruptedException { 
          int sum1 = 0; 
          int sum2 = 0; 
          for (ScoreWritable val:values) { 
           sum1 += val.getN().get(); 
           sum2 += val.getM().get(); 
          } 
          NNN = new IntWritable(sum1); 
          MMM = new IntWritable(sum2); 
          context.write(key, new ScoreWritable(NNN, MMM)); // this will combine all the values for each key before emitting the new pairs to Reduce function 
         } 
        } 
    
    public static class Reduce extends Reducer<Text, ScoreWritable, Text, IntWritable> { 
        private IntWritable resultf = new IntWritable(); 
        public void reduce(Text key, Iterable<ScoreWritable> values, Reducer<Text, ScoreWritable, Text, IntWritable>.Context context) 
          throws IOException, InterruptedException { 
         int sum3 = 0; 
         int sum4 = 0; 
         for (ScoreWritable val:values) { 
          sum3 = val.getN().get();    // if the current user is following 20 people, then Sum3 = 20 
          sum4 = val.getM().get();    // if the current user is being followed by 30 people, then Sum4 = 30 
         } 
         int result = sum3 * sum4; 
         resultf.set(result); 
         context.write(key, resultf);    // this will emit the current user and his/her corresponding score 
        } 
    } 
    
    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
    
        //Create a new Jar and set the driver class(this class) as the main class of jar: 
        Job job = new Job(conf, "userscore"); 
        job.setJarByClass(UserScore.class); 
    
        //Set the map and reduce classes in the job: 
        job.setMapperClass(Map.class); 
        job.setReducerClass(Reduce.class); 
        job.setCombinerClass(Combine.class);     //activated unique combiner class which is different than the Reducer's IO is different 
    
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(IntWritable.class); 
    
        job.setMapOutputKeyClass(Text.class);    //assigning output class for mapper since it is different than the Reducer's output class 
        job.setMapOutputValueClass(ScoreWritable.class); 
    
        job.setInputFormatClass(TextInputFormat.class); 
        job.setOutputFormatClass(TextOutputFormat.class); 
    
        job.setNumReduceTasks(4);       //assigning 4 reducers 
    
        //Set the input and the output path from the arguments 
        FileInputFormat.addInputPath(job, new Path(args[0])); 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    
        //Run the job and wait for its completion 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    
    } 
    

    そして、これは4出力テキストファイルの1の一部です:

    user0 2745 
    user1001 18724 
    user1005 2405 
    user1009 16577 
    user1012 1710 
    user1016 10074 
    user1023 2173 
    user1027 791 
    
  • 関連する問題