I have been working with MapReduce recently, and a common requirement is to submit a MapReduce job to a Hadoop cluster directly from another server. Many of the examples on the web simply do not work; after quite a bit of fiddling I finally got it running, including handling third-party jar dependencies.

When a job is submitted, Hadoop uploads the local jar and its related configuration and dependencies to HDFS over RPC and then runs the job there. A full example follows.

First, a simple MapReduce job. The Mapper:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.nova.frame.http.Http;
import com.nova.frame.utils.PropertiesParser;

public class SimpleMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static final Logger logger = LoggerFactory
            .getLogger(SimpleMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        logger.info("SimpleMapper running...");
        logger.info("use config:{}", new PropertiesParser("system.properties")
                .getStringProperty("use"));
        String line = value.toString();
        logger.info("url:{}", line);
        String source = Http.get(line).text();
        logger.info("source:{}", source);
        context.write(new Text(line), new Text(source));
    }
}

The Reducer:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleReducer extends Reducer<Text, Text, Text, Text> {
    private static final Logger logger = LoggerFactory
            .getLogger(SimpleReducer.class);

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        logger.info("SimpleReducer running...");
        context.write(key, new Text(values.iterator().next()));
    }
}

The job driver class:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SimpleJob extends Configured implements Tool {

    @Override
    public int run(final String[] args) throws Exception {
        UserGroupInformation ugi = UserGroupInformation
                .createRemoteUser("hdfs");
        try {

            return ugi.doAs(new PrivilegedExceptionAction<Integer>() {

                @Override
                public Integer run() throws Exception {
                    Configuration config = getConf();
                    Job job = new Job(config, "simple job");
                    job.setJarByClass(SimpleJob.class);
                    job.setMapperClass(SimpleMapper.class);
                    job.setReducerClass(SimpleReducer.class);
                    job.setOutputKeyClass(Text.class);
                    job.setOutputValueClass(Text.class);

                    FileSystem fileSystem = FileSystem.get(config);
                    String outPath = args[1];
                    Path path = new Path(outPath);
                    if (!fileSystem.exists(path)) {
                        System.out.println("File " + outPath
                                + " does not exists");
                    } else {
                        fileSystem.delete(new Path(outPath), true);
                    }

                    fileSystem.close();
                    
                    FileInputFormat.addInputPath(job, new Path(args[0]));
                    FileOutputFormat.setOutputPath(job, new Path(outPath));
                    return job.waitForCompletion(true) ? 0 : 1;
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SimpleJob(), args);
        System.exit(res);
    }

    /**
     * Submits the job.
     * 
     * @param configuration
     *            the Hadoop configuration to submit with
     * @param in
     *            input path
     * @param out
     *            output path
     * @return 0 if the job completed successfully, non-zero otherwise
     * @throws Exception
     */
    public static int submitJob(Configuration configuration, String in,
            String out) throws Exception {
        int res = ToolRunner.run(configuration, new SimpleJob(), new String[] {
                in, out });
        return res;
    }
}

The Mapper depends on a third-party class, com.nova.frame.http.Http.
Note the following points in the job driver class:

  1. Create a remote user with UserGroupInformation.createRemoteUser("hdfs") and run the job inside ugi.doAs(...), so that the submitting user has permission to operate on HDFS.

  2. job.setJarByClass(SimpleJob.class); must be present, otherwise the corresponding Mapper and Reducer classes may not be found on the cluster. A defensive check is sketched below.
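
setJarByClass can only locate the job jar if SimpleJob.class is itself loaded from a jar on the submitting machine; when running from an IDE or from unpacked classes the job jar stays unset and the tasks fail with ClassNotFoundException. A minimal defensive sketch, assuming a pre-built job jar at a hypothetical path:

job.setJarByClass(SimpleJob.class);
if (job.getJar() == null) {
    // Classes were not loaded from a jar; point the job at a pre-built
    // job jar explicitly (the path here is hypothetical).
    job.getConfiguration().set("mapred.jar", "/path/to/simple-job.jar");
}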

The calling class:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

public class TestApp {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://hadoop1:8020");
        config.set("mapred.job.tracker", "hadoop1:8021");
        DistributedCache.addFileToClassPath(new Path(
                "/spider-lib/novaframe-1.5-beta1-withresources.jar"), config);
        String out = "/out/test/";
        String in = "/in/url.txt";
        try {
            int res = SimpleJob.submitJob(config, in, out);
            System.out.println(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

In the calling class,

 Configuration config = new Configuration();
 config.set("fs.defaultFS", "hdfs://hadoop1:8020");
 config.set("mapred.job.tracker", "hadoop1:8021");

the Configuration points the client at the remote HDFS NameNode (fs.defaultFS) and the JobTracker (mapred.job.tracker).
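
If the cluster's configuration files are available on the submitting machine, they can be loaded instead of hard-coding the addresses. A minimal sketch, assuming local copies of the cluster configuration at a hypothetical path:

Configuration config = new Configuration();
// Assumed local copies of the cluster's configuration files.
config.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
config.addResource(new Path("/etc/hadoop/conf/mapred-site.xml"));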

The third-party jar is added to the job's classpath with

DistributedCache.addFileToClassPath(new Path(
                "/spider-lib/novaframe-1.5-beta1-withresources.jar"), config);

For background on DistributedCache, see the article Hadoop DistributedCache详解. Before this works, the third-party jar must already be on HDFS; here I uploaded it to the /spider-lib/ directory. A simple upload routine looks like this:

// Upload every jar under libFilePath to the job lib directory on HDFS,
// skipping files that are already there.
FileSystem fileSystem = FileSystem.get(getConfig());
File libPath = new File(libFilePath);
Validate.checkArgument(libPath.exists(), "path '" + libFilePath + "' does not exist");
Collection<File> files = FileUtils.listFiles(libPath, new String[] { "jar" }, true);
for (File file : files) {
    Path path = new Path(SystemContext.getHDFSJobLibPath() + file.getName());
    if (!fileSystem.exists(path)) {
        logger.info("add file to hdfs {} -> {}", file.getPath(), path);
        fileSystem.copyFromLocalFile(new Path(file.getPath()), path);
    }
}

On every startup the program can check whether the third-party jars have already been uploaded and upload only the missing ones. Remember that the code above also has to be executed through UserGroupInformation.doAs.
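
A minimal sketch of that wrapping, reusing the same remote user as in SimpleJob; uploadLibs() is a placeholder for the upload routine above:

UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
ugi.doAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
        // uploadLibs() stands in for the jar upload code shown above.
        uploadLibs();
        return null;
    }
});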
