// JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera
// PATH=/usr/java/jdk1.7.0_67-cloudera/bin
// export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

// hadoop
// jar cf WordCount2.jar WordCount2*.class
// hadoop jar WordCount2.jar WordCount2 big.txt outWCBig
// hadoop jar WordCount2.jar WordCount2 t8.shakespeare.txt outWCShake

// t8.shakespeare

// Remove the previous results.
// $ hadoop fs -rm -r -f /user/cloudera/wordcount/output

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class WordCount2 extends Configured implements Tool
	private static final Logger LOG = Logger.getLogger(WordCount2.class);
	public static void main(String[] args) throws Exception {
        int exitCode = WordCount2(), args);
	public static boolean isNullOrEmpty(String str) {
        if(str != null && !str.trim().isEmpty())
            return false;
        return true;
  public static class PunctuationMapper 
      extends Mapper<Object, Text, NullWritable, Text> {

    private Text punctd = new Text();  
	//private static fin//al String PunctuationMarks="\"\'\\[\\]\\\\!$&@~#%:;`<>(){}/!|?*-+=^,.";
	private static final String PunctuationMarks="\"\\[\\]\\\\!$&@~#%:;`<>(){}/!|?*-+=^,.";
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      String word= value.toString();
	  word=word.replaceAll("-", "");
	  //word=word.replaceAll("'", "");
	  word=word.replaceAll("["+PunctuationMarks+"]", " ");
	  //word = word.replaceAll("[^a-zA-Z\\s+]", " ").toLowerCase();
	  //word = curr.trim();
	  //punctd.set(Regex.Replace(word.toString(),[^\w\d\s-]," "));
	  //String word_punc=word.replaceAll("[^\\w\\d\\s-]", " ");
	  //word=word.replace( "/\s\s+/g", "" );//     ->  390ms
	  context.write(new IntWritable(1), punctd);	  
   public static class TrimMapper 
      extends Mapper<Object, Text, NullWritable, Text> {

    private Text trimd = new Text();  	
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      String word= value.toString().trim();	  	  
	  //word=word.replaceAll( "/\s\s+/g", "" );//     ->  390ms
      word=word.replaceAll("^ +| +$|( )+", "$1");
	  context.write(new IntWritable(1), trimd);	  
   public static class LowerCaseMapper 
      extends Mapper<Object, Text, NullWritable, Text> {

    private Text lowercased = new Text();
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        context.write(new IntWritable(1), lowercased);

  public static class TokenizerMapper
       extends Mapper<IntWritable, Text, Text, IntWritable>{
	private static final java.util.regex.Pattern WORD_BOUNDARY = java.util.regex.Pattern.compile("\\s");
	private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(IntWritable key, Text lineText, Context context
                    ) throws IOException, InterruptedException {
	  String line = lineText.toString();
      Text currentWord = new Text();
      for (String word : WORD_BOUNDARY.split(line)) {
        if (WordCount2.isNullOrEmpty(word)) {
            currentWord = new Text(word);

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key_word, Iterable<IntWritable> counts,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : counts) {
        sum += count.get();
      context.write(key_word, result);
  public static class TopTenMapper extends Mapper<Object, Text, NullWritable, Text>
        private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>(); //Collections.reverseOrder()

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context)
               throws IOException, InterruptedException {
            // (word, count) tuple
            String[] words = value.toString().split("\t") ;
                if (words.length < 2) {

            topN.put(Integer.parseInt(words[1]), new Text(value));

            if (topN.size() > 10) {

        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (Text t : topN.values()) {
                context.write(NullWritable.get(), t);
  public static class TopTenReducer extends
            Reducer<NullWritable, Text, NullWritable, Text> {

        private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>();

        public void reduce(NullWritable key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                String[] words = value.toString().split("\t") ;

                topN.put(Integer.parseInt(words[1]), new Text(value));

                if (topN.size() > 10) {

            for (Text word : topN.descendingMap().values()) {
                context.write(NullWritable.get(), word);

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
		FileSystem fs = FileSystem.get(conf);
		Path tmpPath = new Path("/w1/tmp");
		fs.delete(tmpPath, true);
		Path inputPath = new Path(args[0]);
		//Path partitionFile = new Path(args[1] + "_partitions.lst");
		//Path outputStage = new Path(args[1] + "_staging");
		Path outputStage = new Path("/w1/tmp");
		Path outputOrder = new Path(args[1]);

        args = new GenericOptionsParser(conf, args).getRemainingArgs();
		// creating a word count jobWordCount
        jobWordCount jobWordCount = jobWordCount.getInstance(conf,"wordcount");
		//jobWordCount jobWordCount = jobWordCount.getInstance(getConf(), "wordcount");
    // Use TextInputFormat, the default unless jobWordCount.setInputFormatClass is used
		//public static class PunctuationMapper 
		//extends Mapper<Object, Text, NullWritable, Text> 
		Configuration punctuationMapperConf = new Configuration(false);
          Object.class, Text.class,
          IntWritable.class, Text.class,
		  //public static class TrimMapper 
      //extends Mapper<Object, Text, NullWritable, Text>
		  Configuration trimMapperConf = new Configuration(false);
          Object.class, Text.class,
          IntWritable.class, Text.class,
		//public static class LowerCaseMapper 
      //extends Mapper<Object, Text, NullWritable, Text> 		
		Configuration lowerCaseMapperConf = new Configuration(false);
          Object.class, Text.class,
		  //IntWritable.class, Text.class,
          IntWritable.class, Text.class,
		//public static class TokenizerMapper
      // extends Mapper<IntWritable, Text, Text, IntWritable>
        Configuration tokenizerConf = new Configuration(false);
          Text.class, IntWritable.class,

		  //public static class IntSumReducer
       //extends Reducer<Text,IntWritable,Text,IntWritable>	   
        //FileInputFormat.addInputPath(jobWordCount, new Path(args[0]));
        TextInputFormat.setInputPaths(jobWordCount, inputPath);
		//FileOutputFormat.setOutputPath(jobWordCount, new Path(args[1]));
		FileOutputFormat.setOutputPath(jobWordCount, tmpPath);
		// Set the output format to a sequence file
		//SequenceFileOutputFormat.setOutputPath(jobWordCount, outputStage);

		int code = jobWordCount.waitForCompletion(true) ? 0 : 1;
		if (code == 0) {
			//Now that we have extracted column to sort
			Job orderJob = new Job(conf, "TopWords");
			// Here, use the identity mapper to output the key/value pairs in
			// the SequenceFile
			//public static class TopTenMapper 
			//extends Mapper<Object, Text, NullWritable, Text>
			// Set the number of reduce tasks to an appropriate number for the
			// amount of data being sorted
			// Use Hadoop's TotalOrderPartitioner class
			// Set the partition file
			//		partitionFile);
			//public static class TopTenReducer 
			//extends Reducer<NullWritable, Text, NullWritable, Text>
			// Set the input to the previous job's output
			//SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
			// Set the output path to the command line parameter
			TextOutputFormat.setOutputPath(orderJob, outputOrder);
			// Set the separator to an empty string
			//		"mapred.textoutputformat.separator", "");
			// Use the InputSampler to go through the output of the previous
			// job, sample it, and create the partition file
			//		new InputSampler.RandomSampler(.1, 10000));
			  FileInputFormat.setInputPaths(orderJob, tmpPath);
			  FileOutputFormat.setOutputPath(orderJob, new Path(args[1]));

			  // Submit the job
			code = orderJob.waitForCompletion(true) ? 0 : 2;

		// Clean up the partition file and the staging directory
		// FileSystem.get(new Configuration()).delete(partitionFile, false);
		// FileSystem.get(new Configuration()).delete(outputStage, true);
        return (jobWordCount.waitForCompletion(true) ? 0 : 1);
