bersama catatan peribadi & teknikalnya.

Pemasangan & Konfigurasi Hadoop (Distribusi Pseudo) dan Latihan: WordCount2

VM Fedora


geeky stuff
#hadoop | #java-8

Saya cuba menyediakan Hadoop dengan kluster nod tunggal (Single Node Cluster) di VM Fedora. Pada hemat saya, mungkin tak akan mampu lagilah untuk saya usahakan penyediaan pengoperasian berdistribusi penuh bersama Kerberos sebagai kaedah pengesahan kerana Hadoop memerlukan sistem yang berprestasi tinggi.

Berdasarkan jawapan yang disediakan oleh platform-platform AI, saya ringkaskan syarat minimum sistem untuk mengehoskan Hadoop dengan kluster nod tunggal, memandangkan perkakasan komputer riba saya hanya mampu menampung sumber berskala kecil:

PerkakasanMinimumSarananVM saya
RAM:4GB8GB6GB
CPU:2 cores4 cores6 cores
Storan:50GB100GB50GB

Servis: Apache Hadoop versi 3.4.0
Versi Java: Java 8 (dimuat turun dari laman web Oracle)
Pengguna: hadoop


Rujukan untuk contoh perintah baris bagi menjalankan VM disediakan melalui pautan yang tersenarai di bahagian bawah halaman ini.

Walaubagaimanapun, terdapat perbezaan dalam cara saya bersambung ke Internet kerana saya menggunakan kaedah manual melalui peranti TUNTAP dengan alamat IP statik, berbanding kaedah biasa yang menetapkan alamat IP secara automatik melalui DHCP. Contoh konfigurasi boleh didapati dalam penulisan saya yang bertajuk ‘Pemahaman Asas Kerberos dan SSH.’


1. SSH setup (sistem Hos & VM)

  • Suka untuk saya gunakan resolusi hostname dengan mengikat alamat IP mesin saya (IPv4) kepada nama hos yang ditetapkan dengan menyunting fail /etc/hosts seperti berikut:
192.168.0.100   single.node.loc snode

  • ~/.ssh/config:
# global options
Host *
  IdentitiesOnly yes
  IdentityFile ~/.ssh/id_rsa

## VMs
# with tuntap (kernel virtual network device)
Host snode 
  HostName single.node.loc
  User hadoop

Pastikan sudah memasang pakej openssh dan servis sshd sudah dimulakan sebelum meneruskan proses di bawah ini.

ssh snode 
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 0600 ~/.ssh/authorized_keys

2. Pakej prasyarat

Pasang pakej pdsh. Pakej ini amat dicadang pemasangannya oleh Hadoop untuk mendapatkan pengurusan sumber SSH yang lebih baik. Saya pasang dari sumber dengan mengklon repositori GitHub-nya untuk mendapatkan konfigurasi bersama SSH yang tidak disediakan sebagai tetapan lalai oleh Fedora melalui repo rasminya.

Pasang pakej-pakej yang diperlukan untuk binaan dari sumber:

sudo dnf install autoconf libtool

Klon dan bina:

# inside $HOME
mkdir Build && cd Build

# Clone the repository.
git clone https://github.com/chaos/pdsh.git

# Get into the directory.
cd pdsh

# Run the following commands to compile and install. (Good Luck!)
./bootstrap
./configure --with-ssh
sudo make
sudo make install

### Confirm the installation by verifying its version.
pdsh -V
pdsh-2.35
rcmd modules: ssh,rsh,exec (default: rsh)
misc modules: (none)

Pemasangan Java: Muat turun Java SE Development Kit 8u421 (fail jdk-8u421-linux-x64.rpm) yang memerlukan pengguna untuk log masuk ke akaun Oracle dan pasang dengan perintah baris:

sudo dnf install jdk-8u421-linux-x64.rpm

3. Pemasangan Hadoop

  • Muat turun pakej binari dari laman web rasmi Hadoop dan ekstrak pakej kompres itu ke direktori rumah pengguna dengan perintah tar:
wget -P ~/Downloads https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz   # contoh
tar -xvzf Downloads/hadoop-3.4.0.tar.gz

  • Edit fail .bashrc untuk menambah laluan:
