Task 4.1 Creating a MapReduce Project with Eclipse
Configuring the server-side environment
vi ./yarn-site.xml
<!-- Boilerplate setting; YARN reports errors if it is omitted -->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<property>
<name>yarn.application.classpath</name>
<value>/usr/local/hadoop-3.3.1/etc/hadoop,
/usr/local/hadoop-3.3.1/share/hadoop/common/lib/*,
/usr/local/hadoop-3.3.1/share/hadoop/common/*,
/usr/local/hadoop-3.3.1/share/hadoop/hdfs,
/usr/local/hadoop-3.3.1/share/hadoop/hdfs/lib/*,
/usr/local/hadoop-3.3.1/share/hadoop/hdfs/*,
/usr/local/hadoop-3.3.1/share/hadoop/mapreduce/*,
/usr/local/hadoop-3.3.1/share/hadoop/yarn,
/usr/local/hadoop-3.3.1/share/hadoop/yarn/lib/*,
/usr/local/hadoop-3.3.1/share/hadoop/yarn/*</value>
</property>
vi ./mapred-site.xml
<property>
<name>mapreduce.application.classpath</name>
<value>/usr/local/hadoop-3.3.1/etc/hadoop,
/usr/local/hadoop-3.3.1/share/hadoop/common/lib/*,
/usr/local/hadoop-3.3.1/share/hadoop/common/*,
/usr/local/hadoop-3.3.1/share/hadoop/hdfs,
/usr/local/hadoop-3.3.1/share/hadoop/hdfs/lib/*,
/usr/local/hadoop-3.3.1/share/hadoop/hdfs/*,
/usr/local/hadoop-3.3.1/share/hadoop/mapreduce/*,
/usr/local/hadoop-3.3.1/share/hadoop/yarn,
/usr/local/hadoop-3.3.1/share/hadoop/yarn/lib/*,
/usr/local/hadoop-3.3.1/share/hadoop/yarn/*</value>
</property>
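If you would rather not maintain these directory lists by hand, a common alternative is to run the following command on the server and paste its output into the <value> elements above; it prints the classpath that this Hadoop installation actually uses:
hadoop classpath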
Task 4.2 A First Look at MapReduce Programming Through the Source Code
Task 4.3 Programming Exercise: Counting Accesses per Date
Writing the code
Adding the Mapper class
public static class MyMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String line = value.toString();
        String[] array = line.split(",");         // split the line on commas into an array
        String keyOutput = array[1];              // use the access date field as the key
        context.write(new Text(keyOutput), one);  // emit the (date, 1) key/value pair
    }
}
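As a purely hypothetical illustration (the real user_login.txt is not reproduced here), suppose each input line is a comma-separated record of user name and access date; the mapper then emits the date as the key with a count of 1:
user1,2021-01-01   ->   (2021-01-01, 1)
user2,2021-01-01   ->   (2021-01-01, 1)
user1,2021-01-02   ->   (2021-01-02, 1)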
Adding the Reducer class
public static class MyReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
        int sum = 0;                  // accumulator, initialised to 0
        for (IntWritable val : values) {
            sum += val.get();         // add up all values that share the same key
        }
        result.set(sum);
        context.write(key, result);
    }
}
Adding the Driver code
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://master:9864");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        // default input/output paths; replace "myname" with your own name in pinyin
        otherArgs = new String[]{"/user/myname/user_login.txt", "/user/myname/output_AccessCount"};
    }
    if (otherArgs.length < 2) {
        System.err.println("Usage: dailyAccessCount <in> [<in>...] <out>");
        System.exit(2);
    }
    Job job = Job.getInstance(conf, "Daily Access Count");
    job.setJarByClass(dailyAccessCount.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
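For reference, the Mapper, Reducer, and Driver code above assumes roughly the following imports (a sketch; adjust the package and class name to your own project):
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;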
Task 4.4 Programming Exercise: Sorting by Access Count
Writing the code
Adding the Mapper class
public static class MyMapper
        extends Mapper<Object, Text, IntWritable, Text> {
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String lines = value.toString();
        String[] array = lines.split("\t");            // split the line on tabs into an array
        int keyOutput = Integer.parseInt(array[1]);    // use the access count as the key
        String valueOutput = array[0];                 // use the access date as the value
        context.write(new IntWritable(keyOutput), new Text(valueOutput));
    }
}
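The driver below also registers a MyReducer class that is not listed in this section. A minimal sketch of what it could look like, assuming it simply swaps each pair back to (date, count) so that the final output is ordered by access count (this matches the Text/IntWritable output types set in the driver):
public static class MyReducer
        extends Reducer<IntWritable, Text, Text, IntWritable> {
    public void reduce(IntWritable key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
        // several dates may share the same access count, so emit one line per date
        for (Text value : values) {
            context.write(value, key);   // output (date, count)
        }
    }
}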
Adding the Driver code
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://master:9864");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        // default input/output paths; replace "myname" with your own name in pinyin
        otherArgs = new String[]{"/user/myname/output_AccessCount", "/user/myname/output_TimeSort"};
    }
    if (otherArgs.length < 2) {
        System.err.println("Usage: accessTimesSort <in> [<in>...] <out>");
        System.exit(2);
    }
    Job job = Job.getInstance(conf, "Access Time Sort");
    job.setJarByClass(accessTimesSort.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
            new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
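By default MapReduce sorts IntWritable keys in ascending order, so the output above runs from the least-visited to the most-visited date. If a descending order is wanted, one optional approach (not part of the original task) is to register a reversed comparator in the driver:
public static class DescendingIntComparator extends IntWritable.Comparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);  // negate to reverse the normal ascending order
    }
}
// in main(), before submitting the job:
// job.setSortComparatorClass(DescendingIntComparator.class);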
Other references
Debugging on the lab's 32-bit x86 machines
wordcount v2
Mapper class
Official documentation
Modifier and type, method, and description:
protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
    Called once at the end of the task.
protected void map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)
    Called once for each key/value pair in the input split.
void run(org.apache.hadoop.mapreduce.Mapper.Context context)
    Expert users can override this method for more complete control over the execution of the Mapper.
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
    Called once at the beginning of the task.
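As a small illustrative sketch (not taken from the official docs), setup() and cleanup() are typically overridden to run code once per map task, for example to read a parameter from the job configuration; "field.separator" below is a made-up parameter name:
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private String separator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // runs once before the first call to map()
        separator = context.getConfiguration().get("field.separator", ",");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // runs once after the last call to map(); useful for flushing buffered output
    }
}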
Practical 1: Finding the highest-score record in the grades table
Common problems
The exported jar fails on the cluster with a JDK version mismatch
It is indirectly referenced from required .class files
No jobs appear on the YARN web UI
The ResourceManager log on master shows:
Invalid resource request! Cannot allocate containers as requested resource is greater than maximum allowed allocation. Requested resource type=[memory-mb], Requested resource=<memory:1536, vCores:1>, maximum allowed allocation=<memory:1024, vCores:2>, please note that maximum allowed allocation is calculated by scheduler based on maximum resource of registered NodeManagers, which might be less than configured maximum allocation=<memory:1024, vCores:4>
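This means the job requested a container (1536 MB here) larger than the scheduler's maximum allocation. One possible fix, assuming small lab nodes, is to raise the memory limits in yarn-site.xml (alternatively, lower mapreduce.map.memory.mb / yarn.app.mapreduce.am.resource.mb in mapred-site.xml), for example:
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>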
Running wordcount warns that slf4j-log4j12-1.7.30 is duplicated
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
since CPU usage is not yet available
Garbled Chinese characters in MapReduce output
When processing data with Hadoop you may find the output is garbled. This happens because Hadoop's text handling is hard-coded to UTF-8, so if the input file uses another encoding, such as GBK, the output comes out garbled.
The fix is to transcode, so that all data is handled as UTF-8 while the job runs.
When the map task reads a line from the file, convert it to UTF-8. For example:
If the file is in GBK format:
// read the raw bytes and decode them as GBK
String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
For example, the map method from the practical "find the highest score per subject":
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
    String line = new String(value.getBytes(), 0, value.getLength(), "GBK"); // decode as GBK to avoid garbled text
    String[] values = line.trim().split(" ");
    cource.set(values[0]);                   // cource and score are Text / IntWritable fields of the Mapper class
    score.set(Integer.parseInt(values[1]));
    context.write(cource, score);            // this output would be garbled without the GBK conversion above
}