Java Snippets for Hadoop
With these Java snippets for Hadoop, you can free your mind.
Job template
public class Summarizer extends Configured implements Tool {

    private Configuration conf;

    public static void main(String[] args) throws Throwable {
        int exitCode = ToolRunner.run(new Configuration(), new Summarizer(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        // skip writing the _SUCCESS marker file
        conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

        Job job = new Job(conf);
        job.setJarByClass(Summarizer.class);
        job.setJobName("Process Summarizer Job");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileInputFormat.setInputPathFilter(job, SummarizerPathFilter.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileOutputFormat.setCompressOutput(job, true);

        DistributedCache.addCacheFile(new Path(args[3]).toUri(), job.getConfiguration());

        job.setMapperClass(SummarizerMapper.class);
        job.setReducerClass(SummarizerReducer.class);
        job.setNumReduceTasks(1);

        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        boolean completion = job.waitForCompletion(true);

        // clean up temporary directories (actionTmp/clickTmp are paths defined elsewhere in the job)
        FileSystem.get(new Path(actionTmp).toUri(), conf).delete(new Path(actionTmp), true);
        FileSystem.get(new Path(clickTmp).toUri(), conf).delete(new Path(clickTmp), true);

        // by convention, return 0 on success and non-zero on failure
        return completion ? 0 : 1;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }
}
MultipleInputs
a. Declare
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, FirstMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SecondMapper.class);
b. Tag the data in each mapper - prefix the map output with a distinguishing character
context.write(new Text("A" + value.toString()), null); context.write(new Text("B" + value.toString()), null);
c. Separate the data by the distinguishing character in the reducer
if (value.charAt(0) == 'A') {
    first.add(value.toString().substring(1));
} else {
    second.add(value.toString().substring(1));
}
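Putting a-c together, a minimal reduce-side sketch might look like the following. TaggedJoinReducer and the (join key, tagged value) layout are assumptions of mine: step b above tags the key while step c inspects the value, so here the tag is carried on the value, which lets the reducer collect both sides of each key.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// hypothetical reducer: mappers emit (joinKey, "A" + record) and (joinKey, "B" + record)
public class TaggedJoinReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> first = new ArrayList<String>();
        List<String> second = new ArrayList<String>();

        // separate the two inputs by their tag character
        for (Text value : values) {
            if (value.charAt(0) == 'A') {
                first.add(value.toString().substring(1));
            } else {
                second.add(value.toString().substring(1));
            }
        }

        // cross the two sides: one output record per (first, second) pair
        for (String f : first) {
            for (String s : second) {
                context.write(key, new Text(f + "," + s));
            }
        }
    }
}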
DistributedCache
a. Add file to cache (Job)
DistributedCache.addCacheFile(new Path(args[3]).toUri(), job.getConfiguration());
b. Get file from cache (Mapper, Reducer, Filter)
Path[] cachedFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
for (Path p : cachedFiles) {
    // plain text file
    BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
    // or, for a gzipped file:
    // BufferedReader reader = new BufferedReader(
    //         new InputStreamReader(new GZIPInputStream(new FileInputStream(new File(p.toString())))));
    String line = reader.readLine(); // skip the first line
    // read line by line
    while ((line = reader.readLine()) != null) {
        String[] fields = line.split(",");
    }
    reader.close();
}
Pass parameters to mapper and reducer
a. Declare (Job)
conf.setLong("longNumber", 1987);
conf.set("str", "string value");
b. Usage (Mapper, Reducer, Filter)
number = conf.getLong("longNumber", 0);
string = conf.get("str");
Filter template
public class ClickPathFilter extends Configured implements PathFilter {

    private Long fromTime, toTime;

    @Override
    public boolean accept(Path path) {
        // only filter data files; accept everything else (e.g. directories)
        if (path.toString().endsWith(".gz")) {
            try {
                // file names are expected to end with ..._yyyyMMdd_HH.gz
                SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
                String[] items = path.toString().split("_");
                int lastIndex = items.length - 1;
                Date clickDate = format.parse(items[lastIndex - 1]);
                int hour = Integer.parseInt(items[lastIndex].split("\\.")[0]);
                long clickTime = clickDate.getTime() + hour * 3600000L;
                return clickTime >= fromTime && clickTime <= toTime;
            } catch (ParseException e) {
                // unparsable names fall outside the time window
                return false;
            }
        }
        return true;
    }

    @Override
    public void setConf(Configuration conf) {
        // the framework may call setConf(null) before supplying the real configuration
        if (conf != null) {
            fromTime = conf.getLong("filterFromTime", 0);
            toTime = conf.getLong("filterToTime", 0);
        }
    }
}
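To wire the filter into a job, set the time bounds on the configuration and register the filter class, as in this sketch; fromTime and toTime are hypothetical epoch-millisecond values matching the property names in the template above:

conf.setLong("filterFromTime", fromTime); // hypothetical bounds, in epoch millis
conf.setLong("filterToTime", toTime);
Job job = new Job(conf); // Job copies the configuration, so set the bounds first
FileInputFormat.setInputPathFilter(job, ClickPathFilter.class);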
Get FileSystem from Path
Path p = new Path("s3n://some_bucket/some_file");
FileSystem fs = p.getFileSystem(someConf);
Read data from FileSystem
Path dir = new Path("s3n://some_bucket/some_file");
// get the FileSystem from the Path itself (see above); FileSystem.get(someConf)
// would return the default filesystem and fail on an S3 path
FileSystem fs = dir.getFileSystem(someConf);
FileStatus[] status = fs.listStatus(dir);
for (int i = 0; i < status.length; i++) {
    // plain text file
    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
    // or, for a gzipped file:
    // BufferedReader br = new BufferedReader(
    //         new InputStreamReader(new GZIPInputStream(fs.open(status[i].getPath()))));
}
Get file path in processing by map or reduce
// get the current split (e.g. in a Mapper's setup() or map())
FileSplit fileSplit = (FileSplit) context.getInputSplit();
// get the file path
Path path = fileSplit.getPath();
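For example, a Mapper can capture the file name once in setup() and reuse it for every record; FileNameMapper and the output layout are illustrative assumptions:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// hypothetical mapper that prefixes every record with its source file name
public class FileNameMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private String fileName;

    @Override
    protected void setup(Context context) {
        // the input split tells us which file this map task is reading
        fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(fileName + "\t" + value.toString()), NullWritable.get());
    }
}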
FsShell
// initialize
FsShell command = new FsShell(conf);
// remove an S3 directory
command.run(new String[] { "-rmr", tmp });
// move a file, or rename it if source and destination are in the same directory
command.run(new String[] { "-mv", src, dst });
// upload a file to S3
command.run(new String[] { "-put", src, dst });
// download a file from S3
command.run(new String[] { "-get", src, dst });
ShellCommandExecutor
// create the command
String[] command = { "ls", "/usr" };
// initialize
ShellCommandExecutor shell = new ShellCommandExecutor(command);
// execute
shell.execute();
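After execute() returns, the command's standard output is available from the executor:

// read what the command printed to stdout
String output = shell.getOutput();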
StringUtils (Hadoop Utils)
// given an array of strings, return a comma-separated list of its elements
public static String arrayToString(String[] strs)
// format a percentage for presentation to the user
public static String formatPercent(double done, int digits)
// returns a collection of strings (comma-separated string values)
public static Collection<String> getStringCollection(String str)
// returns an arraylist of strings
public static String[] getStrings(String str)
// splits a comma-separated value String, trimming leading and trailing whitespace on each value
public static Collection<String> getTrimmedStringCollection(String str)
// concatenates strings, using a separator
public static String join(CharSequence separator, Iterable<?> strings)
// split a string using the default separator
public static String[] split(String str)
// split a string using the given separator
public static String[] split(String str, char escapeChar, char separator)
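A quick usage sketch for a couple of these helpers; the values are arbitrary:

import java.util.Arrays;

import org.apache.hadoop.util.StringUtils;

public class StringUtilsDemo {
    public static void main(String[] args) {
        // join: ["a", "b", "c"] -> "a,b,c"
        String joined = StringUtils.join(",", Arrays.asList("a", "b", "c"));
        // split: "a,b,c" -> ["a", "b", "c"] using the default comma separator
        String[] parts = StringUtils.split(joined);
        System.out.println(joined + " -> " + Arrays.toString(parts));
    }
}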
Optimize MapReduce Performance
- REGULAR EXPRESSIONS: avoid regular expressions where possible
- STRING TOKENIZATION: use the StringUtils class
- OBJECT REUSE: reuse Writable objects across records instead of allocating new ones (see the sketch after this list)
Text outputValue = new Text();
outputValue.set(parts[0]);
- STRING CONCATENATION: use the StringBuilder class
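A minimal sketch applying these tips in one Mapper; ReuseMapper and the tab-separated input layout are assumptions:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

// hypothetical mapper: StringUtils for tokenization, a reused Text for output,
// and StringBuilder for concatenation
public class ReuseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // allocated once per task, reused for every record
    private final Text outputValue = new Text();
    private final StringBuilder builder = new StringBuilder();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // STRING TOKENIZATION: StringUtils instead of a regular expression
        String[] parts = StringUtils.split(value.toString(), '\\', '\t');

        // STRING CONCATENATION: StringBuilder instead of repeated '+'
        builder.setLength(0);
        builder.append(parts[0]).append(',').append(parts[parts.length - 1]);

        // OBJECT REUSE: set() on the existing Text instead of new Text(...)
        outputValue.set(builder.toString());
        context.write(outputValue, NullWritable.get());
    }
}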
Counter
A named counter that tracks the progress of a map/reduce job. Counters represent global counters, defined either by the Map-Reduce framework or by applications. Each Counter can be named by an Enum, or by a group and counter name as in the snippet below, and has a long value. Counters are bunched into groups, each comprising counters from a particular Enum class.
// get a Counter's value
context.getCounter("z", "limit").getValue();
// increment a Counter
context.getCounter("z", "limit").increment(1L);
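Since the description above mentions Enum-named counters, the equivalent enum form looks like this; RecordCounter is a hypothetical enum:

// hypothetical enum; each constant becomes a counter, grouped by the enum class
public enum RecordCounter { VALID, MALFORMED }

// in a Mapper or Reducer:
context.getCounter(RecordCounter.VALID).increment(1L);
long valid = context.getCounter(RecordCounter.VALID).getValue();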