# Natural language processing on computational cluster

The aim of the course is to introduce methods required in natural language processing (processing huge data sets in distributed environment and performing machine learning) and show how to effectively execute them on ÚFAL computational Linux cluster. The course will cover ÚFAL network and cluster architecture, Slurm, Spark, related Linux tools, and best practices.

The course follows the outline in the ÚFAL wiki: Introduction to ÚFAL (you will need an ÚFAL wiki account to access that site; each ÚFAL PhD student is entitles to get a wiki account).

The whole course is taught in several first weeks of the semester. If you plan to attend the course, please contact any of the Guarantors listed below.

SIS code: NPFL118
Semester: winter
E-credits: 3
Examination: 0/2 C
Guarantors: Milan Straka, Martin Popel, Rudolf Rosa

### Requirements

In order to pass the course, you have to attend the meetings and do all the required assignments.

Unless otherwise stated, teaching materials for this course are available under CC BY-SA 4.0.

This page describes a possible initial configuration of your Linux environment. You are of course free to modify it in any way you want :-)

## .profile

The .profile is run automatically when you log in (i.e., when you log in to your desktop of to a machine via SSH). Note that .bash_profile has precedence, so if you have it, .profile will not be used. Therefore, if you have .bash_profile, move its content to .profile and then remove .bash_profile (or the other way around).

# If you have a favourite editor, use it here instead of vim
export EDITOR=vim

# Add ~/bin to PATH; useful if you want to write own scripts
export PATH="$HOME/bin:$PATH"

# Add ~/.local/bin to PATH; directory of binaries installed by Python
export PATH="$HOME/.local/bin:$PATH"

# Add Spark to PATH
export PATH="/net/projects/spark/bin:/net/projects/spark/slurm:/net/projects/spark/sbt/bin:$PATH" # Export paths to CUDA 11.2 and cuDNN 8.1; written in Dec 2022, newer versions may be required later export LD_LIBRARY_PATH=/opt/cuda/11.2/lib64:/opt/cuda/11.2/cudnn/8.1.1/lib64:/opt/cuda/11.2/nccl/2.8.4/lib:/opt/cuda/11.2/extras/CUPTI/lib64:$LD_LIBRARY_PATH

# Make sure LC_CTYPE and LC_COLLATE are always the same; you can change it to a different one
export LC_CTYPE=cs_CZ.UTF-8 LC_COLLATE=cs_CZ.UTF-8

# Make sure TERM is set to a reasonable default
[ "$TERM" = linux ] && export TERM=xterm # Run .bashrc if running bash if [ -n "$BASH_VERSION" ]; then
[ -f ~/.bashrc ] && . ~/.bashrc
fi


## .bashrc

The .bashrc is run whenever you open a new terminal window. Note that it is customary for your .profile to also run .bashrc (the last three lines of the above file).

# If not running in interactive terminal, stop.
[ -z "$PS1" ] && return # Set larger history without duplicates, ignore commands starting with space export HISTCONTROL=ignorespace:erasedups HISTSIZE=8192 HISTFILESIZE=16384 shopt -s checkwinsize cmdhist histappend # Set color prompt PS1='${debian_chroot:+($debian_chroot)}$\033[01;32m$\u@\h$\033[00m$:$\033[01;34m$\w$\033[00m$\$ '

# enable color support of ls and also add handy aliases
if [ -x /usr/bin/dircolors ]; then
eval "dircolors -b"
alias ls='ls --color=auto'
fi

# enable color support of grep
export GREP_COLORS="ms=01;32:mc=01;32:sl=:cx=:fn=34:ln=36:bn=36:se=36"
alias grep='grep --color'

# enable color support of less
export LESS_TERMCAP_{md=$'\E[01;32m',me=$'\E[0m',us=$'\E[01;33m',ue=$'\E[0m'}

# enable programmable completion features
if [ -f /etc/bash_completion ]; then
. /etc/bash_completion
fi


## .screenrc

If you submit screen -D -m as an SGE job, screen starts without a connected terminal and cannot copy sane settings from it; it is therefore a good idea to have them in .screenrc.

altscreen
defbce on
defencoding utf-8
defflow off
defscrollback 65536
term xterm #or other, if you prefer
hardstatus on
hardstatus alwayslastline
hardstatus string "%{= bw}Screen: %Lw"
caption string "%?%F%{.R.}%?%3n %t%? [%h]%?"
setenv PROMPT_COMMAND "echo -ne '\ek\e\\\\\eksh '\${PWD##*/}'\e\\\\'" shelltitle "$ |bash"
startup_message off


## Quick Intro

### sbatch: submit a job for execution

