2016-06-19 5 views
2

mapreduce & hadoop worldの新機能で、基本的なマップリダクションプログラムを試した後、私はcompositekeyサンプルコードを試したかったのです。mapreduce compositeキーサンプル - 希望の出力が表示されない

次のように入力データセットは次のとおりです。

国、州、郡、populationinmillions

アメリカ、カリフォルニア州、アラメダ、100

米国、CA、losangels、200

USA 、CA、サクラメント、100

アメリカ、フロリダ、xxx、10

米国、FL、YYY、12

所望の出力データは次のようにする必要があります:

米国、CA、500

米国、FL、22

ここでは代わりに国+状態フィールドはコンポジットキーを形成します。 次の出力が表示されます。人口は何らかの理由で追加されていません。誰かが私がやっている間違いを指摘できますか?また、WriteableComparableインターフェースを実装しているCountry.javaクラスを見てください。その実装で何かが間違っているかもしれません。

USA、CA、100

USA、CA、200

USA、CA、100

USA、FL、10

USA、FL、12

Country + Stateごとに人口が追加されることはありません。

これは、WritableComparableインターフェイスを実装するCountryクラスです。

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.File; 
import java.io.IOException; 
import java.util.Iterator; 
import org.apache.commons.io.FileUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

* The Country class implements WritabelComparator to implements custom sorting to perform group by operation. It 
* sorts country and then state. 
* 
*/ 
public class Country implements WritableComparable<Country> { 

    Text country; 
    Text state; 

    public Country(Text country, Text state) { 
     this.country = country; 
     this.state = state; 
    } 
    public Country() { 
     this.country = new Text(); 
     this.state = new Text(); 

    } 

    /* 
    * (non-Javadoc) 
    * 
    * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) 
    */ 
    public void write(DataOutput out) throws IOException { 
     this.country.write(out); 
     this.state.write(out); 

    } 

    /* 
    * (non-Javadoc) 
    * 
    * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) 
    */ 
    public void readFields(DataInput in) throws IOException { 

     this.country.readFields(in); 
     this.state.readFields(in); 
     ; 

    } 

    /* 
    * (non-Javadoc) 
    * 
    * @see java.lang.Comparable#compareTo(java.lang.Object) 
    */ 
    public int compareTo(Country pop) { 
     if (pop == null) 
      return 0; 
     int intcnt = country.compareTo(pop.country); 
     if (intcnt != 0) { 
      return intcnt; 
     } else { 
      return state.compareTo(pop.state); 

     } 
    } 

    /* 
    * (non-Javadoc) 
    * 
    * @see java.lang.Object#toString() 
    */ 
    @Override 
    public String toString() { 

     return country.toString() + ":" + state.toString(); 
    } 

} 

ドライバプログラム:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.File; 
import java.io.IOException; 
import java.util.Iterator; 

import org.apache.commons.io.FileUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


public class CompositeKeyDriver { 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 


    Configuration conf = new Configuration(); 

    Job job = Job.getInstance(conf, "CompositeKeyDriver"); 

    //first argument is job itself 
    //second argument is location of the input dataset 
    FileInputFormat.addInputPath(job, new Path(args[0])); 

    //first argument is the job itself 
    //second argument is the location of the output path   
    FileOutputFormat.setOutputPath(job, new Path(args[1]));   


    job.setJarByClass(CompositeKeyDriver.class); 

    job.setMapperClass(CompositeKeyMapper.class); 

    job.setReducerClass(CompositeKeyReducer.class); 

    job.setOutputKeyClass(Country.class); 

    job.setOutputValueClass(IntWritable.class); 


    //setting the second argument as a path in a path variable   
    Path outputPath = new Path(args[1]); 

    //deleting the output path automatically from hdfs so that we don't have delete it explicitly    
    outputPath.getFileSystem(conf).delete(outputPath); 


    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 

}

マッパープログラム:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.File; 
import java.io.IOException; 
import java.util.Iterator; 

