Snippet Java for Hadoop

With these Java snippets for Hadoop, you can free your mind from the boilerplate.

  1. Job template

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class Summarizer extends Configured implements Tool {
      private Configuration conf;

      public static void main(String[] args) throws Throwable {
        System.exit(ToolRunner.run(new Configuration(), new Summarizer(), args));
      }

      @Override
      public int run(String[] args) throws Exception {
        // skip writing the _SUCCESS marker file
        conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
        Job job = new Job(conf);
        job.setJarByClass(Summarizer.class);
        job.setJobName("Process Summarizer Job");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileInputFormat.setInputPathFilter(job, SummarizerPathFilter.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileOutputFormat.setCompressOutput(job, true);
        DistributedCache.addCacheFile(new Path(args[3]).toUri(), job.getConfiguration());
        job.setMapperClass(SummarizerMapper.class);
        job.setReducerClass(SummarizerReducer.class);
        job.setNumReduceTasks(1);
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        boolean completion = job.waitForCompletion(true);
        // clean up temporary directories (actionTmp and clickTmp are paths declared elsewhere)
        FileSystem.get(new Path(actionTmp).toUri(), conf).delete(new Path(actionTmp), true);
        FileSystem.get(new Path(clickTmp).toUri(), conf).delete(new Path(clickTmp), true);
        // ToolRunner convention: 0 means success, non-zero means failure
        return completion ? 0 : 1;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }
    }
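
    The template references SummarizerMapper and SummarizerReducer without defining them. A minimal sketch of what such classes could look like (the pass-through logic here is an illustration, not the original implementation):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SummarizerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // placeholder: emit each input line as the key
        context.write(value, NullWritable.get());
      }
    }

    public class SummarizerReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
      @Override
      protected void reduce(Text key, Iterable<NullWritable> values, Context context)
          throws IOException, InterruptedException {
        // placeholder: emitting only the key deduplicates identical lines
        context.write(key, NullWritable.get());
      }
    }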
    
  2. MultipleInputs

    a. Declare

    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, FirstMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SecondMapper.class);
    

    b. Tag the data in each mapper - prepend a distinguishing character to the map output key

    // in FirstMapper
    context.write(new Text("A" + value.toString()), NullWritable.get());
    // in SecondMapper
    context.write(new Text("B" + value.toString()), NullWritable.get());
    

    c. Separate the data by its tag character in the reducer

    String s = value.toString();
    if (s.charAt(0) == 'A') {
      first.add(s.substring(1)); // record came from the first input
    } else {
      second.add(s.substring(1)); // record came from the second input
    }
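
    For reference, one of the two mappers in this pattern might look like the following sketch (the class and type parameters are assumptions):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FirstMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // tag records from the first input with "A"; SecondMapper would use "B"
        context.write(new Text("A" + value.toString()), NullWritable.get());
      }
    }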
    
  3. DistributedCache

    a. Add file to cache (Job)

    DistributedCache.addCacheFile(new Path(args[3]).toUri(), job.getConfiguration());
    

    b. Get file from cache (Mapper, Reducer, Filter)

    Path[] cachedFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    for (Path p : cachedFiles) {
      // plain text file
      BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
      // or, for a gzip-compressed file:
      // BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(p.toString()))));
      String line = reader.readLine(); // skip the header line
      while ((line = reader.readLine()) != null) {
        String[] fields = line.split(","); // process line by line
      }
      reader.close();
    }
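
    In practice this read usually happens once in a task's setup() method, for example to load a lookup table. A minimal sketch, assuming a cached file of comma-separated key,value pairs (the class name and file format are assumptions):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LookupMapper extends Mapper<LongWritable, Text, Text, Text> {
      private final Map<String, String> lookup = new HashMap<String, String>();

      @Override
      protected void setup(Context context) throws IOException {
        Path[] cachedFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cachedFiles == null) return;
        for (Path p : cachedFiles) {
          BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
          String line;
          while ((line = reader.readLine()) != null) {
            String[] fields = line.split(",");
            lookup.put(fields[0], fields[1]); // assumes key,value per line
          }
          reader.close();
        }
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String match = lookup.get(value.toString());
        if (match != null) {
          context.write(value, new Text(match));
        }
      }
    }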
    
  4. Pass parameters to mapper and reducer

    a. Declare (Job)

    conf.setLong("longNumber", 1987);
    conf.set("str", "string value");
    

    b. Usage (Mapper, Reducer, Filter)

    long number = conf.getLong("longNumber", 0);
    String string = conf.get("str");
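
    Inside a mapper or reducer, the Configuration comes from the context. A minimal sketch of reading the values once in setup() (the field names are assumptions):

    private long number;
    private String str;

    @Override
    protected void setup(Context context) {
      Configuration conf = context.getConfiguration();
      number = conf.getLong("longNumber", 0);
      str = conf.get("str", "default value");
    }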
    
  5. Filter template

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class ClickPathFilter extends Configured implements PathFilter {
      private long fromTime, toTime;

      @Override
      public boolean accept(Path path) {
        if (path.toString().endsWith(".gz")) {
          try {
            // expected file name pattern: <name>_<yyyyMMdd>_<hour>.gz
            SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
            String[] items = path.toString().split("_");
            int lastIndex = items.length - 1;
            Date clickDate = format.parse(items[lastIndex - 1]);
            int hour = Integer.parseInt(items[lastIndex].split("\\.")[0]);
            long clickTime = clickDate.getTime() + hour * 3600000L;
            return clickTime >= fromTime && clickTime <= toTime;
          } catch (Exception e) {
            return false; // unparsable file names are filtered out
          }
        }
        return true; // directories and non-.gz paths always pass
      }

      @Override
      public void setConf(Configuration conf) {
        super.setConf(conf);
        // conf is null when Configured's no-arg constructor runs
        if (conf != null) {
          fromTime = conf.getLong("filterFromTime", 0);
          toTime = conf.getLong("filterToTime", 0);
        }
      }
    }
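
    The filter reads its bounds from the configuration, so the driver has to set them before submitting the job. A sketch (fromTimestamp and toTimestamp are placeholder variables):

    conf.setLong("filterFromTime", fromTimestamp); // epoch millis, lower bound
    conf.setLong("filterToTime", toTimestamp);     // epoch millis, upper bound
    Job job = new Job(conf);
    FileInputFormat.setInputPathFilter(job, ClickPathFilter.class);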
    
  6. Get FileSystem from Path

    Path p = new Path("s3n://some_bucket/some_file");
    FileSystem fs = p.getFileSystem(someConf);
    
  7. Read data from FileSystem

    FileSystem fs = new Path("s3n://some_bucket/some_file").getFileSystem(someConf);
    FileStatus[] status = fs.listStatus(new Path("s3n://some_bucket/some_file"));
    for (int i = 0; i < status.length; i++) {
      // plain text file
      BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
      // or, for a gzip-compressed file:
      // BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(fs.open(status[i].getPath()))));
    }
    
  8. Get file path in processing by map or reduce

    // initialize
    FileSplit fileSplit = (FileSplit)context.getInputSplit();
    // get file path
    fileSplit.getPath();
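
    For example, capturing the current input file's name in a mapper's setup() (a sketch; the field name is an assumption, and FileSplit is org.apache.hadoop.mapreduce.lib.input.FileSplit):

    private String inputFileName;

    @Override
    protected void setup(Context context) {
      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      inputFileName = fileSplit.getPath().getName(); // file name without directories
    }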
    
  9. FsShell

    // initialize
    FsShell command = new FsShell(conf);
    // remove a directory recursively (S3 or HDFS)
    command.run(new String[] { "-rmr", tmp });
    // move a file; acts as a rename when source and destination share a directory
    command.run(new String[] { "-mv", src, dst });
    // upload file to s3
    command.run(new String[] { "-put", src, dst });
    // download file from s3
    command.run(new String[] { "-get", src, dst });
    
  10. ShellCommandExecutor

    // create the command as an argument array
    String[] command = { "ls", "/usr" };
    // initialize
    ShellCommandExecutor shell = new ShellCommandExecutor(command);
    // execute (throws IOException if the command fails)
    shell.execute();
    // read the captured output
    String output = shell.getOutput();
    
  11. StringUtils (Hadoop Utils)

    // given an array of strings, return a comma-separated list of its elements
    public static String arrayToString(String[] strs)
    // format a percentage for presentation to the user
    public static String formatPercent(double done, int digits)
    // returns a collection of strings (comma-separated string values)
    public static Collection getStringCollection(String str)
    // returns an arraylist of strings
    public static String[] getStrings(String str)
    // splits a comma separated value String, trimming leading and trailing whitespace on each value
    public static Collection getTrimmedStringCollection(String str)
    // concatenates strings, using a separator
    public static String join(CharSequence separator, Iterable strings)
    // split a string using the default separator
    public static String[] split(String str)
    // split a string using the given separator
    public static String[] split(String str, char escapeChar, char separator)
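
    A few of these in action (a quick sketch; StringUtils here is org.apache.hadoop.util.StringUtils):

    String csv = StringUtils.arrayToString(new String[] { "a", "b", "c" }); // "a,b,c"
    String[] parts = StringUtils.split(csv);                                // ["a", "b", "c"]
    String joined = StringUtils.join(";", java.util.Arrays.asList(parts));  // "a;b;c"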
    
  12. Optimize MapReduce Performance

    • REGULAR EXPRESSIONS: avoid Regular Expressions where possible
    • STRING TOKENIZATION: use the StringUtils class
    • OBJECT REUSE:
      Text outputValue = new Text();
      outputValue.set(parts[0]);
      
    • STRING CONCATENATION: use the StringBuilder class
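
    The object-reuse point deserves a fuller example: allocating Writables once per task rather than once per record reduces garbage-collection pressure. A sketch (the class and field names are assumptions):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ReuseMapper extends Mapper<LongWritable, Text, Text, Text> {
      // allocated once per task instead of once per record
      private final Text outputKey = new Text();
      private final Text outputValue = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        outputKey.set(parts[0]);
        outputValue.set(parts[1]);
        // safe to reuse: the framework serializes the key/value during write
        context.write(outputKey, outputValue);
      }
    }
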
  13. Counter

    A named counter that tracks the progress of a map/reduce job. Counters represent global counters, defined either by the MapReduce framework or by applications. Each Counter is named by an Enum and has a long value. Counters are bunched into Groups, each comprising counters from a particular Enum class.

    // get a Counter's value
    long limit = context.getCounter("z", "limit").getValue();
    // increment a Counter
    context.getCounter("z", "limit").increment(1L);
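
    Counters can also be named by an enum and read back in the driver once the job completes. A sketch (the enum and its constant are assumptions):

    // declared alongside the job classes
    public enum MyCounters { BAD_RECORDS }

    // in a mapper or reducer, when a malformed record is seen
    context.getCounter(MyCounters.BAD_RECORDS).increment(1L);

    // in the driver, after waitForCompletion
    long badRecords = job.getCounters().findCounter(MyCounters.BAD_RECORDS).getValue();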
    
