Hadoop & Príklady mapreduce: Vytvorenie prvého programu v Jave

Obsah:

Anonim

V tomto tutoriáli sa naučíte používať Hadoop s MapReduce examples. Použité vstupné údaje sú SalesJan2009.csv. Obsahuje informácie týkajúce sa predaja, ako je názov produktu, cena, spôsob platby, mesto, krajina klienta atď. Cieľom je zistiť počet predaných produktov v každej krajine.

V tomto návode sa naučíte

  • Prvý program Hadoop MapReduce
  • Vysvetlenie triedy SalesMapper
  • Vysvetlenie triedy SalesCountryReducer
  • Vysvetlenie triedy SalesCountryDriver

Prvý program Hadoop MapReduce

Teraz v tomto výučbe MapReduce vytvoríme náš prvý program Java MapReduce:

Údaje o predajiJan2009

Uistite sa, že máte nainštalovaný Hadoop. Predtým, ako začnete so skutočným procesom, zmeňte používateľa na „hduser“ (ID použité pri konfigurácii Hadoop, môžete prepnúť na ID používateľa, ktoré ste použili pri konfigurácii programovania Hadoop).

su - hduser_

Krok 1)

Vytvorte nový adresár s názvom MapReduceTutorial ako shwon v nižšie uvedenom príklade MapReduce

sudo mkdir MapReduceTutorial

Udeľte povolenia

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Súbory na stiahnutie tu

Skontrolujte povolenia všetkých týchto súborov

a ak chýbajú povolenia na čítanie, udeľte rovnaké

Krok 2)

Exportujte triednu cestu, ako je znázornené v nasledujúcom príklade Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Krok 3)

Kompilujte súbory Java (tieto súbory sa nachádzajú v adresári Final-MapReduceHandsOn ). Súbory jeho triedy budú uložené v adresári balíka

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Toto varovanie je možné bezpečne ignorovať.

Táto kompilácia vytvorí adresár v aktuálnom adresári s názvom balíka určeným v zdrojovom súbore Java (tj v našom prípade tj. SalesCountry ) a vloží doň všetky skompilované súbory.

Krok 4)

Vytvorte nový súbor Manifest.txt

sudo gedit Manifest.txt

pridať k tomu nasledujúce riadky,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver je názov hlavnej triedy. Upozorňujeme, že na konci tohto riadku musíte stlačiť kláves Enter.

Krok 5)

Vytvorte súbor Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Skontrolujte, či je vytvorený súbor jar

Krok 6)

Spustite Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Krok 7)

Skopírujte súbor SalesJan2009.csv do ~ / inputMapReduce

Teraz pomocou príkazu dole skopírujte ~ / inputMapReduce na HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Toto varovanie môžeme pokojne ignorovať.

Skontrolujte, či je súbor skutočne skopírovaný alebo nie.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Krok 8)

Spustite úlohu MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Týmto sa na HDFS vytvorí výstupný adresár s názvom mapreduce_output_sales. Obsahom tohto adresára bude súbor obsahujúci predaj produktov v jednotlivých krajinách.

Krok 9)

Výsledok je možné vidieť cez príkazové rozhranie ako,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Výsledky je možné vidieť aj prostredníctvom webového rozhrania

Otvorte r vo webovom prehliadači.

Teraz zvoľte „Prehliadať súborový systém“ a prejdite na / mapreduce_output_sales

Otvorená časť-r-00000

Vysvetlenie triedy SalesMapper

V tejto časti budeme rozumieť implementácii triedy SalesMapper .

1. Začneme zadaním názvu balíka pre našu triedu. SalesCountry je názov nášho balíka. Upozorňujeme, že výstup z kompilácie bude SalesMapper.class smerovať do adresára s týmto názvom balíka: SalesCountry .

Nasleduje import balíkov knižnice.

Nasledujúca snímka zobrazuje implementáciu triedy SalesMapper -

Vysvetlenie vzorového kódu:

1. Definícia triedy SalesMapper -

