Цепочка нескольких заданий MapReduce в Hadoop



во многих реальных ситуациях, когда вы применяете MapReduce, конечные алгоритмы заканчиваются несколькими шагами MapReduce.



т. е. Map1 , Reduce1 , Map2 , Reduce2 и так далее.



таким образом, у вас есть выход из последнего сокращения, который необходим в качестве входных данных для следующей карты.



промежуточные данные-это то, что вы (в целом) не хотите хранить после успешного завершения конвейера. Также потому, что эти промежуточные данные в целом некоторые структура данных (например, "карта" или "набор") Вы не хотите прикладывать слишком много усилий при написании и чтении этих пар ключ-значение.



каков рекомендуемый способ сделать это в Hadoop?



есть ли (простой) пример, который показывает, как правильно обрабатывать эти промежуточные данные, включая очистку после этого?

666   13  

13 ответов:

Я думаю, что этот учебник по сети разработчиков Yahoo поможет вам в этом:Цепочки Заданий

использовать JobClient.runJob(). Выходной путь данных из первого задания становится входным путем ко второму заданию. Они должны быть переданы в качестве аргументов для ваших заданий с соответствующим кодом, чтобы проанализировать их и настроить параметры для задания.

Я думаю, что вышеуказанный метод, однако, может быть таким, как теперь более старый MAPRED API сделал это, но он все равно должен работа. В новом API mapreduce будет аналогичный метод, но я не уверен, что это такое.

что касается удаления промежуточных данных после завершения задания, вы можете сделать это в своем коде. То, как я сделал это раньше, использует что-то вроде:

FileSystem.delete(Path f, boolean recursive);

где путь-это расположение на HDFS данных. Вы должны убедиться, что вы только удалить эти данные, как только ни одно другое задание не требует этого.

есть много способов вы можете сделать это.

(1) каскадные рабочих мест

создайте объект JobConf " job1 "для первого задания и установите все параметры с" input "в качестве inputdirectory и" temp " в качестве выходного каталога. Выполните это задание:

JobClient.run(job1).

непосредственно под ним создайте объект JobConf " job2 "для второго задания и установите все параметры с" temp "как inputdirectory и" output " как output directory. Выполните это задание:

JobClient.run(job2).

(2) создать два объекта JobConf и установить все параметры в них так же, как (1) за исключением того, что вы не используете JobClient.бежать.

затем создайте два объекта задания с jobconfs в качестве параметров:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

С помощью объекта jobControl можно указать зависимости заданий, а затем запустить задания:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Если вам нужна структура, похожая на Map+ / Reduce / Map*, вы можно использовать классы ChainMapper и ChainReducer, которые поставляются с Hadoop версии 0.19 и далее.

Ура

на самом деле есть несколько способов сделать это. Я сосредоточусь на двух.

