I have been using MapReduce recently, and a common requirement is to submit a MapReduce job to the Hadoop cluster directly from another machine. Most of the examples online simply do not work; it took me quite a while to get this right, including resolving third-party jar dependencies.
When submitting a job, Hadoop uploads the local jar and its configuration dependencies to HDFS over RPC and then runs the job. An example follows.
First, a simple MapReduce task. The Mapper:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nova.frame.http.Http;
import com.nova.frame.utils.PropertiesParser;
/** Reads one URL per input line, fetches the page, and emits (url, page source). */
public class SimpleMapper extends Mapper<LongWritable, Text, Text, Text> {
private static final Logger logger = LoggerFactory
.getLogger(SimpleMapper.class);
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
logger.info("SimpleMapper running...");
logger.info("use config:{}", new PropertiesParser("system.properties")
.getStringProperty("use"));
String line = value.toString();
logger.info("url:{}", line);
String source = Http.get(line).text();
logger.info("source:{}", source);
context.write(new Text(line), new Text(source));
}
}
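The Reducer: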
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Keeps only the first fetched page source for each URL. */
public class SimpleReducer extends Reducer<Text, Text, Text, Text> {
private static final Logger logger = LoggerFactory
.getLogger(SimpleReducer.class);
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
logger.info("SimpleReducer running...");
context.write(key, new Text(values.iterator().next()));
}
}
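The job driver class, which handles submission: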
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/** Job driver: submits the job to the cluster as the remote "hdfs" user. */
public class SimpleJob extends Configured implements Tool {
@Override
public int run(final String[] args) throws Exception {
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser("hdfs");
try {
// Run the whole submission as the "hdfs" user and propagate the job's exit code.
return ugi.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws Exception {
Configuration config = getConf();
Job job = new Job(config, "simple job");
job.setJarByClass(SimpleJob.class);
job.setMapperClass(SimpleMapper.class);
job.setReducerClass(SimpleReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileSystem fileSystem = FileSystem.get(config);
// MapReduce refuses to start if the output path already exists, so delete it first.
String outPath = args[1];
Path path = new Path(outPath);
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
// Do not call fileSystem.close() here: FileSystem.get() returns a cached
// instance that the job submission below still needs.
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(outPath));
// Propagate the job's success or failure as the exit code.
return job.waitForCompletion(true) ? 0 : 1;
}
});
} catch (Exception e) {
e.printStackTrace();
return 1;
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SimpleJob(), args);
System.exit(res);
}
/**
* Submits the job.
*
* @param configuration
*            the job configuration
* @param in
*            the input path
* @param out
*            the output path
* @return the job's exit code (0 on success)
* @throws Exception
*/
public static int submitJob(Configuration configuration, String in,
String out) throws Exception {
int res = ToolRunner.run(configuration, new SimpleJob(), new String[] {
in, out });
return res;
}
}
The Mapper depends on a third-party class, com.nova.frame.http.Http.
Note the following points in the job driver class:
- Use
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
to run the job, so that the submitting user has permission to operate on HDFS.
- The line
job.setJarByClass(SimpleJob.class);
is required; without it the cluster may not be able to find the corresponding Mapper and Reducer classes. If you submit from an IDE without a packaged jar, see the sketch after this list.
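A sketch of that workaround: when the classes are not packaged into a jar, setJarByClass has nothing to upload, so you can point the classic mapred.jar property at a jar you built beforehand. The jar path below is an assumption for illustration.
// Assumed path to a pre-built job jar; "mapred.jar" is the classic
// property that job.setJarByClass() fills in automatically.
config.set("mapred.jar", "/path/to/simple-job.jar");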
The calling class:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
/**
* Submits SimpleJob to the remote cluster, adding the third-party jar
* dependency to the classpath via DistributedCache.
*/
public class TestApp {
public static void main(String[] args) throws IOException {
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://hadoop1:8020");
config.set("mapred.job.tracker", "hadoop1:8021");
DistributedCache.addFileToClassPath(new Path(
"/spider-lib/novaframe-1.5-beta1-withresources.jar"), config);
String out = "/out/test/";
String in = "/in/url.txt";
try {
int res = SimpleJob.submitJob(config, in, out);
System.out.println(res);
} catch (Exception e) {
e.printStackTrace();
}
}
}
In the calling class,
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://hadoop1:8020");
config.set("mapred.job.tracker", "hadoop1:8021");
points the configuration at the remote HDFS NameNode and JobTracker.
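If local copies of the cluster's configuration files are available, the same values can be loaded from them instead of being hardcoded. A minimal sketch, assuming the files live under /etc/hadoop/conf:
Configuration config = new Configuration();
// Load the remote cluster's addresses from local copies of its *-site.xml files.
config.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
config.addResource(new Path("/etc/hadoop/conf/mapred-site.xml"));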
The call
DistributedCache.addFileToClassPath(new Path(
"/spider-lib/novaframe-1.5-beta1-withresources.jar"), config);
adds the third-party jar dependency; for more on DistributedCache, see the article "Hadoop DistributedCache详解". Before this works, the third-party jar must already be on HDFS; here I uploaded it to the /spider-lib/ directory. Simple upload code:
// Recursively collect all *.jar files under libFilePath and copy any
// that are not yet on HDFS into the job lib directory.
FileSystem fileSystem = FileSystem.get(getConfig());
File libPath = new File(libFilePath);
Validate.checkArgument(libPath.exists(), "path '" + libFilePath + "' does not exist");
Collection<File> files = FileUtils.listFiles(libPath, new String[] { "jar" }, true);
for (File file : files) {
Path path = new Path(SystemContext.getHDFSJobLibPath() + file.getName());
if (!fileSystem.exists(path)) {
logger.info("add file to hdfs {} -> {}", file.getPath(), path);
fileSystem.copyFromLocalFile(new Path(file.getPath()), path);
}
}
You can check at each program startup whether the third-party jars have already been uploaded, and upload any that are missing. Remember that this code must also be executed via UserGroupInformation.doAs.
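A minimal sketch of that wrapping; the LibUploader class and uploadAsHdfs method are hypothetical names, and the "hdfs" user matches the one used in SimpleJob:
import java.io.File;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
public class LibUploader {
// Hypothetical helper: copy one local jar into an HDFS lib directory
// (hdfsLibDir is expected to end with "/"), running as the "hdfs" user.
public static void uploadAsHdfs(final Configuration config,
final String localJar, final String hdfsLibDir) throws Exception {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
FileSystem fs = FileSystem.get(config);
Path target = new Path(hdfsLibDir + new File(localJar).getName());
// Upload only when the jar is not already on HDFS.
if (!fs.exists(target)) {
fs.copyFromLocalFile(new Path(localJar), target);
}
return null;
}
});
}
}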