• -J, --job-name=name set name of the job
• -o, --output=outpath set file with the standard output of the job; default slurm-%j.out
• -e, --error=outpath set file with the standard error of the job; default is to merge with the standard output
• -W, --wait wait till the job finishes
• -n, --ntasks=number number of tasks, allocated on possibly different machines
• --spread-job spread the tasks over many nodes to evenly distribute them
• -c, --cpus-per-task=number number of CPUs per task
• -G, --gpus=number number of GPUs for the whole job
• --gpus-per-task=number number of GPUs per task
• --mem=size[KMGT] memory per node
• --mem-per-cpu=size[KMGT] memory per CPU
• --mem-per-gpu=size[KMGT] memory per GPU
• -p partition[,partition2,...] submit only to a given partition (queue)
• -w, --nodelist=node[,node[1-8],...] submit only to the given nodes (machines)
• -q priority sets the priority of your job; low, normal, high are available, with normal being the default
• -d, --dependency=afterok:job_id[:job_id] run when the mentioned jobs successfully finish
• --mail-type=[NONE|BEGIN|END|FAIL|REQUEUE|ALL|ARRAY_TASKS|...] send a notification email on the specified events
• environmental variable SLURM_JOB_ID
• environmental variable SLURM_JOB_NAME

#### Array Jobs

• -a, --array=1-n start array job with $n$ tasks numbered $1 … n$
• environmental variable SLURM_ARRAY_TASK_ID
• output file slurm-%A_%a.out
• -a, --array=m-n[:s] start array job with tasks $m, m+s, …, n$
• environmental variables SLURM_ARRAY_TASK_MIN, SLURM_ARRAY_TASK_MAX, SLURM_ARRAY_TASK_STEP
• -a, --array=m-n[:s]%p run at most $p$ tasks simultaneously
• scontrol update ArrayTaskThrottle=p JobId=job_id changes the limit of simultaneously running tasks of the given array job
• -d, --dependency=aftercorr:job_id[:job_id] run when the corresponding array task finished successfully

#### Partitions

We currently have the following partitions

• cpu-ms, cpu-troja
• gpu-ms, gpu-troja

### squeue: list of running jobs

• all users by default
• squeue --me only me
• squeue --user=user[,user2,...] show the given users

### scontrol show job -d $job_id: detailed information about a job ### ~straka/bin.local/sq: show overview of the whole cluster • ~straka/bin.local/sq cpu shows only cpu partitions • ~straka/bin.local/sq gpu shows only gpu partitions ### scontrol update: modify properties of a submitted/running job • only some properties can be modified, of course ### scancel: stops jobs • job_id [job_id2 ...] stop the jobs with the given IDs • -n, --name=name, --jobname=name stop jobs with the given name • for array jobs, scancel job_id_array_id removes only one task ### srun: start an interactive shell • srun --pty bash runs an interactive terminal (think ssh) • --pty connects not just standard input, output, and error, but also the pseudo terminal • if you want a "lasting" interactive terminal, you can submit screen -D -m using sbatch ## Simple Array Job Example The following script processes a Wikipedia file described in Assignments and returns sorted list of article names. • articles.sh (available in /net/data/npfl118/examples/) #!/bin/bash set -e # Parse arguments [ "$#" -ge 3 ] || { echo Usage: "$0 input_file outdir tasks [conc_tasks]" >&2; exit 1; } input_file="$1"
output_dir="$2" tasks="$3"
conc_tasks="$4" # Check that input file exists and get its size [ -f "$input_file" ] || { echo File $input_file does not exist >&2; exit 1; } # Check that output dir does not exist and create it [ -d "$output_dir" ] && { echo Directory $output_dir already exists >&2; exit 1; } mkdir -p "$output_dir"

# Run distributed computations
sbatch --wait -o "$output_dir/task-%a.log" -a 1-"$tasks"${conc_tasks:+%$conc_tasks} \
./articles_distributed.sh "$tasks" "$input_file" "$output_dir"/articles.txt # Merge all results sort -m seq -f "$output_dir/articles.txt.%g" 1 "$tasks" > "$output_dir"/articles.txt
rm seq -f "$output_dir/articles.txt.%g" 1 "$tasks"

• articles_distributed.sh (available in /net/data/npfl118/examples/)
#!/bin/bash

set -e

# Parse arguments
[ "$#" -ge 3 ] || { echo Usage:$0 total_tasks input output_file >&2; exit 1; }
tasks="$1" input_file="$2"
output_file="$3" # Parse SLURM_ARRAY_TASK_ID and compute file offset [ -n "$SLURM_ARRAY_TASK_ID" ] || { echo Variable SLURM_ARRAY_TASK_ID is not set >&2; exit 1; }
task="$SLURM_ARRAY_TASK_ID" output_file="$output_file.$task" # Run computation outputting to temporary file tmp_file="mktemp" trap "rm -f \"$tmp_file\"" EXIT