export JAVA_HOME="/usr/lib/jvm/jdk-1.8.0_421-oracle-x64"
export HADOOP_HOME="$HOME/hadoop-3.4.0"
export HADOOP_CLASSPATH="$JAVA_HOME/lib/tools.jar"
export PATH="$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin"

  • Aktifkan perubahan.
source ~/.bashrc

4. Pengurusan FireWall di VM

  • Memandangkan saya jalankan tugasan melalui SSH ke VM dari sistem hos, saya gunakan UFW sebagai pengurus FireWall untuk mendapatkan paparan web di hos.
sudo dnf install ufw
sudo systemctl enable --now ufw
sudo ufw enable
sudo ufw allow 8000:19999/tcp

5. Konfigurasi Hadoop

  • Sunting $HADOOP_HOME/etc/hadoop/hadoop-env.sh dan tambahkan baris di bawah:
export JAVA_HOME="/usr/lib/jvm/jdk-1.8.0_421-oracle-x64"

  • $HADOOP_HOME/etc/hadoop/core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://snode:9000</value>
    </property>
</configuration>

  • $HADOOP_HOME/etc/hadoop/hdfs-site.xml:
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>namenodes</value>
    </property>
    <property>
        <name>dfs.namenode.http-address</name>
        <value>snode:9870</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>snode:9868</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>datanodes</value>
    </property>
    <property>
        <name>dfs.datanode.http.address</name>
        <value>snode:9864</value>
    </property>
</configuration>

  • $HADOOP_HOME/etc/hadoop/mapred-site.xml:
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
        <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>snode:19888</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>mr-history/tmp</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.done-dir</name>
        <value>mr-history/done</value>
    </property>
</configuration>

  • $HADOOP_HOME/etc/hadoop/yarn-site.xml:
<configuration>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>snode</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>snode:8088</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,PATH,LANG,TZ,HADOOP_MAPRED_HOME</value>
    </property>
    <property>
        <name>yarn.nodemanager.webapp.address</name>
        <value>snode:8042</value>
    </property>
</configuration>

  • $HADOOP_HOME/etc/hadoop/workers:
snode

6. Mulakan servis

  • Format NameNode:
hdfs namenode -format

  • Mulakan daemon HDFS dan YARN (contoh output):
start-dfs.sh && start-yarn.sh && mapred --daemon start historyserver
Starting namenodes on [snode]
Starting datanodes
Starting secondary namenodes [snode]
Starting resourcemanager
Starting nodemanagers

  • Semak proses-proses yang berlangsung dan contoh output:
sudo jps
2289 NodeManager
1970 SecondaryNameNode
1749 DataNode
2758 Jps
1639 NameNode
2190 ResourceManager
2671 JobHistoryServer

7. Praktis 1 (using a built-in example program called grep)

  • Tambah direktori pengguna ke dalam Distributed filesystem:
# Return to $HOME
cd ~

# Create the user directory. 
hdfs dfs -mkdir -p /user/hadoop

# It will display 'mr-history' dir.
hdfs dfs -ls /user/hadoop

# Create the directory for the test.  
hdfs dfs -mkdir -p /user/hadoop/p1_intro/input

# Copy files from the local fs to the distributed fs.
hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /user/hadoop/p1_intro/input

# Run a MapReduce job.
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar \
 grep /user/hadoop/p1_intro/input /user/hadoop/p1_intro/output \
 'dfs[a-z.]+'

  • Semak direktori:
hdfs dfs -ls -C /user/hadoop/p1_intro
hdfs dfs -ls -C /user/hadoop/p1_intro/output
/user/hadoop/p1_intro/output/_SUCCESS
/user/hadoop/p1_intro/output/part-r-00000

  • Output:
hdfs dfs -cat /user/hadoop/p1_intro/output/part-r-00000
1	dfsadmin
1	dfs.replication
1	dfs.namenode.secondary.http
1	dfs.namenode.name.dir
1	dfs.namenode.http
1	dfs.datanode.http.address
1	dfs.datanode.data.dir

  • Atau salin fail pengeluaran ke sistem lokal:
# OR copy the outputs from the distributed fs to the local fs for observation.
hdfs dfs -get /user/hadoop/p1_intro/output output1

  • Sambung dengan latihan seterusnya.

8. Praktis 2 (WordCount2)

  • Penyediaan:
# Create the directories for Practice 2.
hdfs dfs -mkdir -p /user/hadoop/p2_wordcount2/input

# Create two files; file1 & file2.
touch file1 file2  # residing in the local fs.
echo 'Hello World, Bye World!' > file1
echo 'Hello Hadoop, Goodbye to hadoop.' > file2

# Move the files from the local fs to the distributed fs.
hdfs dfs -moveFromLocal file1 /user/hadoop/p2_wordcount2/input
hdfs dfs -moveFromLocal file2 /user/hadoop/p2_wordcount2/input

### Confirm the moved files.
hdfs dfs -ls -C /user/hadoop/p2_wordcount2/input
/user/hadoop/p2_wordcount2/input/file1
/user/hadoop/p2_wordcount2/input/file2

  • Penyediaan awal untuk menggunakan ciri DistributedCache dalam dua kerja terakhir nanti:
touch patterns.txt

# Create patterns.txt with specified lines.
echo -e '\\.\n\\,\n\\!\nto' > patterns.txt

### Confirm the contents of patterns.txt.
cat patterns.txt
\.
\,
\!
to

  • Pindahkan fail di atas dari sistem lokal ke sistem distribusi:
hdfs dfs -moveFromLocal patterns.txt /user/hadoop/p2_wordcount2

### Display directories and files in p2_wordcount2.
hdfs dfs -ls -C /user/hadoop/p2_wordcount2
/user/hadoop/p2_wordcount2/input
/user/hadoop/p2_wordcount2/patterns.txt

  • Salin kod Java ini ke dalam fail WordCount2.java (available in Hadoop documentation as Tutorial):
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

  • Semak kod terlebih dahulu dengan perintah less:
less WordCount2.java

  • Penyusunan:
# Compile the WordCount2 Java source code.
hadoop com.sun.tools.javac.Main WordCount2.java

# Package the compiled Java classes into a JAR file.
jar cf wc.jar WordCount*.class

### List compiled files and jars.
ls ~/ | grep -E 'wc\.jar|WordCount2.*\.class'
wc.jar
WordCount2$IntSumReducer.class
WordCount2$TokenizerMapper$CountersEnum.class
WordCount2$TokenizerMapper.class
WordCount2.class

  • Jalankan kerja MapReduce tanpa ciri DistributedCache:
hadoop jar wc.jar WordCount2 \
 /user/hadoop/p2_wordcount2/input /user/hadoop/p2_wordcount2/output1

### Check generated files and view output.
hdfs dfs -cat /user/hadoop/p2_wordcount2/output1/part-r-00000
Bye	1
Goodbye	1
Hadoop,	1
Hello	2
World!	1
World,	1
hadoop.	1
to	1

  • Perhatikan perubahan output untuk proses MapReduce di bawah:
    = dengan menetapkan sensitiviti bagi jenis huruf kepada benar;
    = dengan mengaktifkan ciri DistributedCache melalui opsyen -skip.
hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true \
 /user/hadoop/p2_wordcount2/input /user/hadoop/p2_wordcount2/output2 \
 -skip /user/hadoop/p2_wordcount2/patterns.txt

### Observe the output.
hdfs dfs -cat /user/hadoop/p2_wordcount2/output2/part-r-00000
Bye	1
Goodbye	1
Hadoop	1
Hello	2
World	2
hadoop	1

  • Begitu juga untuk proses MapReduce di bawah:
    = dengan menetapkan sensitiviti bagi jenis huruf kepada tidak benar;
    = dengan mengaktifkan ciri DistributedCache melalui opsyen -skip.
hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false \
 /user/hadoop/p2_wordcount2/input /user/hadoop/p2_wordcount2/output3 \
 -skip /user/hadoop/p2_wordcount2/patterns.txt

### Observe the output.
hdfs dfs -cat /user/hadoop/p2_wordcount2/output3/part-r-00000
bye	1
goodbye	1
hadoop	2
hello	2
world	2

  • Paparan web di:
  1. NameNode: http://snode:9870/

  2. ResourceManager: http://snode:8088/

  3. MapReduce JobHistory Server: http://snode:19888/

  • Setelah selesai, hentikan daemon YARN dan HDFS:
mapred --daemon stop historyserver && stop-yarn.sh && stop-dfs.sh
exit



Kali terakhir dikemaskini:
Top