один через Риффл (http://github.com/cwensel/riffle) Библиотека аннотаций для идентификации зависимых объектов и их "выполнения" в зависимом (топологическом) порядке.

или вы можете использовать Каскад (и MapReduceFlow) в Каскадировании (http://www.cascading.org/). Будущая версия будет поддерживать аннотации Riffle, но теперь она отлично работает с raw MR JobConf джобс.

вариант заключается в том, чтобы вообще не управлять Mr jobs вручную, а разрабатывать приложение с помощью каскадного API. Затем jobconf и цепочка заданий обрабатываются внутренне через каскадный планировщик и классы потока.

таким образом, вы проводите свое время, сосредоточившись на своей проблеме, а не на механике управления заданиями Hadoop и т. д. Вы можете даже наложить различные языки сверху (например, clojure или jruby), чтобы еще больше упростить разработку и приложения. http://www.cascading.org/modules.html

Я сделал цепочку работы с использованием объектов JobConf один за другим. Я взял пример WordCount для цепочки заданий. Одно задание вычисляет, сколько раз слово a повторяется в данном выводе. Второе задание принимает первый вывод задания в качестве входных данных и вычисляет общее количество слов в данном входе. Ниже приведен код, который должен быть помещен в класс драйвера.

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

команда для запуска этих заданий:

bin / hadoop jar TotalWords.

мы должны дать конечное имя задания для команды. В приведенном выше случае это TotalWords.

вы можете использовать oozie для обработки barch ваших заданий MapReduce. http://issues.apache.org/jira/browse/HADOOP-5303

вы можете запустить Mr chain таким образом, как указано в коде.

ОБРАТИТЕ ВНИМАНИЕ: только код драйвера был предоставлен

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

ПОСЛЕДОВАТЕЛЬНОСТЬ

(JOB1)КАРТА - > УМЕНЬШИТЬ-> (JOB2)карте
Это было сделано для сортировки ключей, но есть еще несколько способов, таких как использование treemap
Тем не менее, я хочу сосредоточить ваше внимание на том, как работа была прикована!!
Спасибо

в проекте Apache Mahout есть примеры, которые связывают вместе несколько заданий MapReduce. Один из примеров можно найти по адресу:

RecommenderJob.java

http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

мы можем использовать waitForCompletion(true) метод задания для определения зависимости между заданиями.

в моем сценарии у меня было 3 задания, которые зависели друг от друга. В классе драйверов я использовал приведенный ниже код, и он работает так, как ожидалось.

public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }

новый класс org.апаш.платформа Hadoop.mapreduce.движение за освобождение.цепь.ChainMapper помогите этому сценарию

хотя существуют сложные серверные движки рабочего процесса Hadoop, например, oozie, у меня есть простая библиотека java, которая позволяет выполнять несколько заданий Hadoop в качестве рабочего процесса. Конфигурация задания и рабочий процесс, определяющие зависимость между заданиями, настраиваются в файле JSON. Все внешне настраивается и не требует каких-либо изменений в существующей реализации map reduce, чтобы быть частью рабочего процесса.

подробности можно найти здесь. Исходный код и опарник доступны внутри на GitHub.

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

П.

Я думаю, что oozie помогает последующим заданиям получать входные данные непосредственно из предыдущего задания. Это позволяет избежать операции ввода-вывода выполняются с jobcontrol.

Если вы хотите программно связать свои задания, вы будете использовать JobControl. Использование довольно просто:

    JobControl jobControl = new JobControl(name);

после этого вы добавляете экземпляры ControlledJob. ControlledJob определяет задание с его зависимостями, таким образом автоматически подключая входы и выходы, чтобы соответствовать "цепочке" заданий.

    jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));

    jobControl.run();

начинается цепь. Вы хотите, чтобы положить, что в потоке speerate. Это позволяет проверить состояние вашей цепи, пока она работает:

    while (!jobControl.allFinished()) {
        System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
        System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
        System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
        List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
        System.out.println("Jobs in success state: " + successfulJobList.size());
        List<ControlledJob> failedJobList = jobControl.getFailedJobList();
        System.out.println("Jobs in failed state: " + failedJobList.size());
    }

Как вы упомянули в своем требовании, что вы хотите, чтобы o/p MRJob1 был i / p MRJob2 и т. д., Вы можете рассмотреть возможность использования рабочего процесса oozie для этого usecase. Также вы можете рассмотреть возможность записи промежуточных данных в HDFS, поскольку они будут использоваться следующим MRJob. И после завершения задания вы можете очистить ваши промежуточные данные.

<start to="mr-action1"/>
<action name="mr-action1">
   <!-- action for MRJob1-->
   <!-- set output path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="mr-action2">
   <!-- action for MRJob2-->
   <!-- set input path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="success">
        <!-- action for success-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="fail">
        <!-- action for fail-->
    <ok to="end"/>
    <error to="end"/>
</action>

<end name="end"/>

Comments

    Ничего не найдено.