V tomto tutoriáli sa naučíte používať Hadoop s MapReduce examples. Použité vstupné údaje sú SalesJan2009.csv. Obsahuje informácie týkajúce sa predaja, ako je názov produktu, cena, spôsob platby, mesto, krajina klienta atď. Cieľom je zistiť počet predaných produktov v každej krajine.
V tomto návode sa naučíte
- Prvý program Hadoop MapReduce
- Vysvetlenie triedy SalesMapper
- Vysvetlenie triedy SalesCountryReducer
- Vysvetlenie triedy SalesCountryDriver
Prvý program Hadoop MapReduce
Teraz v tomto výučbe MapReduce vytvoríme náš prvý program Java MapReduce:
Uistite sa, že máte nainštalovaný Hadoop. Predtým, ako začnete so skutočným procesom, zmeňte používateľa na „hduser“ (ID použité pri konfigurácii Hadoop, môžete prepnúť na ID používateľa, ktoré ste použili pri konfigurácii programovania Hadoop).
su - hduser_
Krok 1)
Vytvorte nový adresár s názvom MapReduceTutorial ako shwon v nižšie uvedenom príklade MapReduce
sudo mkdir MapReduceTutorial
Udeľte povolenia
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
SalesCountryReducer.java
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
SalesCountryDriver.java
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Súbory na stiahnutie tu
Skontrolujte povolenia všetkých týchto súborov
a ak chýbajú povolenia na čítanie, udeľte rovnaké
Krok 2)
Exportujte triednu cestu, ako je znázornené v nasledujúcom príklade Hadoop
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Krok 3)
Kompilujte súbory Java (tieto súbory sa nachádzajú v adresári Final-MapReduceHandsOn ). Súbory jeho triedy budú uložené v adresári balíka
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Toto varovanie je možné bezpečne ignorovať.
Táto kompilácia vytvorí adresár v aktuálnom adresári s názvom balíka určeným v zdrojovom súbore Java (tj v našom prípade tj. SalesCountry ) a vloží doň všetky skompilované súbory.
Krok 4)
Vytvorte nový súbor Manifest.txt
sudo gedit Manifest.txt
pridať k tomu nasledujúce riadky,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver je názov hlavnej triedy. Upozorňujeme, že na konci tohto riadku musíte stlačiť kláves Enter.
Krok 5)
Vytvorte súbor Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Skontrolujte, či je vytvorený súbor jar
Krok 6)
Spustite Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Krok 7)
Skopírujte súbor SalesJan2009.csv do ~ / inputMapReduce
Teraz pomocou príkazu dole skopírujte ~ / inputMapReduce na HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Toto varovanie môžeme pokojne ignorovať.
Skontrolujte, či je súbor skutočne skopírovaný alebo nie.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Krok 8)
Spustite úlohu MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Týmto sa na HDFS vytvorí výstupný adresár s názvom mapreduce_output_sales. Obsahom tohto adresára bude súbor obsahujúci predaj produktov v jednotlivých krajinách.
Krok 9)
Výsledok je možné vidieť cez príkazové rozhranie ako,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Výsledky je možné vidieť aj prostredníctvom webového rozhrania
Otvorte r vo webovom prehliadači.
Teraz zvoľte „Prehliadať súborový systém“ a prejdite na / mapreduce_output_sales
Otvorená časť-r-00000
Vysvetlenie triedy SalesMapper
V tejto časti budeme rozumieť implementácii triedy SalesMapper .
1. Začneme zadaním názvu balíka pre našu triedu. SalesCountry je názov nášho balíka. Upozorňujeme, že výstup z kompilácie bude SalesMapper.class smerovať do adresára s týmto názvom balíka: SalesCountry .
Nasleduje import balíkov knižnice.
Nasledujúca snímka zobrazuje implementáciu triedy SalesMapper -
Vysvetlenie vzorového kódu:
1. Definícia triedy SalesMapper -
verejná trieda SalesMapper rozširuje MapReduceBase implementuje Mapper
Každá trieda mapovača musí byť rozšírená z triedy MapReduceBase a musí implementovať rozhranie Mapper .
2. Definovanie funkcie „mapa“ -
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
Hlavnou časťou triedy Mapper je metóda 'map ()', ktorá akceptuje štyri argumenty.
Pri každom volaní metódy „map ()“ sa odovzdá pár kľúč - hodnota ( „kľúč“ a „hodnota“ v tomto kóde).
Metóda „map ()“ začína rozdelením vstupného textu, ktorý sa prijíma ako argument. Na rozdelenie týchto riadkov na slová používa tokenizer.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Tu sa ako oddeľovač používa znak „,“ .
Potom sa vytvorí pár pomocou záznamu na 7. indexe poľa „SingleCountryData“ a hodnoty „1“ .
output.collect (nový text (SingleCountryData [7]), jeden);
Sme sa rozhodli rekord v 7. indexe, pretože potrebujeme Krajina údaje a nachádza sa v 7. index v poli , SingleCountryData ' .
Upozorňujeme, že naše vstupné dáta sú v nasledujúcej podobe (kde Zem je v 7 th index, s 0 ako predvolený index) -
Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitude, Longitude
Výstupom mapovača je opäť pár kľúč - hodnota, ktorý sa výstupuje metódou 'collect ()' v 'OutputCollector' .
Vysvetlenie triedy SalesCountryReducer
V tejto časti budeme rozumieť implementácii triedy SalesCountryReducer .
1. Začneme zadaním názvu balíka pre našu triedu. SalesCountry je názov out of package. Upozorňujeme, že výstup kompilácie, SalesCountryReducer.class , prejde do adresára s týmto názvom balíka: SalesCountry .
Nasleduje import balíkov knižnice.
Nasledujúca snímka zobrazuje implementáciu triedy SalesCountryReducer -
Vysvetlenie kódu:
1. Definícia triedy SalesCountryReducer -
verejná trieda SalesCountryReducer rozširuje MapReduceBase implementuje Reducer
Tu sú prvé dva dátové typy „Text“ a „IntWritable“ dátovým typom vstupného páru kľúč - hodnota pre reduktor.
Výstup mapovača je vo forme
Posledné dva dátové typy, 'Text' a 'IntWritable', sú dátovým typom výstupu generovaného redukciou vo forme páru kľúč - hodnota.
Každá trieda reduktorov musí byť rozšírená z triedy MapReduceBase a musí implementovať rozhranie reduktora .
2. Definovanie funkcie „zmenšiť“
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
Vstupom do metódy redukovať () je kľúč so zoznamom viacerých hodnôt.
Napríklad v našom prípade to bude-
Toto sa dáva reduktoru ako
Aby sme teda prijali argumenty tohto formulára, použijú sa prvé dva dátové typy, a to Text a Iterator
Ďalším argumentom je typ OutputCollector
Metóda redukovať () začína kopírovaním kľúčovej hodnoty a inicializáciou počtu frekvencií na 0.
Textový kľúč = t_key; int frequencyForCountry = 0;
Potom pomocou slučky „ while“ iterujeme zoznamom hodnôt spojených s kľúčom a vypočítame konečnú frekvenciu súčtom všetkých hodnôt.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Teraz posunieme výsledok do výstupného kolektora v podobe kľúča a získaného počtu frekvencií .
Nižšie uvedený kód to robí-
output.collect(key, new IntWritable(frequencyForCountry));
Vysvetlenie triedy SalesCountryDriver
V tejto časti budeme rozumieť implementácii triedy SalesCountryDriver
1. Začneme zadaním názvu balíka pre našu triedu. SalesCountry je názov out of package. Upozorňujeme, že výstup kompilácie, SalesCountryDriver.class , pôjde do adresára s týmto názvom balíka: SalesCountry .
Tu je riadok so špecifikáciou názvu balíka, za ktorým nasleduje kód na import balíkov knižnice.
2. Definujte triedu ovládačov, ktorá vytvorí novú úlohu klienta, konfiguračný objekt a inzeruje triedy Mapper a Reducer.
Trieda vodičov je zodpovedná za nastavenie našej úlohy MapReduce tak, aby bežala v Hadoop. V tejto triede špecifikujeme názov úlohy, dátový typ vstupu / výstupu a názvy tried mapovačov a reduktorov .
3. V nasledujúcom fragmente kódu nastavíme vstupné a výstupné adresáre, ktoré sa používajú na spotrebovanie vstupnej množiny údajov a na produkciu výstupu.
arg [0] a arg [1] sú argumenty príkazového riadku zadané s príkazom zadaným v MapReduce, tj,
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Spustite našu prácu
Pod kódom spustite vykonávanie úlohy MapReduce-
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}