split -n l/$task/$tasks "$input_file" | cut -f1 | sort > "$tmp_file"

# On success move temporary file to output
mv "$tmp_file" "$output_file"


## GPUs

We give only a quick overview here, more detailed treatment of the GPU cluster can be found in ÚFAL wiki.

GPU jobs are scheduled as usual jobs, but in gpu-ms or gpu-troja partition. You need to specify how many GPUs and of what kind you want, using

• -G, --gpus=number number of GPUs for the whole job
• --gpus-per-task=number number of GPUs per task
• -C, --constraint=gpuramXXG: only GPUs with the given RAM (11,16,24,40,48) are considered

During execution, CUDA_VISIBILE_DEVICES is set to the allocated GPUs.

Then, you need a framework which can use the GPU, and you also need to set paths correctly:

• To use CUDA 11.2 and cuDNN 8.1, use
export LD_LIBRARY_PATH=/opt/cuda/11.2/lib64:/opt/cuda/11.2/cudnn/8.1.1/lib64:/opt/cuda/11.2/nccl/2.8.4/lib:/opt/cuda/11.2/extras/CUPTI/lib64:\$LD_LIBRARY_PATH

• Alternatively, you might use modules. You can list available modules using
module avail

an enable specific modules using for example
module load cuda/11.2

See https://modules.readthedocs.io/en/latest/ for reference.

Spark is a framework for distributed computations. Natively it works in Python, Scala and Java.

Apart from embarrassingly parallel computations, Spark framework is suitable for in-memory and/or iterative computations, making it suitable even for machine learning and complex data processing. The Spark framework can run either locally using one thread, locally using multiple threads or in a distributed fashion.

### Initialization

You need to set PATH variable to include Spark binaries, see .profile in the Config tab.

### Running

An interactive ipython shell can be started using

PYSPARK_DRIVER_PYTHON=ipython3 pyspark


(use pip3 install --user ipython if you do not have ipython3).

Such shell will use current cluster, starting a local cluster with as many threads as cores.

To create a distributed cluster using SGE, you can run one of the following commands:

• spark-srun [salloc args] workers memory_per_workerG[:python_memoryG]: start Spark cluster and perform a srun inside it
• spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...]: start Spark cluster and execute the given command inside it

### Example

Start by running spark-srun 50 2G. When the cluster starts, it prints a URL where it can be monitored. After the cluster starts, execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.

Then, try running the following:

(sc.textFile("/net/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda c1, c2: c1 + c2)
.sortBy(lambda word_count: word_count[1], ascending=False)
.take(10))


#### Running a Script

To execute a script instead of running from an interactive shell, you need to create the SparkContext manually:

• word_count.py (available in /net/data/npfl118/examples/)
#!/usr/bin/env python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("input", type=str, help="Input file/path")
parser.add_argument("output", type=str, help="Output directory")
args = parser.parse_args()

import pyspark

sc = pyspark.SparkContext()
input = sc.textFile(args.input, 3*sc.defaultParallelism)
words = input.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile(args.output)


You can run such script using

spark-submit script.py input_path output_path


If you want to use a specific virtual environment, you can use

PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...


### No Cheating

• Cheating is strictly prohibited and any student found cheating will be punished. The punishment can involve failing the whole course, or, in grave cases, being expelled from the faculty.
• Discussing homework assignments with your classmates is OK. Sharing code is not OK (unless explicitly allowed); by default, you must complete the assignments yourself.
• All students involved in cheating will be punished. If you share your assignment with a friend, both you and your friend will be punished.

### Input Data

For the assignments, you can find the input data in /net/data/npfl118. Most assignments use the following Wikipedia data:

• wiki/cs/wiki.txt: Czech Wikipedia data (Sep 2009), file size 195MB, 124k articles.
• wiki/en/wiki.txt: English Wikipedia data (Sep 2009), File size 4.9BG, 2.9M articles.
• wiki/cs/wiki-small.txt, wiki/en/wiki-small.txt: First 1000 articles of the respective Wikipedias.

The files are in UTF-8 and contain one article per line. Article name is separated by a \t character from the article content.

### unique_words

required Template: You can start with /net/data/npfl118/examples/{articles.sh,articles_distributed.sh}

Implement a Slurm distributed job to create a list of unique words used in the article texts (the article titles are not considered part of article texts). Convert the texts to lowercase to ignore case (and make sure the lowercasing works also for non-ASCII characters).

Because the article data is not tokenized, use the provided /net/data/npfl118/wiki/{cs,en}/tokenizer, which reads untokenized UTF-8 text from standard input and produces tokenized UTF-8 text on standard output. It preserves line breaks and separates tokens on each line by exactly one space.

### inverted_index

optional

In a distributed way, compute inverted index – for every lemma from the articles, compute ascending (article id, ascending positions of occurrences as word indices) pairs. In order to do so, number the articles using consecutive integers and produce also a list of articles representing this mapping (the article on line $i$ is the article with id $i$; you can use the example articles.sh).

The output should be a file with the list of articles ordered by article id, and a file with one lemma on a line in this format:

lemma \t article_id \t space separated occurrence indices \t article_id \t space separated occurrence indices ...


Both the article_ids and the occurrence indices should be in ascending order.

To generate the lemmas, use the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer, which again reads untokenized UTF-8 text and outputs the space separated lemmas on the output, preserving line breaks.

### gpu_multiplication

required Template: /net/data/npfl118/assignments/gpu_multiplication.py

Install TensorFlow 2.11 in a virtual environment:

• python3 -m venv venv
• venv/bin/python -m pip install --upgrade pip setuptools
• venv/bin/python -m pip install tensorflow==2.11

Also, make sure you have added CUDA and cuDNN to your paths, see the end of the Slurm page.

Finally, use /net/data/npfl118/assignments/gpu_multiplication.py to measure how long it takes to compute matrix multiplication, both on a CPU and GPU version. The given script measures the required time for all given matrix dimensions.

• For CPU version with 1 thread, use dimensions up to 10000 with step 1000.
• For CPU version with 8 threads, use dimensions up to 10000 with step 1000.
• For GPU version, use dimensions up to 20000 with step 1000.

Finally, estimate the speedup for this task of using:

• GPU instead of 1 CPU thread.

### spark_lemmas

required Template: /net/data/npfl118/assignments/spark_lemmas.py

Using the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer, generate list of 100 most frequent lemmas in Czech and English wiki on standard output.

To utilize the lemmatizer, use rdd.pipe.

### spark_anagrams

required Template: /net/data/npfl118/assignments/spark_anagrams.py

Two words are anagrams if one is a character permutation of the other (ignoring case).

For a given wiki language, find all anagram classes that contain at least $A$ different words (a parameter of the script). Output each anagram class (unique words with the same character permutation) on a separate line.

Use the /net/data/npfl118/wiki/{cs,en}/tokenizer, to tokenize the input, again using rdd.pipe.

### spark_inverted_index

either this or spark_kmeans required Template: /net/data/npfl118/assignments/spark_inverted_index.py

In a distributed way, compute an inverted index – for every lemma from the articles, compute ascending (article id, ascending positions of occurrences as word indices) pairs. In order to do so, number the articles (in any order) using consecutive integers, and produce also a file (or several files) with a list of articles representing this mapping (the article on line $i$ is the article with id $i$).

The output should be

1. a single file with the list of articles ordered by article id;

2. the inverted index consisting of several files (the whole index could be large, so it should be split into files of tens or hundreds of MBs). Each file should contain several lemmas, each on a single line in this format:

lemma \t article_id \t space separated occurrence indices \t article_id \t space separated occurrence indices ...


where both the article_ids and the occurrence indices are in ascending order.

Lemmas should be sorted alphabetically (inside files and also across files), but we allow a lemma to appear in multiple consecutive files; in that case, the article_ids should be in ascending order even across all lemma lines.

To be able to perform the computation in a distributed way, each worker should alway process data of a constant size. Therefore:

• you cannot fit the whole dictionary mapping article names to ids in memory (to avoid it, see for example zipWithIndex);
• you cannot fit all articles and all occurrences of a single lemma in memory at once (that is why a lemma can appear in multiple consecutive files; the required output format can be generated for example using mapPartitions);
• on the other hand, you can fit a single article in memory (and therefore also occurrences of a lemma in a single document), and you can also fit one of the index files (i.e., an RDD partition) in memory.

To generate the lemmas, use the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer, which again reads untokenized UTF-8 text and outputs the space separated lemmas on the output, preserving line breaks.

### spark_kmeans

either this or spark_inverted_index required Template: /net/data/npfl118/assignments/spark_kmeans.py

Implement the basic variant of the K-means clustering algorithm.

Use the following data:

Path Number of points Number of dimensions Number of clusters
/net/data/npfl118/points/points-small.txt 10000 50 50
/net/data/npfl118/points/points-medium.txt 100000 100 100
/net/data/npfl118/points/points-large.txt 500000 200 200

Run for the given number of iterations. Print the distance by which the centers move each iteration, and stop the algorithm if this distance is less than a given epsilon. Once finished, print the found centers.