博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce数据筛选
阅读量:7080 次
发布时间:2019-06-28

本文共 6898 字,大约阅读时间需要 22 分钟。

需求:

编写MapReduce程序算出高峰时间段(如9-10点)哪张表被访问的最频繁的表,以及这段时间访问这张表最多的用户,以及这个用户访问这张表的总时间开销。

测试数据:


TableName(表名),Time(时间),User(用户),TimeSpan(时间开销)

t003 6:00 u002 180

t003 7:00 u002 180
t003 7:08 u002 180
t003 7:25 u002 180
t002 8:00 u002 180
t001 8:00 u001 240

t001 9:00 u002 300

t001 9:11 u001 240
t003 9:26 u001 180
t001 9:39 u001 300
*t001 10:00 u001 200


代码

方法一:

package com.table.main;import java.io.IOException;import java.util.HashMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableUsed {    public static class MRMapper extends Mapper
{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().substring(1).split("\\s+"); Long time = Long.parseLong(split[1].charAt(0) + ""); // 筛选9-10点使用过的表 if (time == 9 || time == 10) { context.write(new Text(split[0]), new Text(split[2] + ":" + split[3])); } } } public static class MRReducer extends Reducer
{ // 存放使用量最大的表的表名及用户 public static HashMap
> map = new HashMap
>(); // 最大用使用量 public static int max_used_num = 0; // 使用量最大的表 public static String table = ""; protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { HashMap
user_map = new HashMap
(); int table_used_num = 0; for (Text t : values) { table_used_num++; String[] split = t.toString().split(":"); // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户 if (user_map.get(split[0]) == null) { user_map.put(split[0], Integer.parseInt(split[1])); } else { Integer use_time = user_map.get(split[0]); use_time += Integer.parseInt(split[1]); user_map.put(split[0], use_time); } } if (table_used_num > max_used_num) { map.put(key.toString(), user_map); table = key.toString(); max_used_num = table_used_num; } } protected void cleanup(Context context) throws IOException, InterruptedException { // 循环map,查出使用时间最长的用户信息 HashMap
map2 = map.get(table); int max = 0; String max_used_user = ""; for (HashMap.Entry
m : map2.entrySet()) { if (m.getValue() > max) { max = m.getValue(); max_used_user = m.getKey(); } } context.write(new Text(table), new Text("\t" + max_used_user + "\t" + map2.get(max_used_user))); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TableUsed.class); job.setMapperClass(MRMapper.class); job.setReducerClass(MRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put2")); System.out.println(job.waitForCompletion(true) ? 1 : 0); }}
缺点:只算出使用时间最长的用户,没有判断该用户是否是使用次数最多的

方法二:

package com.table.main;import java.io.IOException;import java.util.HashMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableUsed {    public static class MRMapper extends Mapper
{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().substring(1).split("\\s+"); Long time = Long.parseLong(split[1].charAt(0) + ""); // 筛选9-10点使用过的表 if (time == 9 || time == 10) { context.write(new Text(split[0]), new Text(split[2] + ":" + split[3])); } } } public static class MRReducer extends Reducer
{ // 表的最大使用次数 使用该表最多的用户 public static int max_used_num = 0, max_user_used = 0; // 使用量最大的表 使用该表最多的用户名 public static String max_used_table = "", user_name = ""; // 使用次数最多的用户的 使用时间 public static Integer user_used_time = 0; protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { HashMap
user_map = new HashMap
(); HashMap
user_used_map = new HashMap
(); int table_used_num = 0;// 表的使用次数 Integer use_num = 0;// 用户使用次数 Integer use_time = 0;//使用时间 String username = "";//用户名 for (Text t : values) { table_used_num++; String[] split = t.toString().split(":"); // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户 if (user_map.get(split[0]) == null) { user_map.put(split[0], Integer.parseInt(split[1])); user_used_map.put(split[0], 1); } else { use_time = user_map.get(split[0]); use_time += Integer.parseInt(split[1]); user_map.put(split[0], use_time); use_num = user_used_map.get(split[0]); use_num ++; user_used_map.put(split[0], use_num); } /** * 判断该用户是否为此表使用次数最多的, * 是则存进user_map和user_used_map,否则不存; * 由于只需要求使用量最多的用户,因此使用量不是最多用户没有必要存在于map中 */ if (use_num > max_user_used) { username = split[0]; max_user_used = use_num; user_used_time = use_time; //此处也可以不remove() user_used_map.remove(split[0]); user_map.remove(split[0]); } } if (table_used_num > max_used_num) { max_used_table = key.toString(); max_used_num = table_used_num; user_name = username; } } protected void cleanup(Context context) throws IOException, InterruptedException { context.write(new Text(max_used_table), new Text(max_user_used + "\t" + user_name + "\t" + user_used_time)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TableUsed.class); job.setMapperClass(MRMapper.class); job.setReducerClass(MRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put6")); System.out.println(job.waitForCompletion(true) ? 1 : 0); }}

转载于:https://www.cnblogs.com/lyjing/p/8406834.html

你可能感兴趣的文章
大型网站技术架构(六)网站的伸缩性架构
查看>>
MyBatis学习总结(四)——解决字段名与实体类属性名不相同的冲突
查看>>
MySQL基本
查看>>
《未测试》如何使用自己已经编译过的lamp安装cacti nagios zabbix
查看>>
我的友情链接
查看>>
dns配置
查看>>
Windows服务器时间不断修改(时间不同步已解决)
查看>>
多表查询
查看>>
【第49题】【062题库】2019年OCP认证062考试新题
查看>>
理解作用域(引擎,编译器,作用域)
查看>>
获取网页数据的例子
查看>>
struts2的配置文件
查看>>
JSP第5次测试---测试分析
查看>>
tomcat容器
查看>>
BZOJ 1858 序列操作 (线段树)
查看>>
Nginx配置文件详细说明
查看>>
同时可以修改时间和日期的datetime_select and 有关时间的转换
查看>>
IOS Orientation, 想怎么转就怎么转~~~
查看>>
Finding Lines
查看>>
服务提供者及门面
查看>>