MapReduce é um quadro com que podemos escrever aplicativos para processar grandes quantidades de dados, em paralelo, em grandes aglomerados de hardware de consumo de forma fiável.
MapReduce é uma técnica de tratamento e um modelo de programa de computação distribuída baseada em java. O MapReduce algoritmo contém duas funções importantes, a saber: Mapa e reduzir. Mapa leva um conjunto de dados e converte-a em outro conjunto de dados, onde cada um dos elementos são decompostos em Tuplas (pares chave/valor). Em segundo lugar, reduzir tarefa, o que leva a saída, a partir de um mapa como uma entrada e combina esses dados Tuplas em um conjunto menor de Tuplas. Como a seqüência de o nome MapReduce, implica a redução tarefa é realizada sempre após o mapa.
A grande vantagem do MapReduce é que é fácil para processamento de dados em escala computação múltiplos nós. Sob o modelo MapReduce, o tratamento dos dados primitivos são chamados mapas e redutores. Decompondo aplicativo de processamento de dados em mapas e redutores às vezes é algo corriqueiro. Mas, uma vez que escrever um aplicativo do MapReduce, escalando o aplicativo para ser executado ao longo de centenas, milhares, ou mesmo dezenas de milhares de máquinas em um cluster é meramente uma mudança de configuração. Esta escalabilidade simples é o que tem atraído muitos programadores usam o MapReduce modelo.
Geralmente MapReduce paradigma baseia-se no envio do computador de onde residem os dados!
MapReduce programa é executado em três fases, a saber: mapa palco, shuffle, e reduzir fase.
Mapa palco: o mapa ou mapper é o trabalho para processar os dados de entrada. Em geral, os dados de entrada é em forma de arquivo ou diretório e é armazenado no Hadoop file system (HDFS). O arquivo de entrada é passado para a função mapa linha por linha. O mapeador processa os dados e cria vários pequenos blocos de dados.
Reduzir fase: nesta fase, é a combinação do Shuffle e reduzir fase. O redutor é o trabalho para processar os dados que vem do mapa. Após o processamento, produz um novo conjunto de saída, que será armazenado no HDFS.
Durante a tarefa, MapReduce Hadoop envia o mapa e reduzir o tempo de execução de tarefas para os servidores no cluster.
O framework gerencia todos os detalhes do cruzamento de dados como, por exemplo, a emissão tarefas, verificar a conclusão da tarefa, e a cópia dos dados em torno do cluster entre os nós.
A maioria dos computadores tem lugar em nós com os dados em discos locais que reduz o tráfego de rede.
Após a conclusão das tarefas, o cluster reúne e reduz os dados para formar um resultado adequado, e envia-la de volta para o Hadoop server.
O MapReduce framework funciona com <chave, o valor> aos pares, ou seja, o quadro opiniões a entrada para o trabalho como um conjunto de <key, value> pares e produz um conjunto de <key, value> pares como a saída do trabalho, possivelmente de diferentes tipos.
A chave e o valor deve ser em classes forma serializada do quadro e, portanto, necessidade de implementar a interface gravável. Além disso, as principais classes para implementar o Writable-Comparable interface para facilitar a classificação do quadro. Os tipos de saída e entrada de MapReduce tarefa: (entrada) <k1, v1> -> mapa -> <k2, v2>-> reduzir -> <k3, v3> (saída).
Input | Output | |
---|---|---|
Mapa | <k1, v1> | list (<k2, v2>) |
Reduzir | <k2, list(v2)> | list (<k3, v3>) |
O payload - Aplicações implementar o mapa e a reduzir as funções, e forma o núcleo do trabalho.
Mapa - mapas Mapa entrada do pares chave/valor para um conjunto de chave intermediária/valor par.
NamedNode - nó que gerencia o Hadoop Distributed File System (HDFS).
DataNode - Nó onde os dados são apresentados com antecedência antes de qualquer tratamento.
MasterNode - Nó onde JobTracker executa e que aceita solicitações de trabalho de seus clientes.
SlaveNode - Nó onde Mapa e reduzir programa é executado.
JobTracker - Calendário trabalhos e rastreia a designar tarefas a tarefa tracker.
Task Tracker - As faixas a tarefa e relatórios de status JobTracker.
Trabalho - O programa é uma execução de um Mapper e redutor em um dataset.
Tarefa - uma execução de um mapa ou um redutor sobre uma fatia de dados.
Tarefa tentativa - uma instância particular de uma tentativa de executar uma tarefa em um SlaveNode.
Abaixo está os dados referentes ao consumo de energia eléctrica de uma organização. Ela contém o consumo elétrico mensal e a média anual de vários anos.
Jan | Fev | Mar | Abr | May | Jun | Jul | Ago | Set | Oct | Nov | Dec | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Se os dados acima é dado como entrada, temos de escrever aplicativos para processar e produzir resultados como, por exemplo, encontrar o ano de utilização máxima, ano de uso mínimo e assim por diante. Este é um walkover para programadores com número finito de registros. Eles simplesmente irão escrever a lógica de produzir a saída desejada e passe os dados para a aplicação.
Mas, acho que os dados que representam o consumo eléctrico de todos os latifúndios indústrias de um determinado estado, desde a sua formação.
Quando queremos escrever aplicativos para processar tais dados em massa.
Para resolver estes problemas, temos o MapReduce.
Os dados acima referidos é guardada como sample.txt dada como entrada. O arquivo de entrada olha como mostrado abaixo.
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Abaixo está o programa para a amostra de dados usando MapReduce.
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(Eleunits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
Salve o programa acima referido como ProcessUnits.java. A compilação e a execução do programa é explicado abaixo.
Deixe-nos supor que estamos no diretório home de um Hadoop user (e.g. /home/hadoop).
Siga as etapas abaixo para compilar e executar o programa.
O comando seguinte é criar um diretório para armazenar as classes java compiladas.
$ mkdir units
DownloadHadoop-core-1.2.1.jar,que é usado para compilar e executar o MapReduce programa. Visite o link a seguir http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1 para fazer o download do jar. Vamos assumir a pasta download é /home/hadoop/.
Os comandos a seguir são usados para compilar o ProcessUnits.java programa e criando um copo para o programa.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
O seguinte comando é usado para criar uma entrada no diretório HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
O seguinte comando é utilizado para copiar o arquivo de entrada denominado sample.txt diretório de entrada HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
O comando a seguir é utilizado para verificar os arquivos do directório de entrada.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
O comando a seguir é utilizado para executar o aplicativo Eleunit_max a arquivos de entrada do directório de entrada.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Aguarde um pouco até que o arquivo seja executado. Depois da execução, como mostrado abaixo, o resultado irá conter o número de grupos, o número do mapa as tarefas, o número do redutor tarefas, etc.
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
O comando a seguir é utilizado para verificar se o consequente os arquivos na pasta de saída.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
O seguinte comando é utilizado para ver o resultado na Part-00000 arquivo. Este arquivo é gerado pela HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Abaixo está o resultado gerado pelo MapReduce programa.
1981 34 1984 40 1985 45
O seguinte comando é utilizado para copiar a pasta de saída de HDFS no sistema de arquivos local para análise.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
Todos Hadoop comandos são invocadas por $HADOOP_HOME/bin/hadoop comando. A Hadoop script sem quaisquer argumentos imprime a descrição de todos os comandos.
Uso: hadoop [--config confdir] COMMAND
A tabela a seguir mostra as opções disponíveis e a sua descrição.
Opções | Descrição |
---|---|
Namenode -format | Formata o DFS filesystem. |
Secondarynamenode | Executa o DFS namenode secundários. |
Namenode | Executa o DFS namenode. |
Datanode | É executado um DFS datanode. |
Dfsadmin | É executado um DFS cliente admin. |
Mradmin | Executa um Map-Reduce cliente admin. |
FSCK | Executa um utilitário de verificação arquivos DFS. |
FS | Arquivos genéricos corre um usuário cliente. |
Balancer | É executado um cluster equilibrando utilitário. |
OIV | Aplica-se o off-line fsimage viewer para um fsimage. |
Fetchdt | Busca uma delegação token do NameNode. |
Jobtracker | Executa o MapReduce job Tracker nó. |
Tubos | Executa um trabalho tubos. |
Tasktracker | Executa um MapReduce task Tracker nó. |
Historyserver | Executa trabalho história autônoma servidores como um daemon. |
Trabalho | Manipula o MapReduce postos de trabalho. |
Fila | Recebe informações sobre JobQueues. |
Versão | Imprime a versão. |
JAR <jar> | É executado um arquivo jar. |
distcp <srcurl> <desturl> | Copia arquivos ou diretórios recursivamente. |
distcp2 <srcurl> <desturl> | DistCp versão 2. |
Archive -archiveName NOME -p | Cria a hadoop archive. |
<parent path> <src>* <dest> | |
Classpath | Imprime a caminho de classe necessária para se obter a Hadoop copo e as bibliotecas necessárias. |
Daemonlog | Get/Set o nível de log para cada daemon |
Uso: hadoop trabalho [GENERIC_OPTIONS]
A seguir estão as opções genéricas disponíveis no Hadoop trabalho.
GENERIC_OPTIONS | Descrição |
---|---|
-Apresentar <job-file> | Apresente o trabalho. |
O estado <job-id> | Imprime o mapa e reduzir percentual de conclusão e todos os contadores de trabalhos. |
Contador <job-id> <group-name> <countername> | Imprime o valor do contador. |
-Kill <job-id> | Mata o trabalho. |
-Eventos <job-id> <fromevent-#> <#-of-events> | Imprime os detalhes dos eventos recebidos pelo jobtracker para o intervalo. |
-História [todos] <jobOutputDir> - history < jobOutputDir> | Imprime os detalhes do trabalho, falha e mortos dica detalhes. Mais detalhes sobre o trabalho, tais como tarefas bem-sucedidas e tarefa tentativas feitas para cada tarefa pode ser visualizado por meio da especificação do [todos] opção. |
-Lista [todas] | Mostra todos os trabalhos. A lista mostra apenas os trabalhos que ainda não foram concluídos. |
-kill-task <task-id> | Mata a tarefa. Mortas as tarefas não são contadas contra tentativas falhadas. |
-fail-task <task-id> | Falhar a missão. As tarefas não são contados de acordo com |
Tentativas falhadas. | |
Defina a prioridade <job-id> <priority> | Muda a prioridade do trabalho. Admitidos os valores de prioridade são VERY_HIGH, ALTO, NORMAL E BAIXO, VERY_LOW |
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004