Natural language processing on computational cluster

The aim of the course is to introduce methods required in natural language processing (processing huge data sets in a distributed environment and performing machine learning) and to show how to execute them effectively on the ÚFAL computational Linux cluster. The course will cover the ÚFAL network and cluster architecture, SGE (Sun/Oracle/Son of Grid Engine), related Linux tools, and best practices.

The course follows the outline in the ÚFAL wiki: Introduction to ÚFAL (you will need an ÚFAL wiki account to access that site; every ÚFAL PhD student is entitled to a wiki account).

The whole course is taught during the first several weeks of the semester. If you plan to attend the course, please contact any of the guarantors listed below.


SIS code: NPFL118
Semester: winter
E-credits: 3
Examination: 0/2 C
Guarantors: Milan Straka, Martin Popel, Rudolf Rosa

Preliminary 2020 Schedule

when where who: what
Wednesday 23.9. 14:00-18:00 Zoom Ruda+Sangeet: intro to ÚFAL (non-IT topics), Linux network, CPU and GPU cluster
Tuesday 29.9. 10:00-12:00 Zoom Martin: CPU and GPU cluster Q&A, IT tricks, Python, Publications
Tuesday 29.9. 13:00-16:45 Zoom Ruda: PhD studies, GAUK, other non-IT topics
Wednesday 30.9. 10:00-16:00 Zoom Milan: SGE recap, SGE assignments, GPU scheduling
Thursday 1.10. 10:00-15:00 Zoom Milan: Spark, assignments
Thursday 1.10. 15:30-16:30 Zoom Martin: Publications, Q&A


In order to pass the course, you have to attend the meetings and do all the required assignments.

A brief history of SGE, the job scheduler used on the ÚFAL cluster:

  • originally Sun Grid Engine, open-source since 2001
  • in 2009 changed to Oracle Grid Engine, no longer open-source
  • several open-source forks, most importantly Son of Grid Engine


Initialization instructions are on the ÚFAL wiki.

In addition, it is a good idea to create a ~/.sge_request file, which specifies default arguments; it should contain at least

-b y
# optionally also -m n, which disables default mail notification

Quick Intro

qsub: submit a job for execution

  • -b [y|n] whether the command is a binary or a script (always use -b y)
  • -cwd keeps the current working directory (you probably always want -cwd)
  • -v variable[=value] defines or redefines an environment variable
  • -V exports all environment variables
  • -N name sets the name of the job
  • -o outpath sets the file for the standard output of the job; default $JOB_NAME.o$JOB_ID
  • -e outpath sets the file for the standard error of the job; default $JOB_NAME.e$JOB_ID
  • -j [y|n] merges standard output and error
  • -sync [y|n] waits until the job finishes
  • -l resource1=value,resource2=value sets the required resources
    • mem_free=2G required amount of memory
    • h_data=20G maximum amount of memory; the job is stopped if it is exceeded
  • -q queue submits only to the given queue
  • -pe parallel_environment slots uses the given parallel environment
  • -hold_jid comma_separated_job_list jobs that must finish before this job starts
  • -p priority sets the priority of your job; the default one is -100 and it has to be less than or equal to zero
  • environment variable JOB_ID, set inside the job to its id
  • environment variable JOB_NAME, set inside the job to its name
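The options above are often combined by a driver script. The following is a minimal Python sketch that builds the qsub command line programmatically. The helper `qsub_command`, its keyword arguments, and the `train.py` command are hypothetical, and the helper only emits flags listed above:

```python
import shlex


def qsub_command(command, name=None, resources=None, queue=None,
                 hold_jids=None, sync=False):
    """Build a qsub argument list (hypothetical helper) from options listed above."""
    args = ["qsub", "-b", "y", "-cwd"]  # binary mode, keep current working directory
    if name:
        args += ["-N", name]
    if resources:  # e.g. {"mem_free": "2G", "h_data": "20G"}
        args += ["-l", ",".join(f"{key}={value}" for key, value in resources.items())]
    if queue:  # may be a wildcard such as cpu-*
        args += ["-q", queue]
    if hold_jids:  # jobs that must finish before this one starts
        args += ["-hold_jid", ",".join(str(job) for job in hold_jids)]
    if sync:
        args += ["-sync", "y"]
    return args + list(command)


if __name__ == "__main__":
    cmd = qsub_command(["python3", "train.py"], name="train",
                       resources={"mem_free": "2G", "h_data": "20G"}, queue="cpu-*")
    print(shlex.join(cmd))  # shell-quoted command line, for inspection
    # On a submit host, run it with: subprocess.run(cmd, check=True)
```

Passing the argument list to subprocess.run, rather than a single string to a shell, avoids quoting problems with wildcard queue names such as cpu-*.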

Array Jobs

  • -t 1-n starts an array job with n tasks numbered 1 … n
  • environment variable SGE_TASK_ID
  • output and error files $JOB_NAME.[eo]$JOB_ID.$TASK_ID
  • -t m-n[:s] starts an array job with tasks m, m+s, …, n
  • environment variables SGE_TASK_FIRST, SGE_TASK_LAST, SGE_TASK_STEPSIZE
  • -tc j runs at most j tasks simultaneously
  • -hold_jid_ad comma_separated_job_list array jobs that must finish before this job starts; task i depends only on task i of the specified jobs
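The task numbering can be reproduced in a few lines of Python, for instance to check which tasks a given -t range creates. In this sketch the function names are hypothetical. The environment variable names are the ones listed above.

```python
import os


def array_task_ids(first, last, step=1):
    """Task ids created by `qsub -t first-last:step`: first, first+step, ... up to last."""
    return list(range(first, last + 1, step))


def current_task():
    """Return this task's id and all task ids of the array, read from SGE variables.

    Raises an error when SGE_TASK_ID is missing or not a number (e.g. outside an array task).
    """
    task = int(os.environ["SGE_TASK_ID"])
    first = int(os.environ.get("SGE_TASK_FIRST", task))
    last = int(os.environ.get("SGE_TASK_LAST", task))
    step = int(os.environ.get("SGE_TASK_STEPSIZE", 1))
    return task, array_task_ids(first, last, step)
```

Note that the last task id equals n only when n - m is divisible by s. For example, -t 2-10:3 creates tasks 2, 5 and 8.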


We currently have the following queues:

  • cpu-ms.q, cpu-troja.q
  • gpu-ms.q, gpu-troja.q

The following queue specifications are available:

  • queue_name: the queue_name can be a wildcard, e.g., cpu-*
  • queue_name@machine:
    • the machine can also be a wildcard
    • the machine can even be a negated expression such as !(achilles3|hector*)

Parallel Environments

SGE supports jobs with multiple threads/machines. Default parallel environments are

  • -pe smp 16: ask for 16 threads on a single machine
  • -pe make 16: ask for 16 simultaneously running processes
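A job submitted with -pe smp N should not start more threads than it was granted. The following minimal sketch assumes that the granted slot count is available in the NSLOTS environment variable (standard in Grid Engine, but worth checking on the cluster). It falls back to a single thread otherwise. The function names are hypothetical.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def granted_slots(default=1):
    """Number of slots granted by `-pe smp N`, read from NSLOTS (falls back to default)."""
    try:
        return max(1, int(os.environ.get("NSLOTS", default)))
    except ValueError:
        return default


def parallel_map(func, items):
    """Apply func to items using at most as many threads as granted slots."""
    with ThreadPoolExecutor(max_workers=granted_slots()) as pool:
        return list(pool.map(func, items))
```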

qstat: list of running jobs

  • detailed information about a job can be obtained using qstat -j job_id
  • qstat -u user shows jobs of a given user, and qstat -u "*" shows all jobs
  • ~straka/bin.local/qs aggregates a lot of qstat outputs

qalter: modify properties of a submitted/running job with given id

  • only some properties can be modified, e.g., qalter 123 -tc 10

qdel: stops jobs with given ids

  • for array jobs, qdel job_id.task_id removes only one task

qrsh: start an interactive shell

  • similar to ssh
  • unfortunately, the job is stopped when the connection is broken
    • you can run screen/tmux/… on sol[1-9] and start qrsh from there
    • you can submit screen -D -m using qsub
      • look in ~straka/.screenrc for some defaults

Simple Array Job Example

The following script processes a Wikipedia file described in Assignments and returns a sorted list of article names.

  • (available in /net/data/npfl118/examples/)

#!/bin/bash
set -e

# Parse arguments
[ "$#" -ge 3 ] || { echo "Usage: $0 input_file outdir tasks [conc_tasks]" >&2; exit 1; }
input_file="$1"
output_dir="$2"
tasks="$3"
conc_tasks="$4"

# Check that input file exists
[ -f "$input_file" ] || { echo "File $input_file does not exist" >&2; exit 1; }

# Check that output dir does not exist and create it
[ -d "$output_dir" ] && { echo "Directory $output_dir already exists" >&2; exit 1; }
mkdir -p "$output_dir"

# Run distributed computations (./ stands for the worker script shown below)
qsub -cwd -b y -sync y -o "$output_dir" -e "$output_dir" -t 1-"$tasks" ${conc_tasks:+-tc $conc_tasks} \
  ./ "$tasks" "$input_file" "$output_dir"/articles.txt

# Merge all (already sorted) partial results, then remove them
sort -m $(seq -f "$output_dir/articles.txt.%g" 1 "$tasks") > "$output_dir"/articles.txt
rm $(seq -f "$output_dir/articles.txt.%g" 1 "$tasks")
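
The worker script executed by each task of the array job:

  • (available in /net/data/npfl118/examples/)

#!/bin/bash
set -e

# Parse arguments
[ "$#" -ge 3 ] || { echo "Usage: $0 total_tasks input output_file" >&2; exit 1; }
tasks="$1"
input_file="$2"

# Parse SGE_TASK_ID; each task writes to output_file.TASK_ID
[ -n "$SGE_TASK_ID" ] || { echo "Variable SGE_TASK_ID is not set" >&2; exit 1; }
task="$SGE_TASK_ID"
output_file="$3.$task"

# Run computation outputting to a temporary file next to the output
tmp_file="$(mktemp "$output_file.XXXXXX")"
trap "rm -f \"$tmp_file\"" EXIT

# Take the task-th of $tasks line-aligned chunks, keep article names, sort them
split -n l/"$task"/"$tasks" "$input_file" | cut -f1 | sort > "$tmp_file"

# On success move temporary file to output
mv "$tmp_file" "$output_file"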
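The split/process/merge pattern of the two scripts above can be simulated in plain Python. This is handy for checking the logic on a small input before submitting. The sketch is not the course's implementation:

- It assigns lines to tasks round-robin instead of using the byte-based, line-aligned chunks of split -n l/K/N.
- It uses heapq.merge in place of sort -m.
- Python orders strings by code point, which matches sort only under LC_ALL=C.

The function names are hypothetical.

```python
import heapq


def task_lines(lines, task, tasks):
    """Lines handled by 1-based `task` out of `tasks` (round-robin, unlike split -n l/K/N)."""
    return [line for i, line in enumerate(lines) if i % tasks == task - 1]


def worker(lines):
    """Equivalent of `cut -f1 | sort`: sorted article names (text before the first tab)."""
    return sorted(line.split("\t", 1)[0] for line in lines)


def run_array_job(lines, tasks):
    """Run every task, then merge the already sorted partial outputs like `sort -m`."""
    partial = [worker(task_lines(lines, t, tasks)) for t in range(1, tasks + 1)]
    return list(heapq.merge(*partial))
```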


We give only a quick overview here; a more detailed treatment of the GPU cluster can be found on the ÚFAL wiki.

GPU jobs are scheduled as SGE jobs, but in the special queues gpu-ms.q and gpu-troja.q. You need to specify how many GPUs you want, and of what kind, using:

  • -l gpu=3: ask for 3 GPUs on a single machine
  • -l gpu=1,gpu_ram=8G: ask for one GPU with at least 8 GB of memory

During execution, CUDA_VISIBLE_DEVICES is set to the allocated GPUs. Note that qrsh jobs by default do not read the environment variables created by SGE, so you need to use qrsh -l ... -pty yes bash -l.
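The CUDA runtime itself honours CUDA_VISIBLE_DEVICES, so frameworks normally need no extra setup. To check from Python which GPUs a job was given, a minimal sketch (the function name is hypothetical) can parse the variable. It assumes the usual comma-separated list of device ids:

```python
import os


def allocated_gpus():
    """Return the GPU ids listed in CUDA_VISIBLE_DEVICES, or None if it is unset.

    An unset variable means CUDA sees every GPU on the machine; an empty one means none.
    """
    value = os.environ.get("CUDA_VISIBLE_DEVICES")
    if value is None:
        return None
    return [device.strip() for device in value.split(",") if device.strip()]


if __name__ == "__main__":
    gpus = allocated_gpus()
    print("GPUs visible to this job:", "all (variable unset)" if gpus is None else gpus)
```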

Then, you need a framework which can use the GPU, and you also need to set paths correctly:

  • To use CUDA 10.1 and cuDNN 7.6, use export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/cuda/10.1/lib64:/opt/cuda/10.1/cudnn/7.6/lib64:/opt/cuda/10.1/extras/CUPTI/lib64

Spark is a framework for distributed computations. Natively it works in Python, Scala and Java.

Apart from embarrassingly parallel computations, the Spark framework handles in-memory and/or iterative computations well, which makes it suitable even for machine learning and complex data processing. The Spark framework can run locally using one thread, locally using multiple threads, or in a distributed fashion.


You need to add the following to your .profile (or another suitable place):

export PATH="/net/projects/spark/bin:/net/projects/spark/sge:$PATH"


An interactive ipython shell can be started using

PYSPARK_DRIVER_PYTHON=ipython3 pyspark

(use pip3 install --user ipython if you do not have ipython3).

Such a shell uses the current Spark cluster if one is running; otherwise, it starts a local cluster with as many threads as there are cores.

To create a distributed cluster using SGE, you can run one of the following commands:

  • spark-qrsh workers memory_per_worker: start Spark cluster and run qrsh inside it
  • spark-qsub workers memory_per_worker command [arguments...]: start Spark cluster and execute the given command inside it


Start by running spark-qrsh 50 1G. When the cluster starts, it prints a URL where it can be monitored. After the cluster starts, execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.

Then, try running the following:

(sc.textFile("/net/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1 + c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10))  # the 10 most frequent (word, count) pairs
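To see what each transformation in the pipeline above does, it can be mirrored on an in-memory list with the standard library. This sketch only mimics the semantics of flatMap, map, reduceByKey and sortBy on a single machine. It is not how Spark executes them. The function name `word_count` is hypothetical.

```python
from collections import defaultdict


def word_count(lines, top=10):
    """Single-machine mirror of the Spark word-count pipeline."""
    # flatMap: every line yields several words
    words = [word for line in lines for word in line.split()]
    # map: each word becomes a (word, 1) pair
    pairs = [(word, 1) for word in words]
    # reduceByKey: sum the counts of equal words
    counts = defaultdict(int)
    for word, count in pairs:
        counts[word] += count
    # sortBy count, descending, then keep the first `top` pairs
    return sorted(counts.items(), key=lambda wc: wc[1], reverse=True)[:top]
```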

Running a Script

To execute a script instead of running from an interactive shell, you need to create the SparkContext manually:

  • (available in /net/data/npfl118/examples/)
#!/usr/bin/env python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("input", type=str, help="Input file/path")
parser.add_argument("output", type=str, help="Output directory")
args = parser.parse_args()

import pyspark

sc = pyspark.SparkContext()
lines = sc.textFile(args.input, 3*sc.defaultParallelism)
words = lines.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
sorted_counts = counts.sortBy(lambda word_count: word_count[1], ascending=False)
# Write the result to the output directory (as a set of part files)
sorted_counts.saveAsTextFile(args.output)

You can run such a script using

spark-submit input_path output_path

If you want to use a specific virtual environment or python2, you can use

PYSPARK_PYTHON=path_to_python_in_virtual_env_or_python2 spark-submit ...

Basic Methods

Further Pointers

For the assignments, you can find the input data in /net/data/npfl118. Notably, there are:

  • Wikipedia data:

    • wiki/cs/wiki.txt: Czech Wikipedia data (Sep 2009), file size 195MB, 124k articles.
    • wiki/en/wiki.txt: English Wikipedia data (Sep 2009), file size 4.9GB, 2.9M articles.
    • wiki/cs/wiki-small.txt, wiki/en/wiki-small.txt: first 1000 articles

    The files are in UTF-8 and contain one article per line. The article name is separated from the article content by a \t character.


Required; template: you can start with /net/data/npfl118/examples/{,}

Implement an SGE distributed job to create a list of unique words used in the articles. Convert the article texts to lowercase to ignore case.

Because the article data is not tokenized, use the provided /net/data/npfl118/wiki/{cs,en}/tokenizer, which reads untokenized UTF-8 text from standard input and produces tokenized UTF-8 text on standard output. It preserves line breaks and separates tokens on each line by exactly one space.
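One possible shape of the per-task work and the final merge is sketched below in plain Python. It assumes that each task already has tokenized lines. Each task outputs its sorted unique lowercase words, and the merge removes duplicates across tasks, as `sort -m -u` would. The function names are hypothetical. Python's str.lower also handles Czech characters.

```python
import heapq
from itertools import groupby


def task_unique_words(tokenized_lines):
    """Sorted unique lowercase tokens of one task's tokenized lines."""
    words = set()
    for line in tokenized_lines:
        # split() handles the single spaces between tokens (and any leftover tab)
        words.update(token.lower() for token in line.split())
    return sorted(words)


def merge_unique(sorted_lists):
    """Merge per-task sorted lists and drop duplicates across tasks."""
    return [word for word, _ in groupby(heapq.merge(*sorted_lists))]
```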



In a distributed way, compute an inverted index: for every lemma from the articles, compute the ascending list of (article id, ascending positions of occurrences as word indices) pairs. To do so, number the articles using consecutive integers and also produce a list of articles representing this mapping (the article on line i is the article with id i; you can use the example

The output should be a file with the list of articles ordered by article id, and a file with one lemma per line in this format:

lemma \t article_id \t space separated occurrence indices \t article_id \t space separated occurrence indices ...

Both the article_ids and the occurrence indices should be in ascending order.

To generate the lemmas, use the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer, which again reads untokenized UTF-8 text and writes space-separated lemmas to standard output, preserving line breaks.
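The following sequential sketch shows the index structure and the required output format for already lemmatized articles, one per line. It assumes that article ids are line indices and positions are word indices, both starting at 0; the assignment does not fix the base. Distributing the work is left out. For example, tasks could index disjoint article ranges whose per-lemma entries are then merged. The function names are hypothetical.

```python
from collections import defaultdict


def inverted_index(lemmatized_articles):
    """Map each lemma to {article_id: [positions]} from space-separated lemma lines."""
    index = defaultdict(lambda: defaultdict(list))
    for article_id, line in enumerate(lemmatized_articles):
        for position, lemma in enumerate(line.split()):
            index[lemma][article_id].append(position)  # positions come out ascending
    return index


def format_index(index):
    """Yield lines `lemma \t id \t positions \t id \t positions ...` with ids ascending."""
    for lemma in sorted(index):
        fields = [lemma]
        for article_id in sorted(index[lemma]):
            fields.append(str(article_id))
            fields.append(" ".join(map(str, index[lemma][article_id])))
        yield "\t".join(fields)
```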


Required; template: /net/data/npfl118/assignments/

Install TensorFlow 2.3 in a virtual environment:

  • python3 -m venv venv
  • venv/bin/python -m pip install --upgrade pip setuptools
  • venv/bin/python -m pip install tensorflow==2.3.1

Also, make sure you have added CUDA and cuDNN to your paths; see the end of the SGE page.

Then, use /net/data/npfl118/assignments/ to measure how long it takes to compute matrix multiplication, both in the CPU and in the GPU version. The given script measures the required time for all given matrix dimensions.

  • For CPU version with 1 thread, use dimensions up to 10000 with step 1000.
  • For CPU version with 8 threads, use dimensions up to 10000 with step 1000.
  • For GPU version, use dimensions up to 20000 with step 1000.

Finally, estimate the speedup obtained for this task by using:

  • 8 CPU threads instead of 1 CPU thread;
  • GPU instead of 1 CPU thread.
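The speedup for a given matrix dimension is the ratio of running times, t_1thread / t_other. The small helper below (hypothetical name `speedups`) computes it for every dimension measured in both runs. It expects the times you measured yourself; none are given here.

```python
def speedups(baseline, other):
    """Per-dimension speedup: baseline time divided by the other configuration's time.

    Both arguments map a matrix dimension to a measured time in seconds.
    """
    common = sorted(set(baseline) & set(other))
    return {dim: baseline[dim] / other[dim] for dim in common}
```

Restricting the comparison to common dimensions matters, because the GPU runs go up to 20000 while the CPU runs stop at 10000.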


Required; template: /net/data/npfl118/assignments/

Using the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer, generate a list of the 100 most frequent lemmas in the Czech and English Wikipedia data and print it on standard output.

To utilize the lemmatizer, use rdd.pipe.
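Spark's RDD.pipe runs an external command for each partition. It feeds the partition's elements to the command's standard input, one per line, and turns the command's output lines into the elements of the new RDD. The sketch below imitates that behaviour with subprocess and then counts lemmas. In Spark itself, a natural continuation would be flatMap, map, reduceByKey and top(100, key=...). The function names are hypothetical. The real command would be, e.g., ["/net/data/npfl118/wiki/en/lemmatizer"], while the example uses a stand-in that only lowercases its input.

```python
import subprocess
import sys
from collections import Counter


def pipe_partition(lines, command):
    """Feed one partition's lines to an external command and return its output lines."""
    result = subprocess.run(command, input="".join(line + "\n" for line in lines),
                            capture_output=True, text=True, check=True)
    return result.stdout.splitlines()


def top_lemmas(partitions, command, k=100):
    """Count space-separated lemmas over all partitions and return the k most frequent."""
    counts = Counter()
    for partition in partitions:
        for line in pipe_partition(partition, command):
            counts.update(line.split())
    return counts.most_common(k)


if __name__ == "__main__":
    # Stand-in for the real lemmatizer: lowercases its input (for local testing only)
    fake_lemmatizer = [sys.executable, "-c",
                       "import sys; sys.stdout.write(sys.stdin.read().lower())"]
    print(top_lemmas([["The cat", "the dog"], ["A cat"]], fake_lemmatizer, k=2))
```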


Required; template: /net/data/npfl118/assignments/

Two words are anagrams if one is a character permutation of the other (ignoring case).

For a given wiki language, find all anagram classes that contain at least A words (a parameter of the script). Output each anagram class (unique words with the same character permutation) on a separate line.

Use the /net/data/npfl118/wiki/{cs,en}/tokenizer to tokenize the input, again using rdd.pipe.
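The core idea is a canonical key: two words are anagrams exactly when their sorted lowercase characters are equal. The single-machine sketch below uses that key. It assumes that words differing only in case count as the same word, so they are lowercased before deduplication. In Spark, the same idea could map each word to a (key, word) pair, apply distinct and groupByKey, and then filter by class size. The function name is hypothetical.

```python
from collections import defaultdict


def anagram_classes(words, min_size):
    """Group unique lowercase words by their sorted characters; keep classes of >= min_size."""
    classes = defaultdict(set)
    for word in words:
        lowered = word.lower()  # assumption: case-insensitive uniqueness
        classes["".join(sorted(lowered))].add(lowered)
    return [sorted(group) for group in classes.values() if len(group) >= min_size]
```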


Required (either this or spark_kmeans); template: /net/data/npfl118/assignments/

Compute the inverted index in the format described in the inverted_index assignment.


Required (either this or spark_inverted_index); template: /net/data/npfl118/assignments/

Implement the basic variant of the K-means clustering algorithm.

Use the following data:

Path                                        Number of points  Number of dimensions  Number of clusters
/net/data/npfl118/points/points-small.txt   10000             50                    50
/net/data/npfl118/points/points-medium.txt  100000            100                   100
/net/data/npfl118/points/points-large.txt   500000            200                   200

Run at most the given number of iterations. Print the distance by which the centers move in each iteration, and stop the algorithm early if this distance is less than a given epsilon. Once finished, print the found centers.
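The basic variant alternates two steps. First, assign each point to its nearest center; then move each center to the mean of its points. The single-machine sketch below makes these assumptions:

- Initial centers are passed in; for example, the first K points are a common simple choice.
- "The distance by which the centers move" is taken as the sum of Euclidean distances over all centers. The maximum would be another reasonable reading.
- Reading the points files is omitted, because their format is not described here.

In Spark, one natural mapping turns the assignment step into a map to (nearest center, (point, 1)) pairs followed by reduceByKey. The function name is hypothetical.

```python
import math


def kmeans(points, initial_centers, iterations, epsilon):
    """Basic K-means; returns the final centers and the movement in each iteration."""
    centers = [list(center) for center in initial_centers]
    movements = []
    for _ in range(iterations):
        # Assignment step: accumulate the points closest to each center
        sums = [[0.0] * len(centers[0]) for _ in centers]
        counts = [0] * len(centers)
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: math.dist(p, centers[i]))
            counts[nearest] += 1
            sums[nearest] = [s + x for s, x in zip(sums[nearest], p)]
        # Update step: move each center to the mean of its points (keep it if empty)
        new_centers = [[s / counts[i] for s in sums[i]] if counts[i] else centers[i]
                       for i in range(len(centers))]
        # Assumed movement measure: sum of Euclidean distances of all centers
        movement = sum(math.dist(a, b) for a, b in zip(centers, new_centers))
        movements.append(movement)
        print(f"center movement: {movement:.6f}")
        centers = new_centers
        if movement < epsilon:
            break
    return centers, movements
```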