The aim of the course is to introduce methods required in natural language processing (processing huge data sets in a distributed environment and performing machine learning) and to show how to execute them effectively on the ÚFAL computational Linux cluster. The course covers the ÚFAL network and cluster architecture, SGE (Sun/Oracle/Son of Grid Engine), related Linux tools, and best practices.
The course follows the outline in the ÚFAL wiki: Introduction to ÚFAL (you will need an ÚFAL wiki account to access that site; each ÚFAL PhD student is entitled to get a wiki account).
The whole course is taught during the first few weeks of the semester. If you plan to attend the course, please contact any of the Guarantors listed below.
SIS code: NPFL118
Semester: winter
E-credits: 3
Examination: 0/2 C
Guarantors: Milan Straka,
Martin Popel,
Rudolf Rosa
when | where | who: what |
---|---|---|
Wednesday 23.9. 14:00-18:00 | Zoom | Ruda+Sangeet: intro to ÚFAL (non-IT topics), Linux network, CPU and GPU cluster |
Tuesday 29.9. 10:00-12:00 | Zoom | Martin: CPU and GPU cluster Q&A, IT tricks, Python, Publications |
Tuesday 29.9. 13:00-16:45 | Zoom | Ruda: PhD studies, GAUK, other non-IT topics |
Wednesday 30.9. 10:00-16:00 | Zoom | Milan: SGE recap, SGE assignments, GPU scheduling |
Thursday 1.10. 10:00-15:00 | Zoom | Milan: Spark, assignments |
Thursday 1.10. 15:30-16:30 | Zoom | Martin: Publications, Q&A |
In order to pass the course, you have to attend the meetings and do all the required assignments.
This page describes a possible initial configuration of your Linux environment. You are of course free to modify it in any way you want :-)
The .profile is run automatically when you log in (i.e., when you log in to your desktop or to a machine via SSH). Note that .bash_profile has precedence, so if you have it, .profile will not be used. Therefore, if you have .bash_profile, move its content to .profile and then remove .bash_profile (or the other way around).
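One possible way to do that merge (keep a backup and review the result afterwards; this is only a sketch):

```bash
cp ~/.bash_profile ~/.bash_profile.bak   # backup, just in case
cat ~/.bash_profile >> ~/.profile        # append its content to .profile
rm ~/.bash_profile                       # so that .profile is used again
```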
# Umask for UFAL gives writable access to the group
umask 002
# If you have a favourite editor, use it here instead of vim
export EDITOR=vim
# Initialize SGE
[ -f /opt/LRC/common/settings.sh ] && . /opt/LRC/common/settings.sh
# Add ~/bin to PATH; useful if you want to write own scripts
export PATH="$HOME/bin:$PATH"
# Add ~/.local/bin to PATH; directory of binaries installed by Python
export PATH="$HOME/.local/bin:$PATH"
# Add Spark to PATH
export PATH="/net/projects/spark/bin:/net/projects/spark/sge:/net/projects/spark/sbt/bin:$PATH"
# Export paths to CUDA 10.1 and cuDNN 7.6; written in Oct 2020, newer versions may be required later
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/cuda/10.1/lib64:/opt/cuda/10.1/cudnn/7.6/lib64
# Make sure LC_CTYPE and LC_COLLATE are always the same; you can change it to a different one
export LC_CTYPE=cs_CZ.UTF-8 LC_COLLATE=cs_CZ.UTF-8
# Make sure TERM is set to a reasonable default
[ "$TERM" = linux ] && export TERM=xterm
# Run .bashrc if running bash
if [ -n "$BASH_VERSION" ]; then
[ -f ~/.bashrc ] && . ~/.bashrc
fi
The .bashrc is run whenever you open a new terminal window. Note that it is customary for your .profile to also run .bashrc (the last three lines of the above file).
# If not running in interactive terminal, stop.
[ -z "$PS1" ] && return
# Set larger history without duplicates, ignore commands starting with space
export HISTCONTROL=ignorespace:erasedups HISTSIZE=8192 HISTFILESIZE=16384
shopt -s checkwinsize cmdhist histappend
# Set color prompt
PS1='${debian_chroot:+($debian_chroot)}\[\033[01;32m\]\u@\h\[\033[00m\]:\[\033[01;34m\]\w\[\033[00m\]\$ '
# enable color support of ls and also add handy aliases
if [ -x /usr/bin/dircolors ]; then
eval "`dircolors -b`"
alias ls='ls --color=auto'
fi
# enable color support of grep
export GREP_COLORS="ms=01;32:mc=01;32:sl=:cx=:fn=34:ln=36:bn=36:se=36"
alias grep='grep --color'
# enable color support of less
export LESS_TERMCAP_{md=$'\E[01;32m',me=$'\E[0m',us=$'\E[01;33m',ue=$'\E[0m'}
# enable programmable completion features
if [ -f /etc/bash_completion ]; then
. /etc/bash_completion
fi
The .sge_request file contains default options for SGE commands. The following options are a very sane default; you might also consider adding -m n to disable mail notification (or some other variant of -m, see man qsub).
-b y
-cwd
If you submit screen -D -m as an SGE job, screen starts without a connected terminal and cannot copy sane settings from it; it is therefore a good idea to have them in .screenrc.
altscreen
defbce on
defencoding utf-8
defflow off
defscrollback 65536
term xterm #or other, if you prefer
hardstatus on
hardstatus alwayslastline
hardstatus string "%{= bw}Screen: %Lw"
caption string "%?%F%{.R.}%?%3n %t%? [%h]%?"
setenv PROMPT_COMMAND "echo -ne '\ek\e\\\\\eksh '\${PWD##*/}'\e\\\\'"
shelltitle "$ |bash"
startup_message off
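If you use this approach, a possible workflow for a cluster-side screen session might look as follows (the host name is a placeholder; use the machine that qstat reports):

```bash
qsub -b y -cwd -N my-screen screen -D -m   # detached screen running as an SGE job
qstat -u $USER                             # find out on which machine the job runs
ssh some-cluster-machine                   # placeholder: the machine reported by qstat
screen -r                                  # reattach to the screen session there
```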
See the .profile in the Config tab or the initialization instructions on the ÚFAL wiki.
qsub: submit a job for execution. Commonly used options (a combined example follows the list):
- -b [y|n]: binary or a script (always use -b y)
- -cwd: keep the current working directory (you probably always want to use -cwd)
- -v variable[=value]: defines or redefines an environment variable
- -V: export all environment variables
- -N name: set the name of the job
- -o outpath: set the file with the standard output of the job; default $JOB_NAME.o$JOB_ID
- -e outpath: set the file with the standard error of the job; default $JOB_NAME.e$JOB_ID
- -j [y|n]: merge standard output and error
- -sync [y|n]: wait till the job finishes
- -l resource1=value,resource2=value: sets required resources
  - mem_free=2G: required amount of memory
  - h_data=20G: maximum amount of memory; the job is stopped if it exceeds it
- -q queue: submits only to the given queue
- -pe parallel_environment machines: uses the given parallel environment
- -hold_jid comma_separated_job_list: jobs that must finish before this job starts
- -p priority: sets the priority of your job; the default one is -100 and it has to be less than or equal to zero
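For example, several of the options above might be combined like this (run_experiment.sh and its arguments are just placeholders):

```bash
# Hypothetical submission: a named binary job, 2GB of memory (4GB hard limit),
# merged stdout/stderr, started only after job 12345 finishes.
qsub -b y -cwd -N my-job -l mem_free=2G,h_data=4G -j y -hold_jid 12345 \
  ./run_experiment.sh --config config.yaml
```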
During execution, the job has several environment variables set, notably JOB_ID and JOB_NAME.

Array jobs (an example follows the list):
- -t 1-n: start an array job with tasks numbered 1 through n; each task gets its number in SGE_TASK_ID and its output in $JOB_NAME.[eo]$JOB_ID.$TASK_ID
- -t m-n[:s]: start an array job with tasks m, m+s, …, n; the environment variables SGE_TASK_FIRST, SGE_TASK_LAST and SGE_TASK_STEPSIZE are also set
- -tc j: run at most j tasks simultaneously
- -hold_jid_ad comma_separated_job_list: array jobs that must finish before this job starts; task i depends only on task i of the specified jobs
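A minimal sketch of an array job (process_part.sh is a placeholder script that reads $SGE_TASK_ID itself):

```bash
# 100 tasks, at most 20 running at once; each task finds its index in $SGE_TASK_ID
# and writes its output to my-array-job.o$JOB_ID.$TASK_ID.
qsub -b y -cwd -t 1-100 -tc 20 -N my-array-job ./process_part.sh
```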
We currently have the following queues:
- cpu-ms.q, cpu-troja.q
- gpu-ms.q, gpu-troja.q

The following queue specifications are available (examples follow the list):
- queue_name: the queue_name can be a wildcard, e.g., cpu-*
- queue_name@machine: the machine can also be a wildcard, and it can even be a pattern like !(achilles3|hector*)
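For illustration (the machine patterns below are only examples):

```bash
qsub -b y -cwd -q 'cpu-*' ./job.sh                          # any CPU queue
qsub -b y -cwd -q 'cpu-ms.q@!(achilles3|hector*)' ./job.sh  # a queue, excluding some machines
```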
SGE supports jobs with multiple threads/machines. The default parallel environments are:
- -pe smp 16: ask for 16 threads on a single machine
- -pe make 16: ask for 16 simultaneously running processes

qstat: list of running jobs
- qstat -j job_id: show details of the given job
- qstat -u user: shows jobs of the given user; qstat -u "*" shows all jobs
- ~straka/bin.local/qs aggregates a lot of qstat outputs

qalter: modify properties of a submitted/running job with the given id, e.g., qalter 123 -tc 10

qdel: stops jobs with the given ids
- qdel job_id.task_id removes only one task

qrsh: start an interactive shell
- alternatively, ssh sol[1-9] and run qrsh from there
- or submit screen -D -m using qsub; see ~straka/.screenrc for some defaults

The following script processes a Wikipedia file described in the Assignments tab and returns a sorted list of article names.
articles.sh (available in /net/data/npfl118/examples/)
#!/bin/bash
set -e
# Parse arguments
[ "$#" -ge 3 ] || { echo Usage: "$0 input_file outdir tasks [conc_tasks]" >&2;exit 1;}
input_file="$1"
output_dir="$2"
tasks="$3"
conc_tasks="$4"
# Check that input file exists and get its size
[ -f "$input_file" ] || { echo File $input_file does not exist >&2; exit 1; }
# Check that output dir does not exist and create it
[ -d "$output_dir" ] && { echo Directory $output_dir already exists >&2; exit 1;}
mkdir -p "$output_dir"
# Run distributed computations
qsub -cwd -b y -sync y -o "$output_dir" -e "$output_dir" -t 1-"$tasks" ${conc_tasks:+-tc $conc_tasks} \
./articles_distributed.sh "$tasks" "$input_file" "$output_dir"/articles.txt
# Merge all results
sort -m `seq -f "$output_dir/articles.txt.%g" 1 "$tasks"` > "$output_dir"/articles.txt
rm `seq -f "$output_dir/articles.txt.%g" 1 "$tasks"`
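A possible invocation of the script above (the output directory name is arbitrary):

```bash
# Sort article names of the Czech Wikipedia using 10 tasks, at most 5 running at once.
./articles.sh /net/data/npfl118/wiki/cs/wiki.txt output_cs 10 5
```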
articles_distributed.sh (available in /net/data/npfl118/examples/)
#!/bin/bash
set -e
# Parse arguments
[ "$#" -ge 3 ] || { echo Usage: $0 total_tasks input output_file >&2; exit 1;}
tasks="$1"
input_file="$2"
output_file="$3"
# Parse SGE_TASK_ID and compute file offset
[ -n "$SGE_TASK_ID" ] || { echo Variable SGE_TASK_ID is not set >&2; exit 1; }
task="$SGE_TASK_ID"
output_file="$output_file.$task"
# Run computation outputting to temporary file
tmp_file="`mktemp`"
trap "rm -f \"$tmp_file\"" EXIT
split -n l/$task/$tasks "$input_file" | cut -f1 | sort > "$tmp_file"
# On success move temporary file to output
mv "$tmp_file" "$output_file"
We give only a quick overview here; a more detailed treatment of the GPU cluster can be found in the ÚFAL wiki.
GPU jobs are scheduled as SGE jobs, but in the special queues gpu-ms.q and gpu-troja.q.
You need to specify how many GPUs and of what kind you want, using:
- -l gpu=3: ask for 3 GPUs on a single machine
- -l gpu=1,gpu_ram=8G: ask for one GPU with at least 8GB of memory

During execution, CUDA_VISIBLE_DEVICES is set to the allocated GPUs. Note that qrsh jobs by default do not read the environment variables created by SGE, so you need to use qrsh -l ... -pty yes bash -l.
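For example (train.sh is a placeholder; adjust the resource values to your needs):

```bash
# Batch job asking for one GPU with at least 8GB of memory:
qsub -q 'gpu-*' -l gpu=1,gpu_ram=8G -cwd -b y ./train.sh
# Interactive GPU shell that also sees the SGE-provided environment variables:
qrsh -q 'gpu-*' -l gpu=1 -pty yes bash -l
```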
Then, you need a framework which can use the GPU, and you also need to set paths correctly:
export LD_LIBRARY_PATH="/opt/cuda/11.2/lib64:/opt/cuda/11.2/cudnn/8.1.1/lib64:/opt/cuda/11.2/nccl/2.8.4/lib:/opt/cuda/11.2/extras/CUPTI/lib64:$LD_LIBRARY_PATH"
Spark is a framework for distributed computations. Natively it works in Python, Scala and Java.
Apart from embarrassingly parallel computations, the Spark framework also supports in-memory and/or iterative computations, making it suitable even for machine learning and complex data processing. Spark can run either locally using a single thread, locally using multiple threads, or in a distributed fashion.
You need to set the PATH variable to include the Spark binaries; see the .profile in the Config tab.
An interactive ipython shell can be started using PYSPARK_DRIVER_PYTHON=ipython3 pyspark (use pip3 install --user ipython if you do not have ipython3). Such a shell uses the current cluster if one is available, otherwise it starts a local cluster with as many threads as there are cores.
To create a distributed cluster using SGE, you can run one of the following commands:
- spark-qrsh workers memory_per_worker: start a Spark cluster and run qrsh inside it
- spark-qsub workers memory_per_worker command [arguments...]: start a Spark cluster and execute the given command inside it

Start by running spark-qrsh 50 1G. When the cluster starts, it prints a URL where it can be monitored. After the cluster starts, execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.
Then, try running the following:
(sc.textFile("/net/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda c1, c2: c1 + c2)
.sortBy(lambda word_count: word_count[1], ascending=False)
.take(10))
To execute a script instead of running from an interactive shell, you need to
create the SparkContext
manually:
word_count.py (available in /net/data/npfl118/examples/)
#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("input", type=str, help="Input file/path")
parser.add_argument("output", type=str, help="Output directory")
args = parser.parse_args()
import pyspark
sc = pyspark.SparkContext()
input = sc.textFile(args.input, 3*sc.defaultParallelism)
words = input.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile(args.output)
You can run such a script using
spark-submit script.py input_path output_path
If you want to use a specific virtual environment or python2, you can use
PYSPARK_PYTHON=path_to_python_in_virtual_env_or_python2 spark-submit ...
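For instance, the word_count.py example above could be run as follows (the output directories are placeholders and must not exist yet):

```bash
# Locally (or inside an already started cluster):
spark-submit word_count.py /net/data/npfl118/wiki/cs/wiki.txt output_cs
# Inside an SGE-backed cluster with 50 workers of 1GB each, as described above:
spark-qsub 50 1G spark-submit word_count.py /net/data/npfl118/wiki/en/wiki.txt output_en
```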
Basic Spark methods include:
- sc.textFile, sc.parallelize, …
- rdd.collect, rdd.take, rdd.saveAsTextFile, rdd.coalesce, rdd.repartition, …
- rdd.map, rdd.flatMap, rdd.count, rdd.distinct, rdd.sortByKey, rdd.reduceByKey, rdd.groupByKey, rdd.sample, …
- rdd.cache, rdd.unpersist, rdd.pipe, …

For the assignments, you can find the input data in /net/data/npfl118.
Most assignments use the following Wikipedia data:
- wiki/cs/wiki.txt: Czech Wikipedia data (Sep 2009), file size 195MB, 124k articles.
- wiki/en/wiki.txt: English Wikipedia data (Sep 2009), file size 4.9GB, 2.9M articles.
- wiki/cs/wiki-small.txt, wiki/en/wiki-small.txt: the first 1000 articles of the respective Wikipedias.

The files are in UTF-8 and contain one article per line. The article name is separated from the article content by a \t character.
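A quick way to inspect this format, using the small Czech file:

```bash
head -n 1 /net/data/npfl118/wiki/cs/wiki-small.txt | cut -f1                  # the article name
head -n 1 /net/data/npfl118/wiki/cs/wiki-small.txt | cut -f2- | head -c 200   # start of its text
```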
required
Template: You can start with /net/data/npfl118/examples/{articles.sh,articles_distributed.sh}
Implement an SGE distributed job to create a list of unique words used in the articles. Convert the article texts to lowercase to ignore case.
Because the article data is not tokenized, use the provided
/net/data/npfl118/wiki/{cs,en}/tokenizer
, which reads untokenized UTF-8 text from
standard input and produces tokenized UTF-8 text on standard output. It
preserves line breaks and separates tokens on each line by exactly one space.
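The tokenizer is a plain filter (UTF-8 text on standard input, tokenized text on standard output), so you can try it directly, for instance:

```bash
# Tokenize the article texts of the small Czech file and show a few unique tokens.
cut -f2- /net/data/npfl118/wiki/cs/wiki-small.txt \
  | /net/data/npfl118/wiki/cs/tokenizer \
  | tr ' ' '\n' | sort -u | head
```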
optional
In a distributed way, compute an inverted index – for every lemma from the articles, compute an ascending list of (article id, ascending positions of occurrences as word indices) pairs. In order to do so, number the articles using consecutive integers and also produce a list of articles representing this mapping (the article on line i is the article with id i; you can use the example articles.sh).
The output should be a file with the list of articles ordered by article id, and a file with one lemma per line in this format:
lemma \t article_id \t space separated occurrence indices \t article_id \t space separated occurrence indices ...
Both the article_ids and the occurrence indices should be in ascending order.
To generate the lemmas, use the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer, which again reads untokenized UTF-8 text and writes the space-separated lemmas to standard output, preserving line breaks.
required
Template: /net/data/npfl118/assignments/gpu_multiplication.py
Install TensorFlow 2.7 in a virtual environment:
python3 -m venv venv
venv/bin/python -m pip install --upgrade pip setuptools
venv/bin/python -m pip install tensorflow==2.7
Also, make sure you have added CUDA and cuDNN to your paths; see the end of the SGE page.
Then use /net/data/npfl118/assignments/gpu_multiplication.py to measure how long it takes to compute a matrix multiplication, both on a CPU and on a GPU. The given script measures the required time for all given matrix dimensions. Finally, estimate the speedup of using a GPU instead of a CPU for this task.
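One possible way to run the measurement on a GPU machine (queue and resource values are just a sketch; -V propagates your environment, including the CUDA LD_LIBRARY_PATH from the SGE page):

```bash
qsub -q 'gpu-*' -l gpu=1,gpu_ram=8G,mem_free=16G -cwd -b y -V \
  venv/bin/python /net/data/npfl118/assignments/gpu_multiplication.py
```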
required
Template: /net/data/npfl118/assignments/spark_lemmas.py
Using the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer, generate a list of the 100 most frequent lemmas in the Czech and English wikis on standard output.
To utilize the lemmatizer, use rdd.pipe.
required
Template: /net/data/npfl118/assignments/spark_anagrams.py
Two words are anagrams if one is a character permutation of the other (ignoring case).
For a given wiki language, find all anagram classes that contain at least a given number of different words (a parameter of the script). Output each anagram class (unique words with the same character permutation) on a separate line.
Use the /net/data/npfl118/wiki/{cs,en}/tokenizer to tokenize the input, again using rdd.pipe.
either this or spark_kmeans
required
Template: /net/data/npfl118/assignments/spark_inverted_index.py
Compute the inverted index in the format described in the inverted_index assignment.
either this or spark_inverted_index
required
Template: /net/data/npfl118/assignments/spark_kmeans.py
Implement the basic variant of the K-means clustering algorithm.
Use the following data:
Path | Number of points | Number of dimensions | Number of clusters |
---|---|---|---|
/net/data/npfl118/points/points-small.txt | 10000 | 50 | 50 |
/net/data/npfl118/points/points-medium.txt | 100000 | 100 | 100 |
/net/data/npfl118/points/points-large.txt | 500000 | 200 | 200 |
Run for the given number of iterations. Print the distance by which the centers move each iteration, and stop the algorithm if this distance is less than a given epsilon. Once finished, print the found centers.
Introduction to ÚFAL at the ÚFAL wiki (you will need an ÚFAL wiki account to access that site; each ÚFAL PhD student is entitled to get a wiki account).