import org.apache.commons.io.FileUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


    // First two parameters are Input Key and Input Value. Input Key = offset of each line (remember each line is a record). Input value = Line itself 
    // Second two parameters are Output Key and Output value of the Mapper. BTW, the outputs of the mapper are stored in the local file system and not on HDFS. 
    // Output Key = Country object is sent. Output Value = population in millions in that country + state combination 


    public class CompositeKeyMapper extends Mapper<LongWritable, Text, Country, IntWritable> { 

    /** The cntry. */ 
    Country cntry = new Country(); 

    /** The cnt text. */ 
    Text cntText = new Text(); 

    /** The state text. */ 
    Text stateText = new Text(); 

    //population in a Country + State 
    IntWritable populat = new IntWritable(); 

    /** 
    * 
    * Reducer are optional in Map-Reduce if there is no Reducer defined in program then the output of the Mapper 
    * directly write to disk without sorting. 
    * 
    */ 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

     //Reader will give each record in a line to the Mapper. 
     //That line is split with the de-limiter "," 
     String line = value.toString(); 

     String[] keyvalue = line.split(","); 


     //Country is the first item in the line in each record 
     cntText.set(new Text(keyvalue[0])); 

     //State is the second item in the line in each record 
     stateText.set(keyvalue[1]); 

     //This is the population. BTW, we can't send Java primitive datatypes into Context object. Java primitive data types are not effective in Serialization and De-serialization. 
     //So we have to use the equivalent Writable datatypes provided by mapreduce framework 

     populat.set(Integer.parseInt(keyvalue[3])); 

     //Here you are creating an object of Country class and in the constructor assigning the country name and state 
     Country cntry = new Country(cntText, stateText); 

     //Here you are passing the country object and their population to the context object. 
     //Remember that country object already implements "WritableComparable" interface which is equivalient to "Comparable" interface in Java. That implementation is in Country.java class 
     //Because it implements the WritableComparable interface, the Country objects can be sorted in the shuffle phase. If WritableComparable interface is not implemented, we 
     //can't sort the objects. 

     context.write(cntry, populat); 

    } 
} 

減速プログラム:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.File; 
import java.io.IOException; 
import java.util.Iterator; 

import org.apache.commons.io.FileUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.FloatWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


//Remember the two output parameters of the Mapper class will become the first two input parameters to the reducer class. 

public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> { 

// The first parameter to reduce method is "Country". The country object has country name and state name (look at the Country.java class for more details. 
// The second parameter "values" is the collection of population for Country+State (this is a composite Key) 

    public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException { 

     int numberofelements = 0; 

     int cnt = 0; 

     while (values.hasNext()) { 

      cnt = cnt + values.next().get(); 

     } 

    context.write(key, new IntWritable(cnt)); 

    } 

} 

答えて

1

あなたはそう、あなたのCountryクラスがhashCode()メソッドを実装する必要がありHashPartitionerを使用しています。

現時点では、キーが正しくグループ化されないのデフォルトのhashCode()実装が使用されます。

@Override 
public int hashCode() { 
    final int prime = 31; 
    int result = 1; 
    result = prime * result + ((country == null) ? 0 : country.hashCode()); 
    result = prime * result + ((state == null) ? 0 : state.hashCode()); 
    return result; 
} 

追加情報:

が安全側あなたべきsetテキストオブジェクト上にあるように

は、ここでは一例hashCode()方法です。現時点では、Countryコンストラクタでこれを行います。減速の問題が修正されました

public Country(Text country, Text state) { 
    this.country.set(country); 
    this.state.set(state); 
} 
+0

ハッシュコードメソッドを実装するにはどうすればよいですか?あなたはこのことについて親切に説明したり、私が学ぶことができるリンクを指してくれますか? –

+0

私はあなたのための例を追加しました。迅速なgoogleは、ハッシュコードに関する豊富なリソースを見つけるでしょう。 –

+0

ありがとうございました。私はこれをうまく実装して、あなたに知らせるでしょう –

0

public Country(Text country, Text state) { 
    this.country = country; 
    this.state = state; 
} 

あなたはこれを変更する必要があります。私はコードを変更しなかった。私がしたのは、Cloudera Hadoopのイメージを再開することだけでした。

私はデバッグの過程で次のことに気付きました。誰かがこれらの観測にコメントできますか?

  1. 頻繁にコードを変更し、jarファイルを作成してmapreduce jarプログラムを実行しても出力に反映されません。これは常に起こっているわけではありません。 hadoopデーモンをしばらくして再起動する必要があるかどうかは不明です。
関連する問題