Of Apache Spark, Hadoop, Vagrant, VirtualBox and IBM Bluemix Services – Part 5 – Spark applications in PIA project

The main reason for choosing Spark was a second project which we developed for the course “Programming Intelligent Applications”. For this project we wanted to implement a framework which is able to monitor important events (e.g. terror, natural disasters) on the world through Twitter.

To separate important tweets from others we use Latent Dirichlet Allocation (LDA) which is an algorithm for topic modelling. LDA is able to extract distinct topics through relations between words. An example for such a relation would be that the word “terror” is likely to be used with the word “attack”. This information is enough to generate separate topics for a bunch of documents, in our case tweets. Therefore, the need to annotate training data to learn word distributions for topics is not necessary and LDA is an algorithm which can learn completely unsupervised. For most machine learning applications the performance is getting better with more training data, and this also applies for our application.

The most important part was to set up a pipeline which is able to fetch tweets and regularly start a new LDA training with all the collected data. After every finished training the best available model should be evaluated through handpicked evaluation data and used for future filtering of important tweets. Completely autonomous, without any need of human supervision.

However, a vast amount of data need to be processed. Tweets need to be preprocessed for the training and after that the training should be as fast as possible to generate new and better models in feasible time. Both the preprocessing and the training with LDA are algorithms which can be distributed and therefore they are very suitable for clusters.

Spark Applications

How many Tweets have location information?

At the beginning of the project we had to choose the strategy how we want to extract the location of a tweet. One possibility was to use the location information which was sent with the tweet. To evaluate if this is a expressive metric, we checked how many tweets actually have information about their location. In this case, location means the location of the event the tweet refers to and not the users location.

For the training of our model we collected about 40GB of tweets. We uploaded this data into our HDFS of IBM Bluemix and with the help of SparkSQL we could analyze the tweets very fast and easily. The result was that only ~17% of our tweets from the trainings data had an information about the location which was not meaningful enough. So we had to look for an alternative way to extract the location of tweets. The source code can be recognized on our GitHub repository.

Topic Modelling

In our project in PIA we implemented topic modelling (LDA) with the python library gensim. For test cases we tried to reimplement the project with Apache Spark to calculate our model on a computer cluster. Therefore we used the Apache Spark machine learing library (MLlib). Since we don’t want to go into the depths of the machine learning algorithm, we explain some code in the following and outline why the respective step is necessary.

The usage of the distributed implementation of LDA is quite simple.

int numTopics = 3;
LDA lda = new LDA().setK(numTopics).setMaxIterations(100);
LDAModel model = lda.run(documents);

There is a class called LDA in the Spark machine learning library (MLlib) which has to be instantiated. Then we could set the parameters like the number of topics which should be found and the maximum amount of iterations. On this object we call a method run which expects the training data as parameters. This method returns the machine learning model, which then can be used to classify a tweet to a certain topic. The challenge was the editing of the data (the variable documents) which will be passed to the run method as parameter. Through the functional style programming API in Apache Spark this could be realized very elegantly. The data we had to preprocess were tweets stored as json files. A tweet json object has a lot of information like the actual text, user, location, date etc.

Firstly, we had to create a corpus of the tweets of the training data. The corpus consists of tuples. A tuple again consists of a word which is occurred in the training data and the number of occurrences in the training data. To achieve this each tweet has to be tokenized and preprocessed.

DataFrame dfTweets = sqlContext.read().json("hdfs://.../user/tweets/*.json");
dfTweets.registerTempTable("tweets");
DataFrame dfTweetsText = sqlContext.sql("SELECT text FROM tweets");

// preprocess tweets
JavaRDD<List<String>> tokenized = dfTweetsText.javaRDD()
    .map(row -> {
        // get text and transform to lowercase
        String text = row.getString(0).toLowerCase();
        // text remove html markup
        text = Jsoup.parse(text).text();
        // remove emojis
        text = unicodeOutliers.matcher(text).replaceAll(" ");
        return text;
    })
    .map(text -> Arrays.asList(text.trim().split(" "))) // split tweet in words
    .filter(words -> words.size() > 3); //remove tweets with less/eq than 3 words

The code above is reduced version of the original, because this would go beyond the scope. First, we read the json files from the HDFS via the Spark SQL Context and register a table “tweets”. Like mentioned earlier a tweet json has many attributes. Hence the tweets data frame has a lot of columns. Therefore we create a new data frame with only the text column and save it in the variable dfTweetsText (line 3). On this object we call the method javaRDD which leads to that the data sets of the data frame will be transformed to an RDD which can be processed with high-level functions. In the first map transformation the string will be transformed to lowercase as well as html mark-up and emojis will be removed. In the next step each tweet will be split into words. Finally, we filter out the tweets that have less than three words after preprocessing, because these tweets have no semantic value anymore. The result of this preprocessing is an RDD consisting of lists of strings. A list of the RDD represents a tweet and the entries of a list are the words of a tweet.
In the next step we create the corpus. Like mentioned above the corpus consists of key-value pairs where the key is the word and the value the occurrences of the word in all tweets. We transform the in the previous code snippet created tokenized RDD.

// create corpus
JavaPairRDD<String, Integer> corpus = tokenized.flatMap(words -> words)
                                  .mapToPair(word -> new Tuple2<String, Integer>(word,1)) // build pair of word and integer
                                  .reduceByKey((a,b) -> a+b) // sum up the word occurences
                                  .cache();

With the method flatMap the rdd with lists of words will be flattened to a stream of words. In step 2 tuples will be formed with the method mapToPair. In the reduceByKey step the values of all tuples with the same key will be accumulated. The result is the desired corpus which will be cached. In the next step the vector representation of the documents (tweets) which the LDA algorithm expects could be created with the corpus and the tokenized object. As we have done could be seen on our GitHub project, because at this point we just want to gave a short view of implementing machine learning applications with Spark. We find Spark is very pleasant for implementing distributed machine learning applications.

In the finally part of this article series we discuss Apache Spark and Apache Hadoop and how they relate to each other.

Part 6 – Apache Spark and/vs Hadoop?

Leave a Reply

Your email address will not be published. Required fields are marked *