Hadoop - MapReduce


Advertisements

MapReduce é um quadro com que podemos escrever aplicativos para processar grandes quantidades de dados, em paralelo, em grandes aglomerados de hardware de consumo de forma fiável.

O que é MapReduce?

MapReduce é uma técnica de tratamento e um modelo de programa de computação distribuída baseada em java. O MapReduce algoritmo contém duas funções importantes, a saber: Mapa e reduzir. Mapa leva um conjunto de dados e converte-a em outro conjunto de dados, onde cada um dos elementos são decompostos em Tuplas (pares chave/valor). Em segundo lugar, reduzir tarefa, o que leva a saída, a partir de um mapa como uma entrada e combina esses dados Tuplas em um conjunto menor de Tuplas. Como a seqüência de o nome MapReduce, implica a redução tarefa é realizada sempre após o mapa.

A grande vantagem do MapReduce é que é fácil para processamento de dados em escala computação múltiplos nós. Sob o modelo MapReduce, o tratamento dos dados primitivos são chamados mapas e redutores. Decompondo aplicativo de processamento de dados em mapas e redutores às vezes é algo corriqueiro. Mas, uma vez que escrever um aplicativo do MapReduce, escalando o aplicativo para ser executado ao longo de centenas, milhares, ou mesmo dezenas de milhares de máquinas em um cluster é meramente uma mudança de configuração. Esta escalabilidade simples é o que tem atraído muitos programadores usam o MapReduce modelo.

O algoritmo

  • Geralmente MapReduce paradigma baseia-se no envio do computador de onde residem os dados!

  • MapReduce programa é executado em três fases, a saber: mapa palco, shuffle, e reduzir fase.

    • Mapa palco: o mapa ou mapper é o trabalho para processar os dados de entrada. Em geral, os dados de entrada é em forma de arquivo ou diretório e é armazenado no Hadoop file system (HDFS). O arquivo de entrada é passado para a função mapa linha por linha. O mapeador processa os dados e cria vários pequenos blocos de dados.

    • Reduzir fase: nesta fase, é a combinação do Shuffle e reduzir fase. O redutor é o trabalho para processar os dados que vem do mapa. Após o processamento, produz um novo conjunto de saída, que será armazenado no HDFS.

  • Durante a tarefa, MapReduce Hadoop envia o mapa e reduzir o tempo de execução de tarefas para os servidores no cluster.

  • O framework gerencia todos os detalhes do cruzamento de dados como, por exemplo, a emissão tarefas, verificar a conclusão da tarefa, e a cópia dos dados em torno do cluster entre os nós.

  • A maioria dos computadores tem lugar em nós com os dados em discos locais que reduz o tráfego de rede.

  • Após a conclusão das tarefas, o cluster reúne e reduz os dados para formar um resultado adequado, e envia-la de volta para o Hadoop server.

MapReduce Algorithm

As entradas e saídas (Java Perspective)

O MapReduce framework funciona com <chave, o valor> aos pares, ou seja, o quadro opiniões a entrada para o trabalho como um conjunto de <key, value> pares e produz um conjunto de <key, value> pares como a saída do trabalho, possivelmente de diferentes tipos.

A chave e o valor deve ser em classes forma serializada do quadro e, portanto, necessidade de implementar a interface gravável. Além disso, as principais classes para implementar o Writable-Comparable interface para facilitar a classificação do quadro. Os tipos de saída e entrada de MapReduce tarefa: (entrada) <k1, v1> -> mapa -> <k2, v2>-> reduzir -> <k3, v3> (saída).

Input Output
Mapa <k1, v1> list (<k2, v2>)
Reduzir <k2, list(v2)> list (<k3, v3>)

Terminologia

  • O payload - Aplicações implementar o mapa e a reduzir as funções, e forma o núcleo do trabalho.

  • Mapa - mapas Mapa entrada do pares chave/valor para um conjunto de chave intermediária/valor par.

  • NamedNode - nó que gerencia o Hadoop Distributed File System (HDFS).

  • DataNode - Nó onde os dados são apresentados com antecedência antes de qualquer tratamento.

  • MasterNode - Nó onde JobTracker executa e que aceita solicitações de trabalho de seus clientes.

  • SlaveNode - Nó onde Mapa e reduzir programa é executado.

  • JobTracker - Calendário trabalhos e rastreia a designar tarefas a tarefa tracker.

  • Task Tracker - As faixas a tarefa e relatórios de status JobTracker.

  • Trabalho - O programa é uma execução de um Mapper e redutor em um dataset.

  • Tarefa - uma execução de um mapa ou um redutor sobre uma fatia de dados.

  • Tarefa tentativa - uma instância particular de uma tentativa de executar uma tarefa em um SlaveNode.

