需求:
编写MapReduce程序算出高峰时间段(如9-10点)哪张表被访问的最频繁的表,以及这段时间访问这张表最多的用户,以及这个用户访问这张表的总时间开销。
测试数据:
TableName(表名),Time(时间),User(用户),TimeSpan(时间开销)
t003 6:00 u002 180t003 7:00 u002 180
t003 7:08 u002 180t003 7:25 u002 180t002 8:00 u002 180t001 8:00 u001 240t001 9:00 u002 300t001 9:11 u001 240
t003 9:26 u001 180t001 9:39 u001 300 *t001 10:00 u001 200代码
方法一:
package com.table.main;import java.io.IOException;import java.util.HashMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableUsed { public static class MRMapper extends Mapper{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().substring(1).split("\\s+"); Long time = Long.parseLong(split[1].charAt(0) + ""); // 筛选9-10点使用过的表 if (time == 9 || time == 10) { context.write(new Text(split[0]), new Text(split[2] + ":" + split[3])); } } } public static class MRReducer extends Reducer { // 存放使用量最大的表的表名及用户 public static HashMap > map = new HashMap >(); // 最大用使用量 public static int max_used_num = 0; // 使用量最大的表 public static String table = ""; protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { HashMap user_map = new HashMap (); int table_used_num = 0; for (Text t : values) { table_used_num++; String[] split = t.toString().split(":"); // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户 if (user_map.get(split[0]) == null) { user_map.put(split[0], Integer.parseInt(split[1])); } else { Integer use_time = user_map.get(split[0]); use_time += Integer.parseInt(split[1]); user_map.put(split[0], use_time); } } if (table_used_num > max_used_num) { map.put(key.toString(), user_map); table = key.toString(); max_used_num = table_used_num; } } protected void cleanup(Context context) throws IOException, InterruptedException { // 循环map,查出使用时间最长的用户信息 HashMap map2 = map.get(table); int max = 0; String max_used_user = ""; for (HashMap.Entry m : map2.entrySet()) { if (m.getValue() > max) { max = m.getValue(); max_used_user = m.getKey(); } } context.write(new Text(table), new Text("\t" + max_used_user + "\t" + map2.get(max_used_user))); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TableUsed.class); job.setMapperClass(MRMapper.class); job.setReducerClass(MRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put2")); System.out.println(job.waitForCompletion(true) ? 1 : 0); }}
缺点:只算出使用时间最长的用户,没有判断该用户是否是使用次数最多的
方法二:
package com.table.main;import java.io.IOException;import java.util.HashMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableUsed { public static class MRMapper extends Mapper{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().substring(1).split("\\s+"); Long time = Long.parseLong(split[1].charAt(0) + ""); // 筛选9-10点使用过的表 if (time == 9 || time == 10) { context.write(new Text(split[0]), new Text(split[2] + ":" + split[3])); } } } public static class MRReducer extends Reducer { // 表的最大使用次数 使用该表最多的用户 public static int max_used_num = 0, max_user_used = 0; // 使用量最大的表 使用该表最多的用户名 public static String max_used_table = "", user_name = ""; // 使用次数最多的用户的 使用时间 public static Integer user_used_time = 0; protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { HashMap user_map = new HashMap (); HashMap user_used_map = new HashMap (); int table_used_num = 0;// 表的使用次数 Integer use_num = 0;// 用户使用次数 Integer use_time = 0;//使用时间 String username = "";//用户名 for (Text t : values) { table_used_num++; String[] split = t.toString().split(":"); // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户 if (user_map.get(split[0]) == null) { user_map.put(split[0], Integer.parseInt(split[1])); user_used_map.put(split[0], 1); } else { use_time = user_map.get(split[0]); use_time += Integer.parseInt(split[1]); user_map.put(split[0], use_time); use_num = user_used_map.get(split[0]); use_num ++; user_used_map.put(split[0], use_num); } /** * 判断该用户是否为此表使用次数最多的, * 是则存进user_map和user_used_map,否则不存; * 由于只需要求使用量最多的用户,因此使用量不是最多用户没有必要存在于map中 */ if (use_num > max_user_used) { username = split[0]; max_user_used = use_num; user_used_time = use_time; //此处也可以不remove() user_used_map.remove(split[0]); user_map.remove(split[0]); } } if (table_used_num > max_used_num) { max_used_table = key.toString(); max_used_num = table_used_num; user_name = username; } } protected void cleanup(Context context) throws IOException, InterruptedException { context.write(new Text(max_used_table), new Text(max_user_used + "\t" + user_name + "\t" + user_used_time)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TableUsed.class); job.setMapperClass(MRMapper.class); job.setReducerClass(MRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put6")); System.out.println(job.waitForCompletion(true) ? 1 : 0); }}