verejná trieda SalesMapper rozširuje MapReduceBase implementuje Mapper {

Každá trieda mapovača musí byť rozšírená z triedy MapReduceBase a musí implementovať rozhranie Mapper .

2. Definovanie funkcie „mapa“ -

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

Hlavnou časťou triedy Mapper je metóda 'map ()', ktorá akceptuje štyri argumenty.

Pri každom volaní metódy „map ()“ sa odovzdá pár kľúč - hodnota ( „kľúč“ a „hodnota“ v tomto kóde).

Metóda „map ()“ začína rozdelením vstupného textu, ktorý sa prijíma ako argument. Na rozdelenie týchto riadkov na slová používa tokenizer.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Tu sa ako oddeľovač používa znak „,“ .

Potom sa vytvorí pár pomocou záznamu na 7. indexe poľa „SingleCountryData“ a hodnoty „1“ .

output.collect (nový text (SingleCountryData [7]), jeden);

Sme sa rozhodli rekord v 7. indexe, pretože potrebujeme Krajina údaje a nachádza sa v 7. index v poli , SingleCountryData ' .

Upozorňujeme, že naše vstupné dáta sú v nasledujúcej podobe (kde Zem je v 7 th index, s 0 ako predvolený index) -

Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitude, Longitude

Výstupom mapovača je opäť pár kľúč - hodnota, ktorý sa výstupuje metódou 'collect ()' v 'OutputCollector' .

Vysvetlenie triedy SalesCountryReducer

V tejto časti budeme rozumieť implementácii triedy SalesCountryReducer .

1. Začneme zadaním názvu balíka pre našu triedu. SalesCountry je názov out of package. Upozorňujeme, že výstup kompilácie, SalesCountryReducer.class , prejde do adresára s týmto názvom balíka: SalesCountry .

Nasleduje import balíkov knižnice.

Nasledujúca snímka zobrazuje implementáciu triedy SalesCountryReducer -

Vysvetlenie kódu:

1. Definícia triedy SalesCountryReducer -

verejná trieda SalesCountryReducer rozširuje MapReduceBase implementuje Reducer {

Tu sú prvé dva dátové typy „Text“ a „IntWritable“ dátovým typom vstupného páru kľúč - hodnota pre reduktor.

Výstup mapovača je vo forme , . Tento výstup mapovača sa stane vstupom do reduktora. Aby sme sa zosúladili s jeho dátovým typom, tu sa ako dátový typ používajú Text a IntWritable .

Posledné dva dátové typy, 'Text' a 'IntWritable', sú dátovým typom výstupu generovaného redukciou vo forme páru kľúč - hodnota.

Každá trieda reduktorov musí byť rozšírená z triedy MapReduceBase a musí implementovať rozhranie reduktora .

2. Definovanie funkcie „zmenšiť“

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Vstupom do metódy redukovať () je kľúč so zoznamom viacerých hodnôt.

Napríklad v našom prípade to bude-

, , , , , .

Toto sa dáva reduktoru ako

Aby sme teda prijali argumenty tohto formulára, použijú sa prvé dva dátové typy, a to Text a Iterator . Text je dátový typ kľúča a Iterator je dátový typ pre zoznam hodnôt pre tento kľúč.

Ďalším argumentom je typ OutputCollector , ktorý zhromažďuje výstup redukčnej fázy.

Metóda redukovať () začína kopírovaním kľúčovej hodnoty a inicializáciou počtu frekvencií na 0.

Textový kľúč = t_key; int frequencyForCountry = 0;

Potom pomocou slučky „ while“ iterujeme zoznamom hodnôt spojených s kľúčom a vypočítame konečnú frekvenciu súčtom všetkých hodnôt.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Teraz posunieme výsledok do výstupného kolektora v podobe kľúča a získaného počtu frekvencií .

Nižšie uvedený kód to robí-

output.collect(key, new IntWritable(frequencyForCountry));

Vysvetlenie triedy SalesCountryDriver

V tejto časti budeme rozumieť implementácii triedy SalesCountryDriver

1. Začneme zadaním názvu balíka pre našu triedu. SalesCountry je názov out of package. Upozorňujeme, že výstup kompilácie, SalesCountryDriver.class , pôjde do adresára s týmto názvom balíka: SalesCountry .

Tu je riadok so špecifikáciou názvu balíka, za ktorým nasleduje kód na import balíkov knižnice.

2. Definujte triedu ovládačov, ktorá vytvorí novú úlohu klienta, konfiguračný objekt a inzeruje triedy Mapper a Reducer.

Trieda vodičov je zodpovedná za nastavenie našej úlohy MapReduce tak, aby bežala v Hadoop. V tejto triede špecifikujeme názov úlohy, dátový typ vstupu / výstupu a názvy tried mapovačov a reduktorov .

3. V nasledujúcom fragmente kódu nastavíme vstupné a výstupné adresáre, ktoré sa používajú na spotrebovanie vstupnej množiny údajov a na produkciu výstupu.

arg [0] a arg [1] sú argumenty príkazového riadku zadané s príkazom zadaným v MapReduce, tj,

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Spustite našu prácu

Pod kódom spustite vykonávanie úlohy MapReduce-

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}