Cenário de Exemplo

Abaixo está os dados referentes ao consumo de energia eléctrica de uma organização. Ela contém o consumo elétrico mensal e a média anual de vários anos.

Jan Fev Mar Abr May Jun Jul Ago Set Oct Nov Dec Avg
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Se os dados acima é dado como entrada, temos de escrever aplicativos para processar e produzir resultados como, por exemplo, encontrar o ano de utilização máxima, ano de uso mínimo e assim por diante. Este é um walkover para programadores com número finito de registros. Eles simplesmente irão escrever a lógica de produzir a saída desejada e passe os dados para a aplicação.

Mas, acho que os dados que representam o consumo eléctrico de todos os latifúndios indústrias de um determinado estado, desde a sua formação.

Quando queremos escrever aplicativos para processar tais dados em massa.

  • Eles vão levar uma grande quantidade de tempo para executar.
  • Não será um excesso de tráfego de rede quando passamos os dados da fonte para servidor de rede e assim por diante.

Para resolver estes problemas, temos o MapReduce.

Os dados de entrada

Os dados acima referidos é guardada como sample.txt dada como entrada. O arquivo de entrada olha como mostrado abaixo.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

Exemplo de um programa

Abaixo está o programa para a amostra de dados usando MapReduce.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits 
{ 
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   { 
      
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      Reporter reporter) throws IOException 
      { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens())
            {
               lasttoken=s.nextToken();
            } 
            
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   } 
   
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements 
   Reducer< Text, IntWritable, Text, IntWritable > 
   {  
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
         { 
            int maxavg=30; 
            int val=Integer.MIN_VALUE; 
            
            while (values.hasNext()) 
            { 
               if((val=values.next().get())>maxavg) 
               { 
                  output.collect(key, new IntWritable(val)); 
               } 
            } 
          } 
    }  
   
   
   //Main function 
   public static void main(String args[])throws Exception 
   { 
      JobConf conf = new JobConf(Eleunits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
} 

Salve o programa acima referido como ProcessUnits.java. A compilação e a execução do programa é explicado abaixo.

Elaboração e execução do programa das Unidades de Processo

Deixe-nos supor que estamos no diretório home de um Hadoop user (e.g. /home/hadoop).

Siga as etapas abaixo para compilar e executar o programa.

Passo 1

O comando seguinte é criar um diretório para armazenar as classes java compiladas.

$ mkdir units 

Passo 2

DownloadHadoop-core-1.2.1.jar,que é usado para compilar e executar o MapReduce programa. Visite o link a seguir http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1 para fazer o download do jar. Vamos assumir a pasta download é /home/hadoop/.

Passo 3

Os comandos a seguir são usados para compilar o ProcessUnits.java programa e criando um copo para o programa.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ . 

Passo 4

O seguinte comando é usado para criar uma entrada no diretório HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

Passo 5

O seguinte comando é utilizado para copiar o arquivo de entrada denominado sample.txt diretório de entrada HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

Passo 6

O comando a seguir é utilizado para verificar os arquivos do directório de entrada.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

Passo 7

O comando a seguir é utilizado para executar o aplicativo Eleunit_max a arquivos de entrada do directório de entrada.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

Aguarde um pouco até que o arquivo seja executado. Depois da execução, como mostrado abaixo, o resultado irá conter o número de grupos, o número do mapa as tarefas, o número do redutor tarefas, etc.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
File System Counters 
 
FILE: Number of bytes read=61 
FILE: Number of bytes written=279400 
FILE: Number of read operations=0 
FILE: Number of large read operations=0   
FILE: Number of write operations=0 
HDFS: Number of bytes read=546 
HDFS: Number of bytes written=40 
HDFS: Number of read operations=9 
HDFS: Number of large read operations=0 
HDFS: Number of write operations=2 Job Counters 


   Launched map tasks=2  
   Launched reduce tasks=1 
   Data-local map tasks=2  
   Total time spent by all maps in occupied slots (ms)=146137 
   Total time spent by all reduces in occupied slots (ms)=441   
   Total time spent by all map tasks (ms)=14613 
   Total time spent by all reduce tasks (ms)=44120 
   Total vcore-seconds taken by all map tasks=146137 
   
   Total vcore-seconds taken by all reduce tasks=44120 
   Total megabyte-seconds taken by all map tasks=149644288 
   Total megabyte-seconds taken by all reduce tasks=45178880 
   
Map-Reduce Framework 
 
Map input records=5  
   Map output records=5   
   Map output bytes=45  
   Map output materialized bytes=67  
   Input split bytes=208 
   Combine input records=5  
   Combine output records=5 
   Reduce input groups=5  
   Reduce shuffle bytes=6  
   Reduce input records=5  
   Reduce output records=5  
   Spilled Records=10  
   Shuffled Maps =2  
   Failed Shuffles=0  
   Merged Map outputs=2  
   GC time elapsed (ms)=948  
   CPU time spent (ms)=5160  
   Physical memory (bytes) snapshot=47749120  
   Virtual memory (bytes) snapshot=2899349504  
   Total committed heap usage (bytes)=277684224
     
File Output Format Counters 
 
   Bytes Written=40 

Passo 8

O comando a seguir é utilizado para verificar se o consequente os arquivos na pasta de saída.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

Passo 9

O seguinte comando é utilizado para ver o resultado na Part-00000 arquivo. Este arquivo é gerado pela HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

Abaixo está o resultado gerado pelo MapReduce programa.

1981    34 
1984    40 
1985    45 

Passo 10

O seguinte comando é utilizado para copiar a pasta de saída de HDFS no sistema de arquivos local para análise.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

Comandos Importantes

Todos Hadoop comandos são invocadas por $HADOOP_HOME/bin/hadoop comando. A Hadoop script sem quaisquer argumentos imprime a descrição de todos os comandos.

Uso: hadoop [--config confdir] COMMAND

A tabela a seguir mostra as opções disponíveis e a sua descrição.

Opções Descrição
Namenode -format Formata o DFS filesystem.
Secondarynamenode Executa o DFS namenode secundários.
Namenode Executa o DFS namenode.
Datanode É executado um DFS datanode.
Dfsadmin É executado um DFS cliente admin.
Mradmin Executa um Map-Reduce cliente admin.
FSCK Executa um utilitário de verificação arquivos DFS.
FS Arquivos genéricos corre um usuário cliente.
Balancer É executado um cluster equilibrando utilitário.
OIV Aplica-se o off-line fsimage viewer para um fsimage.
Fetchdt Busca uma delegação token do NameNode.
Jobtracker Executa o MapReduce job Tracker nó.
Tubos Executa um trabalho tubos.
Tasktracker Executa um MapReduce task Tracker nó.
Historyserver Executa trabalho história autônoma servidores como um daemon.
Trabalho Manipula o MapReduce postos de trabalho.
Fila Recebe informações sobre JobQueues.
Versão Imprime a versão.
JAR <jar> É executado um arquivo jar.
distcp <srcurl> <desturl> Copia arquivos ou diretórios recursivamente.
distcp2 <srcurl> <desturl> DistCp versão 2.
Archive -archiveName NOME -p Cria a hadoop archive.
<parent path> <src>* <dest>
Classpath Imprime a caminho de classe necessária para se obter a Hadoop copo e as bibliotecas necessárias.
Daemonlog Get/Set o nível de log para cada daemon

Como interagir com MapReduce Empregos

Uso: hadoop trabalho [GENERIC_OPTIONS]

A seguir estão as opções genéricas disponíveis no Hadoop trabalho.

GENERIC_OPTIONS Descrição
-Apresentar <job-file> Apresente o trabalho.
O estado <job-id> Imprime o mapa e reduzir percentual de conclusão e todos os contadores de trabalhos.
Contador <job-id> <group-name> <countername> Imprime o valor do contador.
-Kill <job-id> Mata o trabalho.
-Eventos <job-id> <fromevent-#> <#-of-events> Imprime os detalhes dos eventos recebidos pelo jobtracker para o intervalo.
-História [todos] <jobOutputDir> - history < jobOutputDir> Imprime os detalhes do trabalho, falha e mortos dica detalhes. Mais detalhes sobre o trabalho, tais como tarefas bem-sucedidas e tarefa tentativas feitas para cada tarefa pode ser visualizado por meio da especificação do [todos] opção.
-Lista [todas] Mostra todos os trabalhos. A lista mostra apenas os trabalhos que ainda não foram concluídos.
-kill-task <task-id> Mata a tarefa. Mortas as tarefas não são contadas contra tentativas falhadas.
-fail-task <task-id> Falhar a missão. As tarefas não são contados de acordo com
Tentativas falhadas.
Defina a prioridade <job-id> <priority> Muda a prioridade do trabalho. Admitidos os valores de prioridade são VERY_HIGH, ALTO, NORMAL E BAIXO, VERY_LOW

Para ver o status do trabalho

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

Para ver a história da saída do trabalho-dir

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

Para matar o trabalho

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004 
Advertisements