// printenv
// JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera
// PATH=/usr/java/jdk1.7.0_67-cloudera/bin
// export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

// Compile and package:
// hadoop com.sun.tools.javac.Main WordCount2.java
// jar cf WordCount2.jar WordCount2*.class

// Run against the sample inputs (big.txt, t8.shakespeare.txt):
// hadoop jar WordCount2.jar WordCount2 big.txt outWCBig
// hadoop jar WordCount2.jar WordCount2 t8.shakespeare.txt outWCShake

// Remove the previous results:
// $ hadoop fs -rm -r -f /user/cloudera/wordcount/output
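// Illustrative only (assumed paths): the input files must already be in HDFS
// before the first run, e.g.
// hadoop fs -put big.txt .
// hadoop fs -put t8.shakespeare.txt .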
 

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class WordCount2 extends Configured implements Tool
{
	private static final Logger LOG = Logger.getLogger(WordCount2.class);
	
	public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount2(), args);
        System.exit(exitCode);
    }
	
	public static boolean isNullOrEmpty(String str) {
        return str == null || str.trim().isEmpty();
    }
	
 
  public static class PunctuationMapper
      extends Mapper<Object, Text, IntWritable, Text> {

    private Text punctd = new Text();
    // Apostrophes are deliberately absent so contractions like "don't" survive.
    private static final String PunctuationMarks = "\"\\[\\]\\\\!$&@~#%:;`<>(){}/!|?*-+=^,.";

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String word = value.toString();
      // Join hyphenated words, then turn all other punctuation into spaces.
      word = word.replaceAll("-", "");
      word = word.replaceAll("[" + PunctuationMarks + "]", " ");
      punctd.set(word);
      context.write(new IntWritable(1), punctd);
    }
  }
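  // Illustrative only: a line such as "Hello, World!" maps to
  // (1, "Hello  World ") -- punctuation becomes spaces, which the next
  // mapper in the chain collapses.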
  
  public static class TrimMapper
      extends Mapper<Object, Text, IntWritable, Text> {

    private Text trimd = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      // Strip leading/trailing blanks and collapse inner runs of spaces.
      String word = value.toString().trim();
      word = word.replaceAll("^ +| +$|( )+", "$1");
      trimd.set(word);
      context.write(new IntWritable(1), trimd);
    }
  }
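  // Illustrative only: "  Hello   World  " maps to (1, "Hello World").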
  public static class LowerCaseMapper
      extends Mapper<Object, Text, IntWritable, Text> {

    private Text lowercased = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      lowercased.set(value.toString().toLowerCase());
      context.write(new IntWritable(1), lowercased);
    }
  }
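  // Illustrative only: "Hello World" maps to (1, "hello world").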

  public static class TokenizerMapper
       extends Mapper<IntWritable, Text, Text, IntWritable> {

    private static final java.util.regex.Pattern WORD_BOUNDARY =
        java.util.regex.Pattern.compile("\\s");

    private final static IntWritable one = new IntWritable(1);

    public void map(IntWritable key, Text lineText, Context context)
        throws IOException, InterruptedException {
      String line = lineText.toString();
      // Emit (word, 1) for every whitespace-delimited token.
      for (String word : WORD_BOUNDARY.split(line)) {
        if (WordCount2.isNullOrEmpty(word)) {
          continue;
        }
        context.write(new Text(word), one);
      }
    }
  }
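  // Illustrative only: a cleaned line "hello world hello" emits
  // ("hello", 1), ("world", 1), ("hello", 1).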

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key_word, Iterable<IntWritable> counts,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : counts) {
        sum += count.get();
      }
      result.set(sum);
      context.write(key_word, result);
    }
  }
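  // Illustrative only: for key "hello" with counts [1, 1, 1] the reducer
  // emits ("hello", 3), so the first job's output lines look like "hello\t3".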
  //////////////////////////////////////
  public static class TopTenMapper extends Mapper<Object, Text, NullWritable, Text> {

        // Task-local top ten, keyed by count (ascending).
        private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>();

        public void map(Object key, Text value, Context context)
               throws IOException, InterruptedException {
            // Each input line is a (word, count) pair from the first job.
            String[] words = value.toString().split("\t");
            if (words.length < 2) {
                return;
            }

            topN.put(Integer.parseInt(words[1]), new Text(value));

            // Evict the smallest count once more than ten entries are held.
            if (topN.size() > 10) {
                topN.remove(topN.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Emit the surviving candidates once, at the end of the task.
            for (Text t : topN.values()) {
                context.write(NullWritable.get(), t);
            }
        }
    }
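  // Illustrative only: with inputs like "the\t23407" and "of\t18455", each map
  // task keeps at most ten entries keyed by count, and cleanup() forwards that
  // task-local top ten. Caveat: two words with the same count share a TreeMap
  // key, so one of them is silently dropped.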
  
  public static class TopTenReducer extends
            Reducer<NullWritable, Text, NullWritable, Text> {

        private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>();

        @Override
        public void reduce(NullWritable key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            // Merge all task-local candidates into a global top ten.
            for (Text value : values) {
                String[] words = value.toString().split("\t");

                topN.put(Integer.parseInt(words[1]), new Text(value));

                if (topN.size() > 10) {
                    topN.remove(topN.firstKey());
                }
            }

            // Write highest counts first.
            for (Text word : topN.descendingMap().values()) {
                context.write(NullWritable.get(), word);
            }
        }
    }
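  // Note: this produces a single global top ten only when the job runs with
  // one reduce task, as configured in run() below; with several reducers each
  // would emit its own partial list.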
  //////////////////////////////////////

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Intermediate output of the word-count job; cleared on each run.
        FileSystem fs = FileSystem.get(conf);
        Path tmpPath = new Path("/w1/tmp");
        fs.delete(tmpPath, true);

        Path inputPath = new Path(args[0]);
        Path outputOrder = new Path(args[1]);

        // First job: chained cleanup mappers followed by the word count.
        Job jobWordCount = Job.getInstance(conf, "wordcount");
        jobWordCount.setJarByClass(this.getClass());
        // TextInputFormat is the default unless setInputFormatClass is used.

        // Strip punctuation: Mapper<Object, Text, IntWritable, Text>
        Configuration punctuationMapperConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
          PunctuationMapper.class,
          Object.class, Text.class,
          IntWritable.class, Text.class,
          punctuationMapperConf);

        // Collapse whitespace: Mapper<Object, Text, IntWritable, Text>
        Configuration trimMapperConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
          TrimMapper.class,
          Object.class, Text.class,
          IntWritable.class, Text.class,
          trimMapperConf);

        // Normalize case: Mapper<Object, Text, IntWritable, Text>
        Configuration lowerCaseMapperConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
          LowerCaseMapper.class,
          Object.class, Text.class,
          IntWritable.class, Text.class,
          lowerCaseMapperConf);

        // Tokenize: Mapper<IntWritable, Text, Text, IntWritable>
        Configuration tokenizerConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
          TokenizerMapper.class,
          IntWritable.class, Text.class,
          Text.class, IntWritable.class,
          tokenizerConf);
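        // All four chained mappers run inside the same map task: the output of
        // each becomes the input of the next, so no intermediate job or HDFS
        // round trip is needed. Roughly, a line "Hello, World -- don't!" flows
        //   -> (1, "Hello  World  don't ")   (PunctuationMapper)
        //   -> (1, "Hello World don't")      (TrimMapper)
        //   -> (1, "hello world don't")      (LowerCaseMapper)
        //   -> ("hello",1) ("world",1) ("don't",1)   (TokenizerMapper)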

        // Reduce phase: sum the 1s per word.
        jobWordCount.setReducerClass(IntSumReducer.class);
        jobWordCount.setOutputKeyClass(Text.class);
        jobWordCount.setOutputValueClass(IntWritable.class);

        TextInputFormat.setInputPaths(jobWordCount, inputPath);
        // Write (word \t count) lines to the intermediate directory.
        FileOutputFormat.setOutputPath(jobWordCount, tmpPath);

        int code = jobWordCount.waitForCompletion(true) ? 0 : 1;
		
        if (code == 0) {
            // Second job: rank the (word, count) pairs and keep the top ten.
            Job orderJob = Job.getInstance(conf, "TopWords");
            orderJob.setJarByClass(WordCount2.class);

            // Each map task emits a local top ten; the reducer merges them.
            orderJob.setMapperClass(TopTenMapper.class);
            orderJob.setReducerClass(TopTenReducer.class);

            orderJob.setMapOutputKeyClass(NullWritable.class);
            orderJob.setMapOutputValueClass(Text.class);

            // One reduce task is enough (all map output shares the
            // NullWritable key) and guarantees a single global top ten.
            orderJob.setNumReduceTasks(1);

            orderJob.setOutputKeyClass(NullWritable.class);
            orderJob.setOutputValueClass(Text.class);

            // Read each whole (word \t count) line as the map value.
            orderJob.setInputFormatClass(TextInputFormat.class);
            orderJob.setOutputFormatClass(TextOutputFormat.class);

            // Input is the previous job's output; output goes to args[1].
            FileInputFormat.setInputPaths(orderJob, tmpPath);
            FileOutputFormat.setOutputPath(orderJob, outputOrder);

            code = orderJob.waitForCompletion(true) ? 0 : 2;
        }

        // 0: success, 1: word count failed, 2: ranking failed.
        return code;
    }
} 