JavaScript Performance optimization with respect to the upcoming WebAssembly standard

Tim Tenckhoff | Computer Science and Media

1. Introduction

Speed and performance of the (worldwide) web advanced considerably over the last decades. With the development of sites more heavily reliant on JavaScript (JS Optimization, 2018), the consideration of actions to optimize the speed and performance of web applications grows in importance. This blogpost aims to summarize practical techniques to enhance the performance of JavaScript applications and provides a comparative outlook regarding the upcoming WebAssembly standard.

Let´s jump into hyperspeed…

  1. Introduction
    1. JavaScript 
    2. Why optimize? 
  2. How can the JavaScript Speed and Performance be increased?
    1. Interaction with Host Objects
    2. Dependency Management
    3. Event Binding
    4. Efficient Iterations
    5. Syntax 
  3. Is WebAssembly replacing JavaScript?
  4. Sources


Back in the days, in 1993 a company called Netscape was founded in America. Netscape aimed to exploit the potential of the burgeoning World Wide Web and created their own web browser Netscape Navigator (Speaking JavaScript 2014). As Netscape realized in 1995, that the web needed to become more dynamic, they decided to develop a scripting language with a syntax similar to Java’s to rule out other existing languages. In May 1995, the prototype of this language was written by the freshly hired software developer Brendan Eich within 10 days. The initial name of the created code was Mocha, which was later changed by the marketing to LiveScript. In December 1995, it was finally renamed to JavaScript to benefit from Java’s popularity (Speaking JavaScript 2014).

Today, every time the functionality of a web page exceeds to just display static content, e.g. by timely content updates, animated graphics, an interactive map or the submission of an input formular, JavaScript is probably involved to solve the given complexity. Thus it is the third layer cake of the standard web technologies, which adds dynamic behavior to an existing markup structure (e.g. defining paragraphs, headings, and data tables in HTML), customized with a definition of style rules (e.g. defining paragraphs, headings, and data tables in CSS). If a web browser loads a webpage, the JavaScript code is executed by the browser’s engine, after the HTML and CSS have been assembled and rendered into a web page (JavaScript 2019). This ensures that the content of the required page is already in place before the JavaScript code starts to run, as seen in Figure 1.

Figure 1: The execution of HTML, CSS and JavaScript in the Browser (JS Optimization, 2018)

The script language often abbreviated as JS includes a curly-bracket syntax, dynamic typing, object orientation, and first-class functions. Initially only implemented in and for web browsers on the client side, today’s JavaScript engines are now embedded in many other types of host software, including web servers (e.g. NodeJS), databases, or other non-web use-cases (e.g. PDF software).

Why optimize?

Speaking about the performance optimization in networks, most developers think about it in terms of the download and execution cost – sending more bytes of JavaScript code takes longer, depending on the users internet connection (JS Optimization 2018). Therefore, it can be said beforehand, that it generally makes sense to reduce the transferred network traffic, especially in areas where the available network connection type of users might not be 4G or Wi-Fi. But one of JavaScript’s heaviest costs regarding the performance is also the parse/compile time. The Google Chrome browser visualizes the time spend in these phases in the performance panel as seen in Figure 2. But why is it even important to optimize this time?

Figure 2: Parse and compile time of JavaScript (JS Optimization 2018)

The answer is, that if more time is spent parsing/compiling, there might be a significant delay in the user’s interaction with the website. The longer it takes to parse and compile the code, the longer it takes until a site becomes interactive. According to a comparison in an article by Google Developer Addy Osmani, JS is more likely to negatively impact a pages interactivity than other equivalently sized resources (JS Optimization 2018). As an example seen in Figure 3, the resource processing time of 170kb JavaScript bytes and the same amount of JPEG bytes require the same amount of 3,4 seconds network transmission time. But as the resource processing time to decode the image (0,064 seconds) and rasterize paint in the image (0,028 seconds) are relatively low, it takes much longer to parse the JavaScript code (~2 seconds) and execute it (~1,5 seconds). Due the fact that website users differ not only regarding their provided network connection, but also in the hardware they have, older hardware may also have a bad influence on increased execution time. This shows how much more JS can potentially delay the interactivity of a website due to parse, compile and execution costs and proves the need for optimization.

Figure 3: The difference between JavaScript and JPEG resource processing (JS Optimization 2018)

2. How can the JavaScript Speed and Performance be increased?

The question appearing at this point is the following: What needs to be practically done during the development of JavaScript to optimize websites or web applications regarding speed and interactivity. The depths of the world wide web expose several blogs and articles about this kind of optimization. The following section aims to sum up found techniques categorized by the underlying performance problem:

Interaction with Host Objects

The interaction with “evil” host objects needs to be avoided as Steven de Salas says in his blog (25 Techniques 2012)

As described before, JavaScript code is compiled by the browser’s scripting engine to machine code – offering an extraordinary increase in performance and speed. However, the interaction with host objects (in the browser) outside this native JavaScript environment raises a loss in performance, especially if these host objects are screen-rendered DOM (Document Object Model) objects (25 Techniques 2012). To prevent this, the interaction with these evil host objects needs to be minimized. Let’s take a look at how this can be done.

Rotation via CSS

A first useful approach is the usage of CSS-classes for DOM animations or interactions. Unlike JavaScript code, solutions like e.g. CSS3 Transitions, @keyframes, :before, :after are highly optimized by the browser (25 Techniques 2012). As shown above, the rotation of the small white square can either be animated by addressing the id via document.getElementById(“rotateThisJS”), triggered by a button starting the rotate() function below…

Figure 4: Rotation via JavaScript (How not to…)

…or by adding a CSS class (as seen in Figure 5) to the <div> element which works much more efficiently, especially if the respective website contains several animations. Except from animations, CSS is also able to handle interactions like e.g. hovering elements, thus it can be said, that the way JavaScript is optimized this time is not to use it in certain cases.

Figure 5: Animation via CSS

Speaking of selectors to pick DOM elements, the usage of jQuery allows a highly specific selection of elements based on tag names, classes and CSS (25 Techniques 2012). But according to the online blog article by Steven de Salas, it is important to be aware that this approach involves the potential of several iterations through underlying DOM elements to find the respective match. He states that this can be improved by picking nodes by ID. An example can be seen in Figure 6:

Figure 6: Different ways of DOM element selection

What also increases the (DOM interaction) performance, is to store references to browser objects during instantiation. If it can be expected, that the respective website is not going to change after instantiation, references to the DOM structure should be stored initially, when the page is created not only when they are needed. It is generally a bad idea to instantiate references DOM objects over and over again. That’s why it is rather advisable to create few references to objects during instantiation which are needed several times (25 Techniques 2012). If no reference to a DOM object has been stored and needs to be instantiated within a function, a local variable containing a reference to the required DOM object can be created. This speeds up the iteration considerably as the local variable is stored in the fastest and most accessible part of the stack (25 Techniques 2012).

The general amount of DOM-elements is also a criteria with respect to the performance. As the time, used for changes in the DOM is proportional to the complexity of the rendered HTML, this should also be considered as an important performance factor (Speeding up 2010).

Another important aspect regarding the DOM interaction is to batch (style) changes. Every DOM change causes the browser to do a re-rendering of the whole UI (25 Techniques 2012). Therefore, it should be avoided to apply each style change separately. The ideal approach to prevent this, is to do changes in one step, for example by adding a CSS class. The different approaches can be seen in Figure 7 below.

Figure 7: How to batch changes in DOM

Additionally it is recommended to build DOM elements separately before adding them to a website. As said before, every DOM requires a re-rendering. If a part of the DOM is built “off-line” the impact of appending it in one go is much smaller (25 Techniques 2012). Another approach is to buffer DOM content in scrollable <div> elements and to remove elements from the DOM that are not displayed on the screen, for example, outside the visible area of a scrollable <div>. These nodes are then reattached if necessary (Speeding up 2010).

Dependency Management

Figure 8: Checking the dependency Management of

Looking at different pages in the www, e.g. the cities’s website of Stuttgart, it can be observed that the screen rendering is delayed for the user until all script dependencies are fully loaded. As seen in Figure 8, some dependencies cause the delayed download of other dependencies that in return have to wait for each other. To solve this problem the active management and reduction of the dependency payload are a core part of performance optimization.

One approach to do so, is to reduce the general dependency on libraries to a minimum (25 Techniques 2012). This can be done by using as much in-browser technology as possible. For example the usage of document.getElementById(‘element-ID’) instead of using (and including) the jQuery library. Before adding a library to the project, it makes sense to evaluate whether all the included features are needed, or if single features can be extracted from the library and added separately. If this is the case, it is of course important to check whether the adopted code is subject to a license – to credit and acknowledge the author is recommended in any case (25 Techniques 2012).

Another important approach is the combination of multiple JavaScript files to bundled ones. The reason behind this is, that one network request with e.g. 10kb of data is transferred much faster than 10 requests with 1kb each (25 Techniques 2012). This difference is caused by lower bandwidth usage and network latency of the single request. To save additional traffic, the combined bundle files can also be minified and compressed afterwards. Minification tools, such as UglifyJS or babel-minify remove comments and whitespacing from the code. Compression tools as e.g. gzip or Brotli are able to compress text-based resources to smaller memory size. (JS Optimization 2018)

A further way to optimize the dependency management, is the usage of a post-load dependency manager for libraries and modules. Tools like Webpack or RequireJS allow, that the layout and frame of a website appears before all of the content is downloaded, by post-loading the required files in the background (JS Optimization 2018). This gives users a few extra seconds to familiarise themselves with the page (25 Techniques 2012).

By maximizing the usage of caching, the browser downloads the needed dependencies only at the first call and otherwise accesses the local copy. This can be done by manually adding eTags to files that need to be cached, and putting *.js files to cache into static URI locations. This communicates the browser to prefer the cached copy of scripts for all pages after the initial one (25 Techniques 2012).

Event Binding

To create interactive and responsive web applications, event binding and handling is an essential part. However, event bindings are hard to track due to their ‘hidden’ execution and can potentially cause performance degradation e.g. if they are fired repeatedly (25 Techniques 2012). Therefore it is important to keep track of the event execution throughout various use cases of the developed code to make sure that events are not fired multiple times or bind unnecessary resources (25 Techniques 2012).

To do so, it is especially important to pay attention to event handlers that fire in quick repetition. Browser events such as e.g. ‘mouse move’ and ‘resize’ are executed up to several hundred times each second. Thus, it is important to ensure that an event handler that reacts to one of these events can complete in less than 2-3 milliseconds (25 Techniques 2012). The box below visualizes the amount of events that are fired when the mouse is moved over an element.

hover me!

Another important point that needs to be taken care of, is the event unbinding. Every time an event handler is added to the code, it makes sense to consider the point when it is no longer needed and to make sure that it stops firing at this point. (Speeding up 2010) This avoids performance slumps through handlers that are bound multiple times, or events firing when they are no longer needed. One good approach to prevent this, is the usage of once-off execution constructs like or manually adding/coding the unbind behavior at the right place and time (25 Techniques 2012). The example below, shows the usage of – an event binding on each p element that is fired exactly once and unbinds itself afterwards. The implementation of this example can be seen in Figure 9.

Click the boxes to trigger a event!
Fired once. Also fired once. This is also fired only once.
Figure 9: The usage of

A last important part of the event binding optimization is to consider and understand the concept of event bubbling. A blog article by Alfa Jango describes the underlying difference between .bind(), .live(), and .delegate() events (Event Bubbling 2011).

Figure 10: Propagation of a click event through the DOM (Event Bubbling 2011)

Figure 10 shows what happens if e.g e a link is clicked that fires the click event on the link element, which triggers functions that are bound to that element’s click event: The click event propagates up the tree, to the next parent element and then to each ancestor element that the click event was triggered on one of the descendent elements (Event Bubbling 2011). Knowing this, the difference between the jQuery functions bind(), live() and delegate() can be explained:


jQuery scans the entire document for all $(‘a’) and binds the alert function to each of these click events.


jQuery binds the function to the $(document) tag including the parameters ‘click’ and ‘a’. If the 
event is fired, it checks if both parameters are true, then executes the function.


Similar to .live(), but binds the handler to a specific element, not the document.root.

The article says that .delegate() is better than .live(). But why?

$(document).delegate(‘a’, ‘click’, function() { blah() });

$(‘a’).live(‘click’, function() { blah() });

According to the blog entry (Event Bubbling 2011), delegate() can be preferred for two reasons:

Speed: $(‘a’) first scans for all a elements and saves them as objects, this consumes space and is therefore slower.

Flexibility: live() is linked to the object set of $(‘a’) elements, although it actually acts at the $(document) level..

Efficient Iterations

The next topic is the implementation of efficient iterations. As seen in Figure 11 below, the execution time for string operations grows exponentially during long iterations (String Performance 2008). This shows why iterations can often be the reason for performance flaws. Therefore it always makes sense to get rid of unnecessary loops, or calls inside of loops (25 Techniques 2012) .

Figure 11: Comparative String Performance Analysis (String Performance 2008)

One technique to avoid unnecessary loops, is to use JavaScript indexing. Native JavaScript objects can be used to store quick-lookup indexes to other objects, working in a similar way to how database indexes work (25 Techniques 2012). As seen in Figure 12 below it also speeds up finding objects by using e.g. the name as an index. This is highly optimized and avoids long search iterations.

Figure 12: Finding Objects by JavaScript indexing

Additionally it is always a good idea to use native JavaScript array functions such as push(), pop() and shift(), especially working with arrays. These functions also have a small overhead and are closely connected to their assembly language counterparts (25 Techniques 2012).

The difference between reference and primitive value types, also comes up in terms of efficient iterations. Primitive types such as String, Boolean or Integer are copied if they are handed over to a function. Reference types, such as Arrays, Objects or Dates are handed over as a light-weight reference. This knowledge should be considered if a reference is handed over to a function, running in an iteration: Obviously, it is better to avoiding frequent copying of primitive types and pass lightweight references to these functions.


A final option to consider for optimization is the usage of correct JavaScript syntax. Writing simple function patterns without being familiar to advanced native ECMAScript can lead to inefficient code. It is recommendable to try to stay up to date and learn how to apply these constructs.

One easy example (25 Techniques 2012) regarding this, is to prefer the usage of native, optimized constructs over self-written algorithms: Functions as e.g. Math.floor()or new Date().getTime() for timestamps don’t need to be rewritten. The operator === instead of == provides an optimized, faster type-based comparison (25 Techniques 2012). Furthermore, the switch statement can be used instead of long if-then-else blocks to provide an advantage during compilation, to name just a few examples.

3. Is WebAssembly replacing JavaScript?

On the 17th of June 2015, Brendan Eich (the creator of JavaScript) announced a new project that aims to make it easier to compile projects written in languages like C and C++ to run in browsers and other web related JavaScript environments (Why we need Web Assembly). The developing team consists of members from Apple, Google, Microsoft, Mozilla and others collective under the name of the W3C WebAssembly Community Group (Why we need Web Assembly). A blog article by Eric Elliot comments on these release announcements and states that „the future of the web platform looks brighter than ever“ (What is WebAssembly?).

But what exactly is WebAssembly? Elliot further explains that WebAssembly, often shortened as WASM, is a new language format. The code defines an AST (Abstract Syntax Tree) represented in a binary format that can be edited/developed as readable text. The instruction format has been built to compile languages such as C, C++, Java, Python and Rust and allows the deployment on the web and in server applications. Through WebAssembly it is now possible to run the respective code on the web at a native speed (WASM replace JS 2018). But WASM is also an improvement to JavaScript: The performance critical code that needs to be optimized can be implemented in WASM and imported like a standard JavaScript module (What is WebAssembly?). The blog entry by Elliot additionally explains that WASM is also an improvement for browsers. Through the fact, that browsers will be able to understand the binary code that can be compressed to smaller files than currently used Javascript files, smaller payloads would lead to faster delivery and make websites run faster (What is WebAssembly?).

But listing all these advantages, does WebAssembly have the potential to replace JavaScript in the nearest future? And is WebAssembly compilation really so much faster? A blog article from Winston Chen compares the performance of WebAssembly vs JavaScript and comes to a surprising result (WASM vs JS 2018). The performed experiment involved several implementations of matrix multiplications as a simple and computationally intensive way to compare JavaScript and C++ (in WASM). In summary it can be said that JavaScript performed better than WebAssembly on smaller array sizes and WebAssembly outperformed JavaScript on larger ones. Chen concludes, that outgoing from his results, JavaScript is still the best option for most web applications. According to him, Web Assembly was therefore “best used for computationally intense web applications, such as web games” (WASM vs JS 2018).

Among different articles throughout the internet, dealing with the question whether WASM is going to replace JavaScript, it is not possible to find a precise answer or prediction. But Brendan Eich, the creator of JavaScript himself, finds a relatively clear answer to the question if he was trying to KILL JavaScript (by developing WASM): “…We’re not killing JavaScript. I don’t think it’s even possible to kill JavaScript”(Why we need wasm 2015). The JavaScript ecosystem currently supports all major browser and most developers write libraries and frameworks in it (e.g. React, Bootstrap or Angular)(WASM replace JS 2018). In order to overtake JavaScript, any competitor (as WASM) would need to provide replacement options for all these libraries. Furthermore, it is not easily feasible to replace the existing code base of JavaScript based projects. With growing popularity in calculation intense projects as browser-based games, WebAssembly can possibly decrease the market share of JavaScript, but is not able to replace these already existing JS applications. It can be said, that the native speed improvements of WebAssembly are rather a complementation of the existing JavaScript features (WASM replace JS 2018). By using both, (e.g. by using WebAsembly run alongside JS using WASM JavaScript APIs) developers can benefit from the flexibility of JS and the native speed advantages of WASM in combination. Wrapping this up, the creator was right and is very unlikely that WASM is going to overtake JavaScript – still the single, dominating language of the web.

4. Sources

Speaking JavaScript 2014, Axel Rauschmeyer, Speaking JavaScript: An In-Depth Guide for Programmers
Chapter 4. How JavaScript Was Created
[Accessed 24 July 2019].

WebAssembly 2015, Eric Elliot, What is WebAssembly? [Online]
Available at:
[Accessed 28 July 2019]. 

Why we need wasm 2015, Eric Elliot, Why we Need WebAssembly [Online]
Available at:
[Accessed 28 July 2019]. 

WASM replace JS 2018, Vaibhav Shah, Will WebAssembly replace JavaScript? [Online]
Available at:
[Accessed 28 July 2019]. 

WASM vs JS 2018, Chen Winston, Performance Testing Web Assembly vs JavaScript [Online]
Available at:
[Accessed 27 July 2019]. 

25 Techniques 2012, Steven de Salas, 25 Techniques for Javascript Performance Optimization [Online]
Available at:
[Accessed 27 July 2019].

JS Optimization 2018, Addy Osmani, JavaScript Start-up Optimization [Online]
Available at:
[Accessed 27 July 2019].

JavaScript 2019, Chris David Mills (MDN web docs), What is JavaScript? [Online]
Available at:
[Accessed 27 July 2019].

Speeding up 2010, Yahoo Developer Network, Best Practices for Speeding Up Your Web Site [Online]
Available at:
[Accessed 27 July 2019].

String Performance 2008, Tom Trenka, String Performance: an Analysis [Online]
Available at:
[Accessed 27 July 2019].

Event Bubbling 2011 Alfa Jango, THE DIFFERENCE BETWEEN JQUERY’S .BIND(), .LIVE(), AND .DELEGATE() [Online]
Available at:
[Accessed 24 July 2019].

How to create and integrate a customised classifier based on IBM Visual Recognition

Helga Schwaighofer – hs082
Celine Wichmann – cw089
Lea Baumgärtner – lb092


Imagine you are having a bad day, but you don’t know what to do. Your friends are not available, but you’d like to have advice depending on your mood.
For that case, we created the Supporting Shellfish! This service generates advice based on the mood it recognises on your face.

The following blog post describes a step by step explanation of how to create a personalised classifier based on the IBM Visual Recognition cloud service and the integration of those functionalities into a JavaScript- / Flask-based Web Application.

Architecture of shelly the supporting shellfish web app

Research on different services

In order to realise our idea, we had to choose between different cloud based services in the field of image recognition or to be more specific, in the area of face recognition.

Machine Learning as a service is the overall definition for diverse cloud-based services providing functionalities in the area of Artificial Intelligence such as data pre-processing, model training and prediction. The prediction results can be used and integrated through REST APIs. First of all, we analysed three of the most popular companies and their services.

Google, Amazon and IBM. Which one should we choose?

All of those services provide the usage of pre-trained models via API or the possibility to create and use a customised model. This Website provides a very good overview of the detailed functionalities of the different services. However, for our case we focused on the following pros and cons of those different services:

Creating a customised classifier

After analysing the pros and cons of the different services, we decided to use IBM Cloud. The deciding factor for us was the pricing. But the well-structured documentation and the available tutorials also helped convince us.

Although IBM provides a facial emotion classifier, we decided to create our own facial expression recognizer based on Visual Recognition of IBM Watson for studying purposes.

We searched for different emotion datasets and found the MUG Facial Expression Database. After having read and accepted the license agreement we requested access. A few weeks later we received the necessary access credentials. The database provides videos and images of 52 different people expressing the emotions happiness, sadness, neutral, anger, surprise, disgust and fear.

To create our own classifier in IBM Visual Recognition, we had to summarise the data in a zip file per class / emotion and therefore created a whole new structure for the facial dataset. We had the possibility to choose between using the terminal or the well-structured user interface of IBM Watson Studio – we decided to use the later.

First, we configured the model:

Configuration of the Model in IBM Watson Studio

After the model was created, we were able to drag and drop our zipped training data on the right-hand side of the user interface below “2. Add from project”. We named the zip files equal to the classes we wanted to predict. We had to censor the faces in the following screenshots due to data protection. As soon as we finished uploading our training data files, we hit the button „Train Model “ and the training began.

Upload of training data and training process of customised model

After circa 15 to 20 minutes, the training finished successfully and we were able to embed our custom model into the backend of our web application.

These few lines of code enabled the integration of the customised model

Building the Web App

Parallel to this process, we created a one-page web application, using Python / Flask as backend and HTML / JavaScript as frontend.

Frontend Description

The frontend of our application is made up of one html site, which can be rendered as a Jinja template by flask. The functionalities are implemented in JavaScript and the design was created via Kube CSS.

We got two buttons: One enables the selection of an image from your local device and the other button enables the upload of that selected file. As soon as an image is selected, the user receives a preview of the image in a form next to Shelly, the Supporting Shellfish. After selecting the file, the image is encoded into a base64 format and sent to the backend. After pushing the Button “Upload File”, an XMLHttpRequest will be made.

Finally, the frontend waits for the status code of the backend and catches exceptions, if something went wrong.

Backend Description

The backend consists of two routes: one GET Route for the landing page and one POST Route for requesting the image from the frontend. The requested image will be decoded from base64 and processed by IBM visual recognition. Our classifier will predict the mood based on the received image and sends back a JSON File containing the predicted class with the highest probability.

Based on that prediction, a random advice will be picked from the corresponding advice-list and send to the frontend.


How does Shelly the Supporting Shellfish generate her advice?
First of all, upload a picture of your face. After hitting the button “Upload File”, Shelly will use the customised model via IBM Cloud and predict the mood on your face. Based on the recognised mood, she will provide you a more or less helpful advice.

Based on the recognised mood, Shelly will show you her empathy


Every member of the Supporting Shellfish Team has been active in the area of artificial intelligence. However, we wanted to analyse the advantages and disadvantages of integrating a cloud based service and the usage of “machine learning as a service” in an application.

The most interesting part for us was to create a customized model in a cloud. We were especially impressed by the functionality and usability of this process. The tough part was the selection of the dataset to train the model. We had to restructure the data to fit our needs and the requirements of IBM. After the training was completed, the integration of the model into our Web-App went smoothly and quite quickly.

If you are interested in the project, you can have a deeper insight here.

Mobile Security – How secure are our daily used devices?

Nowadays, the usage of mobile devices has become a part of our everyday life. A lot of sensitive and personal data is stored on these devices, which makes them more attractive targets for attackers. Also, many companies offer the possibility to work remotely, which results in storing confidential business information on private phones and therefore increases the organizations’ vulnerability. The following content shows what kind of attacks the mobile platform is facing and how secure we really are.

Continue reading

Crypto-Trading: Securing Million Worth API-Keys in a Distributed System

The teaching of IT security is often about trust boundaries – these are drawn at the interface of the system to the outside world. While this view is dangerous even with a monolithic system, it is simply wrong with a distributed system. Especially when the system’s data is so delicate that you don’t even want to trust all your own microservices.

In this essay, an approach is discussed to restrict access to data in a distributed system by means of cryptography. However, this is not only about security but also about the practicability and effects on the development.

Continue reading

How to create a K8s cluster with custom nodes in Rancher

Don’t you find it annoying not to be able to manage all your Kubernetes clusters at a glance? Ranger 2.0 offers an ideal solution. 

The following article is less a scientific post than a how-to guide to creating a new Kubernetes cluster with custom nodes in Ranger 2.0. 

But before we drift into the practical part, let’s take a look at what’s going on with Rancher and Kubernetes. We’ll briefly discuss the differences, origins, and use of the two tools. 

What is Kubernetes? 

Kubernetes, also often abbreviated as K8s, is a container orchestration system. An open source system for automating the deployment, scaling and management of container applications. In contrast to other orchestration systems, no fixed workflow is implemented. The actual state of the container system is continuously monitored and changed to the target state. K8s was originally developed by Google in 2014 and subsequently donated to the Cloud Native Computing Foundation (CNCF). 

So-called ‘pods’ representing the smallest unit which are being orchestrated. These are wrappers that consist of either a single container or several containers. They encapsulate these containers from the outside (like a legume). These pods do run on worker nodes (either physically or as virtual machines in a cluster). The cluster is controlled by one (or more) dedicated machine(s), the ‘Kubernetes Master’ (‘Master-Nodes’). 

What is Rancher? 

Rancher 2.0 is an open source container manager for managing multiple K8s clusters. It doesn’t matter if a cluster consists of custom nodes or a hosted Kubernetes cluster like GKE (Google Kubernetes Engine), EKS (Amazon Elastic Kubernetes Service) or AKS (Azure Kubernetes Service). Rancher is able to manage all these clusters centrally. Especially user administration and RBAC (Role Based Access Control), policy and project management, resource management, backup and recovery of clusters, monitoring, logging and much more are covered. 

As we have now seen, we are able to integrate multiple K8s clusters into Rancher. In our particular case, we don’t want to integrate a cluster from the big cloud supplier, but use our own hosts. So let’s take a look at what we need.


The following servers all run CentOs 7. Of course you can also use other operating systems.

We used seven hosts. One is only used for rancher. Three are used as master nodes and another three are used as worker nodes.

nunki[01:03][1:3]Master-Nodes (etcd)


Docker needs to be installed on all hosts that are to serve as master and worker nodes in the cluster and on the host on which our rancher instance is running?runs. Once Docker is installed, you need to start the Docker daemon. Most Linux distributions use systemctl (systemctl start docker) to start services. If you don’t have systemctl, use the service command. If you want Docker to automatically start at boot (recommended) you should enable the docker service as well. 

Further Setup
In this tutorial, we will not cover ‘SELinux’ configuration for Docker, so we disable it on every host. Edit the /etc/selinux/config file to set the SELINUX parameter to disabled, and then reboot the server. Execute getenforce to see the current mode of SELinux

Since we need certain ports for our Kubernetes environment, we deactivate the firewall completely for our tests. Of course this should not be done in production!
‘Firewalld’ is a complete firewall solution that has been made available by default on all CentOS 7. 

  • First, stop the Firewalld service with:
    • systemctl stop firewalld
  • Then Disable the Firewalld service
    • systemctl disable firewalld
  • To check the status of firewalld, run the following command
    • systemctl status firewalld

We will run Rancher 2.0 in the container using the docker image rancher/ranche r:v2.2.3. To start it, we use the following command:

  • docker run -d –restart=unless-stopped -p 80:80 -p 443:443 rancher/ranche r:v2.2.3

By means of parameter -d we tell the docker deamon that the container should run ‘detached’, in other words: in the background. –restart=unless-stopped allows us to automatically restart the container after a restart (the service must be enabled for this). With parameter -p we define ports, which can be used to access our container from the outside. 

Create a k8s cluster with custom nodes

The following section describes the process of creating a new cluster. For this we call the Rancher web interface in the browser. 

From the Clusters page, click Add Cluster

Add a new Cluster

  1. Choose Custom
  2. Enter a Cluster Name
  3. Use Member Roles to configure user authorization for the cluster.
    • Click Add Member to add users that can access the cluster.
    • Use the Role drop-down to set permissions for each user
    • Use Cluster Options to choose the version of Kubernetes, what network provider will be used and if you want to enable project network isolation
  4. Click Next

Assign nodes to roles

  1. Under Node select the role that certain nodes should fulfill. If, for example, the master nodes are to be created, only ‘Control Plane’ and ‘etcd’ may be selected. 
  2. Optional: Click Show advanced options to specify one or more IP address(es) to use when registering the node, override the hostname of the node or to add labels to the node.
  3. Copy the displayed command to your clipboard.
  4. Log in to your Linux host using your preferred shell, such as PuTTy or a remote Terminal connection. Run the command copied to your clipboard.
  5. After all commands have been executed on your Linux host(s), then click Done.


  • Your cluster is created and assigned a state of Provisioning. Rancher is standing up your cluster.
  • You can access your cluster after its state is Active.

Active clusters are assigned to two Projects, Default (containing the namespace default) and System (containing the namespaces cattle-system,ingress-nginx,kube-public and kube-system, if present).

In addition to the configured settings, the available resources can also be taken from the Cluster Dashboard.
It also shows the current workload of the nodes.

For more information about the used nodes, select Nodes.
The table shows the current status, assigned role, available resources and their utilization. Nodes can also be restarted or deleted in this overview.
In our example we selected 3 Master Nodes (Role ‘etcd’ and ‘Control Plane’) and 3 Worker Nodes.

As you could see, setting up a cluster is uncomplicated. Within minutes, a Kubernetes cluster with custom nodes can be created and managed within Ranger 2.0. In the Rancher web interface you are able to display all clusters, their projects and namespaces clearly.

You should try it out.

How internet giants deliver their data to the world

In the course of attending the lecture “Ultra Large Scale Systems” I was intrigued by the subject of traffic load balancing in ultra-large-scale systems. Out of this large topic I decided to look at traffic distribution at the frontend in detail and held a presentation about it as part of this lecture. As this subject has proven to be difficult to comprehend, as long as all related factors are considered, multiple questions remained open. In order to elaborate on these questions I decided to write this blog post to provide a more detailed view of this topic for those interested. Herein, I will not discuss the subject of traffic load balancing inside an internal infrastructure, however corresponding literature can be found at the end of this article. Despite concentrating only on the frontend part of the equation an in-depth look into the workings of traffic load balancing will be provided.

Historically grown structures of the internet

The “internet” started somewhere in late 1969 as the so-called ARPANET. In those days, it was exclusively used by scientists to connect their mainframe computers. These in turn were interconnected among each other by Interface Message Processors (IMP), a predecessor of modern routers, which enables network communication through packet switching. However, the underlying protocols were fairly unreliable in heterogeneous environments, because they were specifically designed for a certain transmission medium. In 1974 Vinton Cerf and Robert Kahn, commonly known as the “fathers of the internet”, published their research paper entitled “A Protocol for Packet Network Intercommunication”. The objective of this work was to find a proper solution for connecting such diverse networks and their findings provided the base for the internet protocol suite TCP/IP in the following years. The subsequent migration of the entire ARPANET to these protocols was finally completed in early 1983 and took around six months according to Robert Kahn. At this time the term “internet” slowly began to spread around the world. In addition to this, the Domain Name System (DNS) protocol was developed in 1984. From now on it was possible to address computers in a more native way by using human-readable names instead of IP addresses. Today, all these protocols create the foundation for most connections on the internet.

Generally speaking, today’s internet giants like e.g. Google, Facebook or Dropbox are still based on these previous developments in how they deliver data to their users. In relation to the past, due to the massive increase in scale of dimensions, completely new thoughts and strategies are now necessary in order to meet the requirements. The desire for a change of the existing protocols is great, but if you consider that just the migration of the ARPANET with a significantly lower number of computers connected took six months, then such a fundamental change with several billion devices is quiet hard to imagine. In the old days, often a single web server was sufficient for all your user needs. You can simply assign an IP address to your web server, associate it with a DNS record and advertise its IP address via the core internet routing protocol BGP. In most cases, you don’t have to worry about scalability or high availability and you rely only on the BGP, which takes care of a generic load distribution across redundant network paths and increases the availability of your web server automatically by routing around unavailable infrastructure.

Bigger does not automatically mean better

Today, if you were in the role of an ultra-large-scale system operator such as Dropbox you have to handle roughly half a billion users, who meanwhile stored an exabyte of data. On the network layer this means many millions of requests every second and implies terabits of traffic. As you may have already guessed, this huge demand could certainly not be managed by a single web server. Even if you did have a supercomputer that was able to handle all these requests in some way, it is not recommended to pursue a strategy that relies upon a single point of failure. For the sake of argument, let’s assume you have this unbelievably powerful machine, which is connected to a network that never fails. At this point, please remember the 8 fallacies of distributed computing, which point out that this is only an idealized scenario and will probably never become reality. However, would that configuration be able to meet your needs as the operator? The answer is definitely no. Even such a configuration would still be limited by the physical constraints of the network layer. For instance, the speed of light is a fundamental physical constant that limits the transfer speed for fiber optic cables. This results in an upper boundary on how fast data can be served depending upon the distance it has to travel. Thus, even in an ideal world one thing is absolutely sure: When you’re operating in the context of ultra-large-scale systems, putting all your eggs in one basket is a recipe for total disaster, in other words a single point of failure is always the worst idea.

As already indicated, all the internet giants of today have thousands and thousands of machines and even more users, many of whom issue multiple requests at a time. The keyword to face this challenge is called “traffic load balancing”. This describes the approach for deciding which of the many machines in your datacenters will serve a particular request. Ideally, the overall traffic is distributed across multiple nodes, datacenters and machines in an “optimal” way. But what does the term “optimal” mean in this context? Unfortunately, there’s no definite answer, because the optimal solution depends on various factors, such as:

  • The hierarchical level for evaluating the problem or in other words does the problem exists either inside (local) or outside (global) your datacenter.
  • The technical level for evaluating the problem, which means – is it more likely a hardware or a software issue.
  • The basic type of the traffic you’re dealing with.

Let’s pretend to take the role of the ultra-large-scale system operator Facebook and start by reviewing two typical traffic scenarios: A search request for a mates profile and an upload request for a short video of your cat doing crazy stuff. In the first case, users expect that their search query will be processed in a short time and they get their results quickly. That’s why for a search request the most important variable is latency. In the other case, users expect that their video upload takes a certain amount of time, but also look for that their request will succeed the first time. Therefore, the most important variable for a video upload is throughput. These differing needs of the two requests are crucial to determine the optimal distribution for each request at the global level:

  • The search request should preferably be sent to our nearest datacenter, which could be estimated by the aid of the round-trip time (RTT), because our purpose is to minimize the latency on the request.
  • By contrast, the video upload stream will be routed on a different network path and ideally to a node that is currently underutilized, because our focus is now on maximizing the throughput rather than latency.

On the local level in turn it is often assumed that all our machines within the datacenter are at equal distance to the user and connected to the same network. Therefore, an optimal load distribution follows in the first place a balanced utilization of the resources and protects a single web server from overloading.

There is no doubt, that the above example only depicts a quiet simplified scenario. Under real conditions many more factors have been taken into consideration for an optimal load distribution. For instance, in certain cases it is much more important to direct requests to a datacenter for keeping the caches warm while the datacenter itself is slightly farther away. Another example would be the avoidance of network congestions through routing non-interactive traffic to a datacenter in a completely different region. Traffic load balancing, especially for ultra-large-scale systems operators, is anything but easy to realize and needs to be permanently refined. Typically, the search for an optimal load distribution is taking place at multiple levels, which will be described in the following sections. For the sake of accuracy, it is considered that HTTP requests will be sent over TCP. Traffic load balancing of stateless services such as DNS over UDP is minimally different, but most of the principles that will be described are usually applicable in case of stateless services as well.

First step in the right direction is called “Edge”

To meet the requirements caused by the massive increase in scale of dimensions, effectively all of today’s internet giants have built up an extensive network of points of presence (PoPs) spread around the world named “Edge”. By definition, a PoP is a node within a communication system that establishes connections between two or more networks. A well-known example is the internet PoP, which is the local access point that allows users to connect to the wide area network of their internet service provider (ISP). In our context, Edge takes advantage of this principle: The most of you may be familiar with the basic idea of content delivery networks (CDN), terminating TCP connections close to the user, which results in an improved user experience because of extremely reduced latencies. By way of illustration using the example of Dropbox, you’ll now see a comparison of an HTTP request latency made directly to a datacenter and the same request made through a PoP:

Direct comparison of a connection between user and datacenter with or without a PoP respectively.

As it can be seen, in case of a direct connection between the user in Europe and our datacenter in the USA the round-trip time (RTT) is significantly higher compared to establishing the connection using a PoP. This means that by putting the PoP close to the user, it is possible to improve latency by more than factor of two!

Minimized network congestion as a positive side effect

In addition, the users benefit from faster file uploads and downloads, because latency may also cause network congestion due to long-distance transmission paths. Just as a reminder, the congestion window (CWND) is a TCP state variable that limits the amount of data which can be send into the network before receiving an acknowledgement (ACK). The receiver window (RWND) in turn is a variable which advertises the amount of data that the destination can receive. Together, both of them are used to regulate the data flow within TCP connections to minimize congestion and improve network performance. Commonly, network congestion occurs when packets sent from a source exceed what the destination can handle. If the buffers at the destination get filled up, packets are temporarily stored at both ends of the connection as they wait to be forwarded to the upper layers. This often leads to network congestion associated with packet loss, retransmissions, reduced data throughput, performance degradation and in worst case the network may even collapse. During the so called “slow start”, which is a part of the congestion control strategy used by TCP, the CWND size is increased exponentially to reach the maximum transfer rate as fast as possible. It increases as long as TCP confirms that the network is able to transmit the data without errors. Nevertheless, this procedure is only repeated until the maximum advertised window (RWND) is reached. Subsequently, the so called “congestion avoidance” takes place and continues to increase the CWND as long as no errors occur. This means that the transfer rate in a TCP connection, determined by the rate of incoming ACK, depends on the bottleneck in the round-trip time (RTT) between sender and receiver. However, the adaptive CWND size allows TCP to be flexible enough to deal with arising network congestion, because the path to the destination was permanently analyzed beforehand. It can adjust the CWND for reaching an optimal transfer rate with minimal packet loss.

Let’s keep that in mind and take up again our initial thought to see how latency affects growth of the CWND during file uploads:

Comparison of the impact from latency on connections with or without a PoP repectively.

By using the example of Dropbox, you can see the impacts arising from low latency compared to high latency for connections with or without a PoP respectively. The transfer history of our client with lower latency is progressing through the so called “slow start” more quickly, because as you already know this procedure is depending on the RTT. This client also settles down on a higher threshold for the so called “congestion avoidance”, because its connections have a lower level of packet loss. This is due to the fact that packets spend less time on the internet and its possibly congested nodes. Once they reach the PoP, you can take them into our own network.

Enhanced approach for congestion avoidance

As you may have already guessed the congestion control strategy as described above is no longer an adequate response due to the transfer speeds that are available today. Since late 1980 the internet has managed network congestion through indications of packet loss to reduce the transfer rate. This worked well for many years, because the small buffers of the nodes were well aligned to the overall low bandwidth of the internet. As a result, the buffers get filled up and drop packets that they could not handle anymore at the right time when the sender has begun transmitting data too fast.

In today’s diverse networks the previous strategy for congestion control is slightly problematic. Therefore, in small buffers on the one hand, packet loss often occurs before congestion. Through commodity switches having such small buffers, which are used in combination with today’s high transfer speeds, the congestion control strategy based on packet loss can result in extremely bad throughput because it commonly overreacts. The consequence is a decreased sending rate upon packet loss that is even caused by temporary traffic bursts. In large buffers on the other hand, congestion occurs usually before packet loss. At the Edge of today’s internet, the congestion control strategy based on packet loss causes the so called “bufferbloat” problem, by repeatedly filling these large buffers in many nodes on the last mile which results in an useless queuing delay.

That’s why going forward a new congestion control strategy, which responds to actual congestion rather than packet loss, is much-needed. The Bottleneck Bandwidth and Round-trip propagation time (BBR) developed by Google tackles this with a total rewrite of congestion control. They started from scratch by using a completely new paradigm: For the decision on how fast data can be send over the network, BBR considers how fast the network is delivering the data. For that to happen, it uses recent measurement of the network’s transmission rate and round-trip time (RTT) to generate a model that includes the current maximum bandwidth that is available as well as its minimum current round-trip delay. Afterwards, this model is used by BBR to control both how fast data is transmitted and the maximum amount of data that is possible in the network at any time.

How to detect the optimal PoP locations

After our extensive discursion on network congestion and how it may be avoided, let’s get back to Edge and its network of PoPs. In case of Dropbox, according to the latest information, they have roughly 20 PoP spread across the world. This year it is planned to increase their network by examining the feasibility of further PoPs in Latin America, Middle East and the Asia-Pacific region. Unfortunately, the procedure for selecting PoP locations, which was easy first, now becomes more and more complicated. Besides focusing on available backbone capacity, peering connectivity and submarine cables, all the existing locations have to be considered too.

Their method to select PoPs is still done manually but is already assisted by algorithms. Even with a small number of PoPs and without assistive software it is challenging to choose between, for instance a PoP in Latin America or Australia. Regardless of the number of PoPs, the basic question always remains: What location will benefit the users better? For this purpose, the following short “brute-force” approach helps them to solve the problem:

  1. First, the Earth is divided into multiple levels of so called “S2 regions”, which are based on the S2 Geometry library provided by Google. Through this, it is possible to project all your data in a three-dimensional space instead of a two-dimensional projection what results a more detailed view of the Earth.
  2. Then all existing PoPs are assigned accordingly to the prior generated map.
  3. Afterwards, the distance to nearest PoP for all regions weighted by “population” is determined. In this context the term “population” refers to something like the total number of people in a specific area as well as the number of existing or potential users. This is used for optimization purposes.
  4. Now, exhaustive search is done to find the “optimal” location for the PoP. In connection with that, L1 or L2 loss functions are used to minimize the error for determining the score of each placement. Hence, they try to compensate internet quirks such as the effects of latency on the throughput of TCP connections.
  5. Finally, the new-found location will be added to the map as well.
  6. The steps 3 to 5 are repeated for all PoPs until an “optimal” location is found for each of them.

The attentive reader may have already seen that the above problem can also be solved by more sophisticated methods like Gradient Descent or Bayesian Optimization. This is indeed true, but the problem space is relatively small with roughly 100.000 S2 regions, so that they can just brute-force through it and still achieve a satisfying result instead of running the risk to get stuck on a local optimum.

The beating heart of the Edge

Now, let’s have a closer look at Global Server Load Balancing (GSLB), which is completely without doubt the most important part of the Edge. It is responsible for load distribution of the users across all PoPs. This usually means that a user is sent to the closest PoP, which has free capacity and is not under maintenance. Calling the GSLB the “most important part” here is due to the fact that if it misroutes users to a suboptimal PoP frequently, then it potentially reduces performance and your Edge may be rendered useless. Using the example of Dropbox, commonly used techniques for GSLB should be discussed hereinafter by showing up the pros and cons.

Keep things easy with BGP anycast

Anycast is by far the easiest method to implement traffic load balancing, because it completely relies on the Border Gateway Protocol (BGP). Just as a quick reminder, anycast is a basic routing scheme where the group members are addressed by a so called “one-to-one-of-many” association. This means that packets are routed to any single member belonging to a group of potential receivers, which are all identified by the same destination IP address. The routing algorithm then selects a single receiver from the group based on a metric that relies on the least number of hops. This leads to the fact that packets are always routed to the topologically nearest member of an anycast group.

Nationwide traffic distribution using anycast.

As it can be seen, to use anycast it is sufficient to just start advertising the same IP subnet from all PoPs and the internet respectively. DNS will deliver packets to the “optimal” location as if by magic. Even though you get a rudimentary failover mechanism for free and the setup is quiet simple, anycast has a lot of drawbacks, so let’s have a look at these in detail.

Anycast is not the silver bullet

Above, it was mentioned that BGP automatically selects the “optimal” route to a PoP and from a general point of view this is not incorrect. However, the policy of Dropbox favors network paths that are likely to optimize the traffic performance. But the BGP does not know anything about latency, throughput or packet loss and it relies only on attributes such as path length, which turns out as inadequate heuristics for proximity and performance. Thus, traffic load balancing based on anycast is optimal in most cases, but it would seem that there is a poor behavior on high percentiles if there is only a small or medium number of PoPs involved. However, it looms that the probability of routing users to the wrong destination in networks using anycast decreases with the number of PoPs. Nevertheless, it is possible that with an increasing number of PoPs, anycast may eventually exceed the traffic load balancing method based on DNS, which is discussed below.

Limited traffic steering capabilities

Basically, with anycast control over the traffic is very limited, so that the capacity of a node may not be able to forward all traffic you would like to send over it. Considering that peering capacity is quiet limited, rapid growth in demand can quickly result in an overstrain of the capacity of an existing interconnection. Short-time peaks in demand due to an event or a holiday as well as reductions in capacity caused by failures can lead to variations in demand and available capacity. In any of these cases the decisions BGP makes won’t be affected by surrounding factors, as it is not aware of capacity problems and related conflicts. Nevertheless, you can do some “lightweight” traffic distribution using specific parameters of BGP in the announcements or by explicitly communicating with other providers to establish a peering connection, but all this is not scalable at all. Another pitfall of anycast is that careful drain of PoPs at times of reduced demand is effectively impossible, since BGP balances packets, not connections. As soon as the routing table changes, all incoming TCP connections will immediately be routed to the next best PoP and the user’s request will be refused by a reset (RST).

The crux of the matter is often hard to find

Most commonly, general conclusions about traffic distribution with anycast are not trivial, since it involves the state of internet routing at a given moment. Troubleshooting performance issues with anycast is like looking for a needle in a haystack and usually requires a lot of traceroutes as well as a long wind in communicating with involved providers. It is worth mentioning again, that in the case of draining PoPs, any connectivity change in the internet has a possibility to break existing TCP connections to IP addresses using anycast. Therefore, it can be quiet challenging to troubleshoot such intermittent connection issues due to routing changes and faulty or rather misconfigured hardware components.

Light and shadow of load distribution with DNS

Usually, before a client can send an HTTP request, it often has to look up an IP address first using DNS. This provides the perfect opportunity for introducing our next common method to manage traffic distribution called DNS load balancing. The simplest way here is to return multiple A records for IPv4 and AAAA records for IPv6 respectively in the DNS reply and let the client select an IP address randomly. While the basic idea is quiet simple and easy to implement, it still has multiple challenges.

The first problem is that you have only little control over the client behavior, because records are selected arbitrary and each will attract roughly the same amount of traffic. But is there any way to minimize this problem? On paper, you could use a SRV record to predefine specific priorities, but unfortunately these records have not yet been adopted for HTTP.

Our next potential problem is due to the fact that usually the client cannot determine the closest IP address. You can bypass this obstacle by using an anycast IP address for authoritative name servers and take advantage of the fact that DNS queries will lead to the nearest IP address. Thus, the server can reply addresses pointing to the closest datacenter. An improvement of that builds a map of all networks as well as their approximate geographical locations and serves corresponding DNS replies. However, this approach implies a much more complex DNS server implementation and a maintaining effort to keep the location mapping up to date.

The devil is often in the detail

Of course, none of the solutions above are easy to realize, because of fundamental characteristics of DNS: A user rarely queries authoritative name servers directly. Instead, a recursive DNS server is usually located between a user and the name server. This server proxies queries of the users and often provides a caching layer. Thereby, the DNS middleman has the following three implications on traffic management:

  • Recursive resolution of IP addresses
  • Multiple nondeterministic reply network paths
  • Additional caching problems

As you may have already guessed the recursive resolution of IP addresses is quiet problematic, because instead of the user’s IP address an authoritative name server sees the IP address of the recursive resolver. This is a huge limitation, as it only allows replies optimized for the shortest distance between the location of the resolver and the name server. To tackle this issue, you can use the so called “EDNS0” extension, which includes information about the IP subnet of the client inside the DNS query sent by a recursive resolver. This way, an authoritative name server returns a response that is optimal from the perspective of the user. Although this is not yet standard, the biggest DNS resolvers such as Google have decided to support it already.

The difficulty here is not only to return the optimal IP address by a name server for a given user’s request, but that name server may be responsible for serving thousands or even millions of users spread across multiple regions. For example, a large nationwide ISP might operate name servers for its entire network inside one datacenter, which has interconnections for each provided area. These name servers would always return the IP address that is optimal from the perspective of the ISP, despite there being better network paths for their users. But what does the term “optimal” mean in the context of DNS load balancing? The most obvious answer to this question is a location that is closest to the user. However, there are existing additional criteria. First of all, the DNS load balancer has to ensure that the selected datacenter and its network are working properly, because sending user requests to a datacenter that has currently problems would be a bad idea. Thus, it is essential to integrate the authoritative DNS server into the infrastructure monitoring.

The last implication of the DNS middleman is related to caching. Due to the fact that authoritative name servers cannot flush the caches of a resolver remotely, it is best practice to assign a relatively low time to live (TTL) for DNS records. This effectively sets a lower boundary on how quickly DNS changes can be theoretically propagated to users. But the value has to be set very carefully, since if the TTL is too short, it results in a higher load on your DNS servers plus an increased latency, because clients have to perform DNS queries more often. Moreover, Dropbox concluded that the TTL value in DNS records is fairly unreliable. They used a TTL of one minute for their top level domain (TLD) and in case of changes it takes about 15 minutes to drain 90% of the traffic and in total roughly a full hour to drain additional 5% of traffic.

DNS load balancing in the field

Despite all of the problems that are shown above, DNS is still a very effective way for traffic load balancing in ultra-large-scale systems.

Nationwide traffic distribution using DNS dependent on geographical location.

By using the example of Dropbox, you can see an approach where each PoP has its own unique unicast IP address space and DNS is responsible for assigning different IP addresses to the users based on their geographical location. Just as a quick reminder, unicast is another basic routing scheme where sender and receiver are addressed by a so called “one-to-one” association. This means that each destination IP address uniquely identifies a single receiver. It gives them control over traffic distribution and also allows a careful drain of PoPs. It is worth mentioning that conclusions in the context of a unicast setup are much easier and troubleshooting becomes simpler as well.

Hybrid unicast/anycast combines the best of both worlds

Finally, let’s briefly cover one of the composite approaches to GSLB, which is a hybrid setup with a combination of unicast and anycast. This allows getting all benefits from unicast and the ability to quickly drain PoPs if it is required along with DNS replies corresponding to geographical locations.

Nationwide traffic distribution using a combination of unicast and anycast.

As it can be seen, this approach is used by Dropbox to announce the unicast IP subnet of a PoP and one of its IP supernets simultaneously from all of the PoPs. However, it implies that every PoP needs a configuration that is able to handle traffic destined to any PoP. This will be realized inside a PoP by a L4 load balancer using virtual IP addresses (VIP). Basically, these VIPs are not assigned to any particular machine and usually shared across many of them. However, from the perspective of the user a VIP remains a single, regular IP address. On paper, this practice allows to hide the number of machines behind a specific VIP and other implementation details. This gives them the ability to quickly switch between unicast and anycast IP addresses as needed and also an immediate fallback without waiting for the unreliable TTL to expire. Thereby, a careful drain of PoPs is possible and all the other benefits of DNS load balancing will remain as well. All of that comes at a relatively small operational cost with a slightly more complex setup and may cause trouble in respect of scalability, once the amount of several thousand VIPs is reached. But the positive side effect of this approach is that the configuration of all PoPs becomes clearly more uniform.

Real User Measurement brings light into the darkness

At all the methods for traffic load balancing discussed up until now one critical problem always remains: None of them covers the actual performance which a user experiences. Instead, they rely only on some approximations such as the number of hops used by BGP and the assignment of IP addresses in case of DNS based on geographical location. However, this lack of knowledge can be bridged by using Real User Measurement (RUM). For example, Dropbox is collecting such detailed performance information from their desktop clients. As they said, a lot of time is put into the implementation of a measurement framework, helping them to estimate the reliability of their Edge from the perspective of a user. The basic procedure is pretty simple: From time to time, a subset of clients checks the availability of their PoPs and reports back the results. Afterwards, they extended this by collecting latency information too, which allows them a more detailed view of the internet.

Additionally, they build a separate mapping by joining data from their DNS and HTTP servers. On top of that, they added information about the IP subnet of their clients from the so called “EDNS0” extension, which is sent inside the DNS query by a recursive resolver. Now, they combined this with the aggregated latencies, BGP configuration, peering information and utilization data from the infrastructure monitoring to create a decidedly mapping of the connection between clients and their Edge. Finally, they compared this mapping to both anycast as well as the assignment of IP addresses in case of DNS based on geographical location and packed it into a so called “radix tree”, which is a special data structure for a memory-optimized prefix tree, to upload it onto a DNS server.

To expand the mapping for extrapolation purposes they made their decisions using the following patterns: In case that all samples for a node have the same “optimal” PoP in the end, they conclude that all IP address ranges announced by that node should lead to that PoP. Otherwise, if a node has multiple “optimal” PoPs, they break it down into announced IP address ranges. While doing so, for each one it is assumed that if all measurements in an IP address range end up at the same PoP, this choice can be extrapolated to the whole range as well. With this approach they are able to double their map coverage, make it more robust to changes and generate a mapping based on a smaller amount of data.

Reaching the safe haven

In conclusion, all this back and forth with finding out a user’s IP address based on its resolver, the effectiveness of assigning an IP address in case of DNS based on geographical location, the fitness of decisions made by BGP and so on are all no longer necessary after a request arrives at the Edge. Because from that moment on, you know the IP address of each user and even have a corresponding round-trip time (RTT) measurement. Now, it is possible to route users on a higher level such as embedding a link to a specific PoP in the HTML file. This allows traffic distribution on a more detailed level.

As you may have realized by reading this article, traffic load balancing is a difficult and complex problem that is really challenging, if you want to consider all related factors. In addition to the strategies described above, there are much more traffic distribution algorithms and techniques to reach high availability in ultra-large-scale systems. If you want to know more about the idea of traffic load balancing, or which facts you have to consider for developing a strategy of traffic distribution inside the PoPs, please follow the links at the end of this article.

References and further reading

  1. Dropbox traffic infrastructure: Edge network by Oleg Guba and Alexey Ivanov
  2. Steering oceans of content to the world by Hyojeong Kim and James Hongyi Zeng
  3. Site Reliability Engineering: Load Balancing at the Frontend by Piotr Lewandowski and Sarah Chavis
  4. Directing traffic: Demystifying internet-scale load balancing by Laura Nolan
  5. What is CWND and RWND by Amos Ndegwa
  6. TCP BBR congestion control comes to GCP – your Internet just got faster by Neal Cardwell et al.
  7. What Are L1 and L2 Loss Functions? by Amit Shekhar

Consensus protocols – A key to cluster management?


In these times, applications require increasing robustness and scalability, since otherwise they will collapse under the burden of the vast number of users. Cluster managers like kubernetes, Nomad or Apache Marathon are a central factor of this resilience and scalability. A closer look at the insides of cluster managers reveals consensus protocols to be the crucial point. This blog post gives an overview of the most common consensus protocols and their workflows. Furthermore, the concept of a consensus protocol is questioned and potential improvements of the consensus protocols in cluster management are discussed.

Table of contents

List of figures

  1. Container Management Platforms Perferences
  2. Distributed System
  3. Concurrency
  4. State transition
  5. Consensus algorithm
  6. FLP impossibility
  7. Two-phase commit, fault-free execution, phase one
  8. Two-phase commit, fault-free execution, phase two
  9. Two-phase commit, with coordinator failure, phase one
  10. Two-phase commit, with coordinator failure, phase two
  11. Procedure ZooKeeper
  12. Performance test


Container management platforms are the preferred choice when it comes to orchestrating containers with high availability, reliability and scalability. The diagram in Figure 1 shows the distribution of container management platforms from 2016 to 2017.

Figure 1: Container Management Platforms Preferences
Figure 1: Container Management Platforms Preferences [1]

In comparison to the other platforms, a clear trend towards kubernetes is evident. In order to understand why such a trend exists, it is necessary to look behind the scenes. Are there differences or similarities between these platforms in terms of consensus protocols? Is this a critical factor for the emergence of this trend? In order to clarify, a number of essential terms need to be addressed.

Distributed system

A distributed system includes a set of distinct processs sending messages to each other and coordinating to achieve a common objective.
Even a single computer can be viewed as a distributed system.
Memory units, input-output channels and the central control unit also represent separate proceses collaborating to complete an objective.
In this blog post the focus is on distributed systems where the processes are spatially distributed across computers [4].

Figure 2: Distributed System
Figure 2: Distributed System [4]

Properties of a distributed system

The following features are particularly important in relation to a distributed system.

1. Concurrency

Nodes or Processes running simultaneously require coordination. As illustrated in Figure 3, this concurrency results in different events at certain points in time.

Figure 3: Concurrency
Figure 3: Concurrency [2]
2. Lack of global clock

The order of the events must be determined. However, there is no global clock that can be used to determine the order of events on the computer network.
The following two factors can be considered to determine whether an event happened before another.

  • Messages are sent before they are received
  • Every computer has a sequence of events

This results in a partial sequence of events of the system. In order to obtain a total sequence of the system's events, an algorithm is required which requires the communication of the computers in this system.
If an algorithm relies only on the order of events, abnormal behavior may occur.
Such abnormal behavior can be avoided by synchronizing physical clocks.
The coordination of independent clocks is rather complex and still clock drifts can occur.
The time and sequence of events are fundamental obstacles in a distributed system with spatially distributed processes [4].

3. Independent failure of components

A key aspect is the insight that components can be faulty in a distributed system.
It is impossible to have an error-free distributed system due to the large number of potential failures.
Faults can be divided into the following three groups.

  • Crash-fail: The component stops immediately without warning.
  • Omission: The component sends a message which does not arrive.
  • Byzantine: The component behaves arbitrarily, it sometimes exhibits regular behavior but also malicious Byzantine behavior. This variant is irrelevant in controlled environments like data centers.

Based on this assumption, protocols should be designed to allow a system to have faulty components, yet achieve a common objective and provide a meaningful service.

Since every system has failures, a major consideration is the ability of the system to survive if its components deviate from normal behavior regardless of whether they are malicious or not.
Basically, a distinction is made between simple fault-tolerance and Byzantine fault-tolerance.

  • Simple fault-tolerance: In systems with simple fault-tolerance, one assumes that components either exactly follow the protocol or fail completely. Arbitrary or malicious behavior is not considered.
  • Byzantine fault-tolerance: In uncontrolled environments, a system with simple fault-tolerance is not particularly useful. In a decentralized system, where components are controlled by independent actors communicating on the public, unapproved Internet, malicious components must also be expected.

The BAR fault-tolerance extends the Byzantine fault-tolerance and defines the following three classes.

  • Byzantine: Components that are malicious
  • Altruistic: Components that always follow the protocol
  • Rational: Components that only follow the protocol when it is convenient [4].
4. Message transmission

Messages are sent either synchronously or asynchronously.

  • Synchronous: In a synchronous system, messages are assumed to be delivered within a fixed time window. Conceptually, synchronous messaging is less complex because it guarantees a response.
    However, this variant is often impracticable in a genuine distributed system, with computers crashing, messages being delayed, or not arriving.
  • Asynchronous: In an asynchronous system, it is assumed that the network delays, duplicates, or sends out-of-order messages for an infinite amount of time.

Replicated State Machine

A replicated state machine is a deterministic state machine distributed across many computers, though acting as a single state machine. The state machine works even if an arbitrary computer fails.
A valid transaction in a replicated state machine results in a transition to another state.
A transaction represents an atomic operation on the database complying with the ACID principle.

Figure 4: State transition
Figure 4: State transition [4]

A replicated state machine is a set of distributed computers all starting with the same initial value.
Each of the processes decides on the next value for each state transition, illustrated in Figure 4.
Achieving consensus implies the collective agreement of all computers on an output value based on the current value. A consistent transaction log thus is obtained for each computer in the system.
A replicated state machine must continuously accept new transactions in the log, even when:

  • Some computers fail,
  • The network fails to send messages reliably,
  • No global clock exists to determine the order of events.

This is the fundamental intent of any consensus algorithm as depicted in Figure 5.

Figure 5: Consensus algorithm
Figure 5: Consensus algorithm [4]

A consensus algorithm achieves consensus when the following three conditions are satisfied:

  • Agreement (or safety): All non-faulty nodes decide on the same output value.
  • Validity: The value to be decided on must have been proposed by a node in the network.
  • Termination (or liveness): All non-faulty nodes may decide on an output value.

Typically a consensus algorithm has three types of actors in the system.

  1. Proposers, often referred to as leaders or coordinators
  2. Acceptors or followers, are the ones who listen to the requests of the proposers and answer
  3. Learners, are the ones who learn the output value resulting from the vote

A consensus algorithm generally has the following procedure.

  1. Elect: In this phase the leader is selected. A leader makes the decisions and proposes the next valid output.
  2. Vote: All non-faulty components listen to the proposed value of the leader, validate it and propose it as the next valid value.
  3. Decide: All non-faulty components must come to a consensus on a correct output value, otherwise the procedure is repeated [4].
FLP impossibility

As described above, there are differences between synchronous systems and asynchronous systems. In synchronous environments, messages are delivered within a fixed time frame. In asynchronous environments, there's no guarantee of a message being delivered.

Figure 6: FLP impossibility
Figure 6: FLP impossibility [3]

Reaching consensus in a synchronous environment is possible due to assumptions about the maximum time required to deliver messages. In such a system, different nodes are allowed to alternately propose new transactions, search for a majority vote, and skip each node if they do not offer a proposal within the maximum time period.

In a fully asynchronous system there is no consensus solution that can tolerate one or more crash failures even when only requiring the non triviality property [4]

The FLP impossibility describes the property of being unable to accept a maximum message delivery time in an asynchronous environment. termination becomes much more difficult, if not impossible. This is necessary since termination conditions must be complied with in order to reach a consensus, meaning every node not having a fault must decide on an output value [4].

To circumvent the FLP impossibility there are two options.

  • Use synchrony assumptions
  • Use non-determinism

Consensus protocols

Consensus protocols can be distinguished into two basic approaches: Synchrony assumption and non-deterministic.

Approach 1: Use Synchrony Assumptions

If messages are sent asynchronously, termination cannot be guaranteed.
How can it be guaranteed that every non-faulty node will choose a value?
Due to asynchronicity, a consensus cannot be reached within a fixed time window.
This leads to the conclusion that consensus cannot always be reached.
One way to prevent this are timeouts. If there is no progress, the system waits until the timeout and restarts the process. Consensus algorithms like Paxos and Raft apply this method [4].

Simple fault-tolerance

The following section describes algorithms of the Simple fault-tolerance category. These differ from the Byzantine fault-tolerance algorithms in terms of either following the protocol or failing. Byzantine nodes may also exhibit malicious behavior.

Two-phase commit

The two-phase commit is the simplest and most commonly utilized consensus algorithm. As the name suggests, the algorithm consists of two different phases.
The first phase is the proposal phase. It involves proposing a value for each participant in the system and obtaining the answers as shown in Figure 7.

Figure 7: Two-phase commit, fault-free execution, phase one
Figure 7: Two-phase commit, fault-free execution, phase one [5]

The second phase is the commit or abort phase. In this phase, the result of voting is communicated to all participants in the system. Also, it is transmitted whether to continue and decide or erase the log, as depicted in Figure 8.

The node proposing the values is referred to as the coordinator. The coordinator is not required to be selected by means of a special procedure. Each node may act as a coordinator and thus start a new round of the two-phase commit.

It is important that the nodes do not reach a consensus on what a value should be, but reach a consensus on whether or not to accept it.

Figure 8: Two-phase commit, fault-free execution, phase two
Figure 8: Two-phase commit, fault-free execution, phase two [5]

In phase 2 each node decides on the value proposed by the coordinator in phase 1 when and only when communicated by the coordinator. The coordinator sends the same decision to each node, so if a node is instructed to determine a value, they all do. Therefore the condition for agreement is satisfied.

The two-phase commit always aborts, except when each node approves. In all cases, the final value of at least one node was voted on. Thus the condition for validity is complied with.

Finally, termination is guaranteed if each node makes progress and finally returns its vote to the coordinator, who passes it on to each node. There are no loops in the two-phase commit, so there is no possibility to continue forever [5].

Crashes and failure

To understand the failures, it is necessary to consider each state the protocol may take and contemplate what occurs if either the coordinator or one of the participants crashes in this state.

In phase one, before messages are sent, the coordinator could crash, as illustrated in Figure 9. This is not too troublesome as it simply means that Two-phase commit is never started.
If a participant node crashes prior to starting the log, then no harm will result until the proposal message does not reach the crashed node, so this fault can be corrected later.

Figure 9: Two-phase commit, with coordinator failure, phase one
Figure 9: Two-phase commit, with coordinator failure, phase one [5]
If the coordinator crashes, but some proposal messages have not all been sent, some nodes have received a proposal and are starting a two-phase commit round, and some nodes are unaware anything happened. If the coordinator does not recover for a long time, the nodes receiving the proposal will block and wait for the result, which may never complete. This can mean nodes have sent their votes back to the coordinator without knowing they failed. Therefore, the protocol cannot simply be aborted due to a possibility that the coordinator awakens again, sees their "commit" votes, and starts phase 2 of the protocol with a commit message.

Therefore, the protocol is blocked on the coordinator and cannot make any progress. The problem of waiting for a participant to fulfill his or her part of the protocol cannot be completely resolved.

To counteract this, another participant can take over the coordinator's work once the coordinator is determined to have crashed. If a timeout occurs at a node, it can be forced to complete the protocol the coordinator started.
As in a phase 1 message, this node can contact all other participants and discover their votes. However, this requires persistence in each node.

It is also possible that only one node knows the result of the transaction. If the coordinator fails in phase 2 before all nodes are told to abort/transmit the decision, as shown in Figure 10.

Figure 10: Two-phase commit, with coordinator failure, phase two
Figure 10: Two-phase commit, with coordinator failure, phase two [5]

However, if another node crashes before the recovery node can end the protocol, the protocol cannot be restored to its original state.

The same applies to phase 2. If the coordinator crashes before the commit message has reached all participants, a recovery node is required to take over the protocol and safely complete it.

The worst-case scenario is when the coordinator is a participant himself and grants himself a vote on the result of the protocol. Then, if it crashes, both the coordinator and a participant are shut down, ensuring that the protocol remains blocked as a result of a single fault [5].


The Paxos protocol consists of the following phases.

Phase 1: Prepare request

The proposer picks a new number n and sends a prepare request to all acceptors.
If all acceptors have a prepare request (prepare, n), they send a response (ack, n, n', v') or (ack, n, ,). In order for acceptors to respond with a promise, n must be greater than any number ever received before.
Acceptors now propose the value v of the proposal with the highest number they have accepted, if any. Otherwise, they reply with ^.

Phase 2: Accept request

When the proposer receives the responses of the majority of acceptors, it sends an accept request (accept, n, v) with the number n and the value v to the acceptors. The number n is the number from the phase 1 prepare request.
The value v is the highest numbered proposal among the responses.
If an acceptor receives an Accept Request (accept, n, v), it accepts the proposal unless it has already responded to a Prepare Request with a number greater than n. The value v is the highest numbered proposal among the responses.

Phase 3: Learning phase

Whenever an acceptor accepts a proposal, he answers to all Learners with (accept, n, v).
Learners receive (accept, n, v) from a majority of acceptors, decide v and send (decide, v) to all other Learners. Learners receive (decide, v) and the decided value v.

Each Distributed System contains faults. To counteract these, the decision is delayed in Paxos if a proposer fails. A new number is used to start in phase 1, even if previous attempts have not yet been completed.

Paxos is difficult to understand due to its many deliberately open implementation details.
Questions like: When to be certain if a proposer has failed? or Do synchronous clocks have to be used to set the timeouts? are some of these implementation details.
Leader election, failure detection and log management are also purposely kept open to ensure greater flexibility. However, exactly these design decisions are the biggest disadvantages of Paxos [4].

A Leader election mechanism in Paxos might be realized by a simple algorithm like the bully algorithm.
It starts by sending a server id to all nodes. If an id is received, a node sends a response containing the own id.
Next, a node checks if all responses have lower ids than its own. If this is the case the node is a the new leader.
If the id of a node is higher than a the received id, the node starts its own election.

The procedure of performing multiple Paxos decisions consecutively based on a log is called Multi-Paxos.


Unlike Paxos, Raft is designed with a focus on intelligibility.
For the first time, Raft introduced the concept of shared timeouts to deal with termination. If an error occurs in Raft, a restart is performed. Nodes wait at least one timeout period until they try to declare themselves leader again. This guarantees progress.

The shared status in Raft is typically represented by a replicated log data structure. Like Paxos, Raft requires a majority of servers that are available to operate correctly. In general, Raft consists of the following three elements.

  • Leader election: If the current leader fails, a new leader must be elected. The leader is responsible for accepting client requests and managing the replication logs of other servers. Data always flows from Leader to other servers.
  • Log replication: The leader synchronizes the logs of all other servers by replicating his own log.
  • Safety: If a server commits a log entry with a particular index, other servers cannot set a different log entry for that index.

Raft servers can have the following three states.

  • Leader: Typically only one leader exists, all other servers in the cluster are followers. Client requests from the followers are forwarded to the leader.
  • Follower: A follower is passive. It only responds to requests from leaders or candidates, or forwards client requests.
  • Candidate: A computer in this state wants to be elected as the new leader.

If a candidate wins an election to be leader, this leader remains for an arbitrary period of time, referred to as the term. Each term is identified by a term number, which is incremented after each term. Each server must persistently store the current term number.

Raft uses the remote procedure calls RequestVotes and AppendEntries.
RequestVotes are used by candidates during elections.
AppendEntries are used by leaders for replication log entries and as heartbeat [6].

Leader election

Leaders periodically send heartbeats to the followers. A Leader election is triggered when a follower does not receive a heartbeat from the leader for a certain period of time.
Next, the follower becomes a candidate and increments his term number.
He now sends RequestVotes to all other participants in the cluster, resulting in the following three options.

  • The candidate receives the majority of votes and becomes leader.
  • If another candidate receives an AppendEntries message, he must check whether the received term number is greater than his own. If the own term number is greater, the server remains in candidate state and the AppendEntries message is rejected. If the own term number is smaller, the server switches back to the follower state.
  • Several servers became candidates at the same time and the vote did not give a clear majority decision. In this case a new election starts and one of the candidates times out [6].
Log replication

Client requests can initially be regarded as write-only. Each request consists of a command, which is ideally executed by the replicated state machine of all servers. A leader who receives a new client request adds it to his log in the form of a new entry. Each log entry contains a client-specific command, an index to identify the position in the log, and the term number to maintain a logical order.
To ensure consistency, a new log entry must be replicated in all followers.
The leader sends the AppendEntries message to all servers until all followers have replicated the entry securely. If all followers have replicated the entry, this entry can be considered committed together with all previous entries.
The leader stores the highest index of committed logs. This index is sent to the followers with every AppendEntries message, so they can check if their state machine is still in the correct order.
So if two different logs have the same index and the same term number, they store the same command and all previous log entries are identical.
If a follower does not find a suitable position for the log entry when receiving an AppendEntries message based on the index and the term number, it rejects that message.
By this mechanism the leader is certain that after a successful response of the AppendEntries request the log of a followers is identical to his own log.

In Raft, inconsistencies are resolved by overwriting the follower logs. First, the leader tries to find the last index matching the followers log. If found, the follower deletes all newer entries and adds new ones [6].


Raft ensures the leader of a term has all committed entries of all previous terms in his log. This is necessary in order for all logs to be consistent and for the state machines to execute the same commands.
During a Leader election, the RequestVote message contains information about the candidate's log. When a voter's log is more up to date than the candidate's log sending the RequestVote message, it does not vote for that candidate.
Choosing which log is more up to date is based on the term number and the length of the log. The higher the term number and the longer the log, the more up-to-date it is [6].

ZooKeeper Atomic Broadcast protocol (ZAB)

Like Raft, the ZooKeeper Atomic Broadcast protocol (ZAB) achieves high availability by distributing data across multiple nodes. This allows clients to read data from any node. Client writes are also forwarded to the leader. An important design criterion is the incremental assignment of each state change to the previous state. This results in an implicit dependency of the order of the states. Besides the guarantee of replication in order, ZAB defines procedures for Leader election and recovery of faulty nodes.

A term in Raft is defined in ZAB as an epoch of a leader. An epoch is also identified by a number generated by the leader.
The epoch number must also be larger than all previous epoch numbers.
A transaction represents a state change the leader propagates to his followers.
Furthermore, analog to the index in Raft, a sequence number is generated by the leader, which starts at the value 0 and increments.
The epoch number and the sequence number are important to ensure the order of the state changes.
Analogous to the replication log in Raft, each follower in ZAB has a history queue. In this queue all incoming transactions are committed in received order.

In order for ZAB to be executed correctly, the following prerequisites must be met.

  • Replication guarantees reliable delivery, total and causal order.
    • If a transaction is committed by a server, it may be committed to all servers.
    • If a transaction A is committed by a server prior to a transaction B, all servers must commit transaction A prior to transaction B. If a transaction A is committed by a server prior to a transaction B, all servers must commit transaction A prior to transaction B.
    • If a transaction A is committed by a server and another transaction B is sent, transaction A must be placed before transaction B. If a transaction A is committed by a server and another transaction B is sent, transaction A must be put before transaction B. If transaction B is committed by a server, transaction A must be entered before transaction B. If transaction B is committed by a server and another transaction B is sent, transaction A must be put before transaction B. When a transaction C is then sent, C must be put after B.
  • Transactions are replicated as long as the majority of nodes are available.
  • If a node fails and restarts, it must catch up on the missed transactions.

The general procedure outlined below is depicted in Figure 11.

When a leader receives a change update from a client, it generates a transaction with a sequence number and the epoch number. Afterwards, it sends this transaction to its followers. A follower adds this transaction to its history queue and confirms this to the leader via ACK. If a leader has received an ACK from the majority of the nodes, it sends a COMMIT for this transaction. A follower who accepts a COMMIT commits this transaction unless the sequence number received is greater than the sequence numbers in its history queue. This causes the follower to wait for COMMITs from previous transactions before committing.

Figure 11: Procedure ZooKeeper
Figure 11: Procedure ZooKeeper [9]

If the leader crashes, the nodes run a recovery protocol. Both to agree a common consistent state before resuming regular operation and to establish a new leader for transferring state changes.

Since nodes can fail and be restored, multiple leaders can emerge over time, allowing the same nodes to perform a node role more than once.
The life cycle of a node is defined in the following four phases. Each node performs an iteration of the protocol. At any time, a node can cancel the current iteration and start a new one by transitioning to phase 0.
phases 1 and 2 are especially important in terms of coordination for a consistent state for recovery after a failure [7].

0. Leader election phase

Nodes are initialized in this phase. No particular Leader election protocol must be used. As long as the leader election protocol is terminated and chooses a node which is available and the majority of the nodes have voted for it most likely. After termination of the Leader election, a node stores its vote to local volatile memory. When a node n voted for node n0, then n0 is called the prospective leader for n. Only at the beginning of phase 3 a prospective leader becomes an established leader, when it will also be the primary process [8].

1. Discovery phase

In this phase the followers communicate with their prospective leader so the leader collects information about the last transactions his followers have accepted. The intent of this phase is to determine the most recent sequence of accepted transactions between the majority of nodes. In addition, a new epoch is being established so previous leaders cannot make new proposals. Since the majority of followers have accepted all changes from the previous leader, there is at least one follower having all changes accepted from the previous leader in its history queue, meaning the new leader will have them too.

2. Synchronization phase

The synchronization phase completes the recovery part of the protocol and synchronizes the replicas in the cluster using the updated history of the leader from the discovery phase. The leader proposes transactions from its history to its followers. The followers recognize the proposals when their own history is behind the leader's history. If the leader receives approval from the majority of the nodes, it sends a commit message to them. At this point, the leader is established and no longer just a perspective leader.

3. Broadcast phase

If no crashes occur, the nodes remain in this phase indefinitely and send transactions as soon as a ZooKeeper client sends a write request.

To detect errors, ZAB employs periodic heartbeat messages between followers and their leaders. If a leader does not receive heartbeats from the majority of his followers within a certain timeout, he resigns leadership and switches to Leader election in phase 0. A follower also switches to phase 0 if it does not receive heartbeats from its leader within a timeout [7].

Byzantine Algorithms

A Byzantine fault-tolerant protocol should be able to achieve a common objective even in the event of node malicious behavior.
The paper "Byzantine General's Problem" by Leslie Lamport, Robert Shostak and Marshall Pease provided the first proof for the solution of the Byzantine General's problem: It revealed that a system with x Byzantine nodes must have at least 3x + 1 total node to reach consensus.
Byzantine nodes are the cause. If x nodes are faulty, then the system must function correctly after reconciliation with n – x nodes, since x nodes can be byzantine.
In order for the number of non-faulty nodes to exceed the number of faulty nodes, at least n – x – x – x > x is required. Therefore n > 3x + 1 is optimal.
However, the algorithm from the paper "Byzantine General's Problem" only works in synchronous environments.
Byzantine algorithms such as DLS and PBFT prodived for asynchronous environments are significantly more complex [4].

DLS Algorithm

The DLS algorithm was presented by Dwork, Lynch and Stockmeyer in the paper "Consensus in the Presence of Partial Synchrony" as a significant advancement of Byzantine algorithms.
It defines models to reach a consensus in a partially synchronous system. Partially synchronous lies between a synchronous and an asynchronous system, which is defined by the following two statements.

  • There are fixed timeouts until messages are delivered, though these are unknown in advance. The aim is to reach consensus independently of the actual timeouts.
  • The timeouts are known, yet they only apply at an unknown time. The goal is to design a system capable of achieving consensus regardless of when this time occurs.

The process of the DLS algorithm is based on rounds divided into "drying" and "lock-release" phases.

  1. Each round has a proposer and starts with each node giving a value it considers correct.
  2. The proposer suggests a value if at least n -x nodes have given this value.
  3. When a node receives the proposed value from the proposer, the proposer must lure and broadcast this information on the network.
  4. If the proposer learns from x + 1 nodes that they have lured the value, then they commiteted the value as the final value.

For the first time, DLS introduced the terms safety and liveness, which are equivalent to agreement and termination.

In addition to the DLS algorithm, there is also the Practical Byzantine Fault-Tolerance (PBFT) algorithm, available for use in asynchronous environments. However, due to the limited scope of this blog post, it is not discussed here [4].

Approach 2: Non-Deterministic

In addition to the option of using synchrony assumption to bypass the FLP impossibility, non-deterministic algorithms can also be used to achieve this.

Nakamoto Consensus (Proof of Work)

In traditional consensus algorithms, a function f(x) is defined so a proposer and a set of acceptors must all coordinate and communicate to decide on the next value.
Therefore, these algorithms often scale poorly, since each node must be aware of and communicate with every other node in the network. Using the Nakamoto Consensus, nodes do not agree on a value, but f(x) works in such a way that all nodes agree on the probability that the value is correct.
Instead of electing a leader and coordinating with all nodes, a consensus is reached on which node can solve a calculation puzzle the fastest. Nakamoto Consensus assumes that the nodes will use computational effort for the chance to decide the next block. This proof of work consensus is simpler than previous consensus algorithms, eliminating the complexity of point-to-point connections, leader choices, and square communication effort.
However, the Nakamoto Consensus requires time and energy to write a new block to solve the calculation puzzle [4].

Proof of Stake and Proof of Authority

In addition to reaching consensus on resources and mining power via PoW, the mechanisms Proof of Stake (PoS) and Proof of Authority (PoA) proceed differently.
The PoS mechanism operates with an algorithm that selects participants with the highest stakes as validators, assuming the highest stakeholders receive incentives to ensure a transaction is processed. Meanwhile, PoA uses identity as the only verification of the authority to be validated, so there is no need to use mining [10].

Cluster manager

Now, knowing the consensus protocols, a look with regard to the use of consensus algorithms in existing cluster managers is possible.

Cluster Manager Distributed key value store Consensus Protocol
Google Borg Chubby Paxos
Kubernetes/Openshift Etcd Raft
Nomad Raft
Docker Swarm Raft
Apache Marathon based on Mesos Apache Aurora ZooKeeper
Kontena Ectd Raft
Amazon Elastic Container Service (ECS) Consul Raft
Table 1: Overview of consensus protocols in cluster managers

As Table 1 illustrates, the tendency towards Raft is strong [11-14].
Surely this is due to the good comprehensibility and the many implementation requirements compared to Paxos. Although Paxos is used by Google, but rather historically in Borg. ZooKeeper is very similar to Raft and therefore a reasonable choice for Apache Marathon.

Cluster management without a consensus protocol

In contrast to a cluster management with consensus algorithm, the paper "Subordination: Cluster management without distributed consensus" proposes a cluster management without consensus algorithm.
In this paper cluster management is achieved by subdividing the nodes in the cluster via subordination. The main goal is to distribute the workload over a large number of nodes, similar to a load balancer. Cluster management without distrubuted consensus relies on the following conditions.

  • Few frequented network configuration changes
  • It is not intended for managing updates of a distributed database.
  • The node performance and node latency must remain stable.
  • A constant network traffic is assumed [15].

Node Mapping

A fundamental idea of this variant of cluster management is node mapping or the evaluation of nodes.
In general, node mapping is defined as a function that maps a node to a number. This allows nodes to be compared with each other.
Due to Amdahl's law: The higher the link performance, the higher the speedup.
Instead of including the node performance and latency separately in the mapping, the node performance and latency can be correlated. This means that only the ratio is included in the mapping [15].

Subordination Tree

In the subordination tree, each node is uniquely identified by a level and an offset. A distance can be calculated based on this.
To select a leader, a node evaluates all nodes in the network based on the mapping and calculates the distance. The node with the smallest distance and ranking is selected.
Level and Offset is only used for linear topologies (at a switch).
For non-linear topologies, latency is used for mapping.
Instead of maximizing performance, the main goal of the algorithm is to minimize network traffic per time unit if the leader is chosen and the number of nodes is unknown.
Due to the condition of low traffic changes to the network configuration, the algorithm could be initially executed and persisted when the cluster is installed.
The paper includes a performance and subordination tree test.
In the performance test, a full network scan was performed as a basis for comparison and a leader was determined.
The IP mapping was then performed by using the node mapping algorithm and a leader was determined. Figure 12 displays the result [15].

Figure 12: Performance test
Figure 12: Performance test [15]
Using node mapping algorithm, the subordination tree is built up about 200% faster.

The subordination tree test examined whether the resulting trees reached a stable state. For 100-500 nodes the structure of the subordination tree was repeated in order to obtain meaningful results. After 30 seconds the test was aborted to define a temporal upper limit. The results reveal that a stable state was achieved well below 30 seconds.


In conclusion, consensus algorithms are a central aspect of cluster management and distributed systems. Older protocols like Paxos have revealed their strengths and weaknesses over time. The experience gained has been incorporated into newer protocols such as Raft and ZooKeeper to improve understanding and robustness.
Byzantine consensus algorithms represent a particularly exciting application. Especially in times of increasing Internet crime and organized hacking, manipulated behavior of individual nodes in the public Internet must be assumed.
New technologies such as blockchain offer a new perspective on matters. For instance, a key-value store could possibly be developed using Proof of Stake or Proof of Authority as a consensus algorithm. This key-value store could be used in a public kubernetes cluster instead of Etcd. This would provide additional protection against Byzantine nodes. On the other hand, there are also incentives against cluster management with a consensus algorithm. The paper "Subordination: Cluster management without distributed consensus" shows a way to manage a cluster without a consensus algorithm.
However, this approach has many implicit conditions which cannot be guaranteed in a real distributed system. Another approach supporting this hypothesis is K3s, which shows that cluster management on a small scale is also possible without a consensus algorithm. K3s uses SQLite3 as database. However, it is possible to use Etcd3. The field of application is IoT, where often fixed IPs are assigned and therefore hardly any changes take place.
Ultimately, there is no generic solution. Currently a tendency to Raft in controlled data centers seems to exist. Nevertheless, depending on the application, a decision on whether to use cluster management with simple consensus, possibly Byzantine consensus or without consensus remains.


  1. Survey Shows Kubernetes Leading As Orchestration Platform
  2. The Techglider
    Kartik Singhal –
  3. A Cursory Introduction To Byzantine Fault Tolerance and Alternative Consensus
    Alexandra Tran-Alexandra Tran –
  4. Let's Take a Crack At Understanding Distributed Consensus
    Preethi Kasireddy-Preethi Kasireddy –
  5. Consensus Protocols: Two-phase Commit
    Henry Robinson –
  6. Understanding the Raft Consensus Algorithm: an Academic Article Summary
    Shubheksha –
  7. Architecture Of Zab – Zookeeper Atomic Broadcast Protocol
    Guy Moshkowich –
  8. ZooKeeper’s atomic broadcast protocol: Theory and practice, Andr´e Medeiros March 20, 2012
  9. A simple totally ordered broadcast protocol, Benjamin Reed, Flavio P. Junqueira
  10. Proof Of Authority: Consensus Model with Identity At Stake.
    POA Network- POA Network –
  11. Consensus Protocol
  12. Raft Consensus in Swarm Mode
  13. The Mesos Replicated Log
  14. The Developer Friendly Container & Microservices Platform
    Kontena, Inc –
  15. Gankevich, Ivan & Tipikin, Yury & Gaiduchok, Vladimir. (2015). Subordination: Cluster management without distributed consensus. 639-642. 10.1109/HPCSim.2015.7237106.

Large Scale Deployment for Deep Learning Models with TensorFlow Serving

Image source


“How do you turn a trained model into a product, that will bring value to your enterprise?”

In recent years, serving has become a hot topic in machine learning. With the ongoing success of deep neural networks, there is a growing demand for solutions that address the increasing complexity of inference at scale. This article will explore some of the challenges of serving machine learning models in production. After a brief overview of existing solutions, it will take a closer look at Google’s TensorFlow-Serving system and investigate its capabilities. Note: Even though they may be closely related, this article will not deal with the training aspect of machine learning, only inference.

Inference and Serving

Before diving in, it is important to differentiate between the training and inference phases, because they have completely different requirements.

  • Training is extremely compute-intensive. The goal here is to maximize the number of compute operations in a given time. Latency is not of concern.
  • Inference costs only a fraction of the computing power that training does. However, it should be fast. When you query the model, you want the answer immediately. Inference must be optimized for latency and throughput.

There are two ways to deploy a model for inference. Which one to use largely depends on the use case. First, you can push the entire model to client devices and have them do inference there. Lots of ML features are already baked into our mobile devices this way. T­his works well for some applications e.g. for Face-ID or activity-detection on phones, but falls flat for many other, large-scale industrial applications. You probably won’t have latency problems, but you are limited to the client’s compute power and local information. On the other hand, you can serve the model yourself. This would be suitable for industrial-scale applications, such as recommender systems, fraud detection schemes, intelligent intrusion detection systems and so forth. Serving allows for much larger models, direct integration into your own systems and the direct control and insights that come with it.

Serving Machine Learning at Scale

Of course, it’s never that easy. In most “real-world” scenarios, there isn’t really such a thing as a “finished ML model”. Consider the “Cross-industry standard process for data mining”:

Fig. 1 – Back to the basics: the six phases of CRISP-DM. Source

It might be ancient, but it describes a key concept for successful data mining/machine learning: It is a continuous process [12]. Deployment is part of this process, which means: You will replace your productive models, and you will do it a lot! This will happen for a number of reasons:

  • Data freshness: The ML model is trained on historical data. This data can go stale quickly, because new patterns constantly appear in the real world. Model performance will deteriorate, and you must replace the model with one that was trained on more recent data, before performance drops too low.
  • Model revision: With time, retraining the model might just not be enough to keep the performance up. At this point you need to revise the model architecture itself, perhaps even start from scratch.
  • Experiments: Perhaps you want to try another approach to a problem. For that reason you want to load a temporary, new model, but not discontinue your current one.
  • Rollbacks: Something went wrong, and you need to revert to a previous version.

Version control and lifecycle management aren’t exactly new ideas. However, here they come with a caveat: Since artificial neural networks are essentially “clunky, massive databases” [5], loading and unloading them can have an impact on performance. For reference, some of the most impactful deep models of recent years are a few hundreds of megabytes in size (AlexNet: 240MB, VGG-19: 574MB, ResNet-200: 519MB). But as model performance tends to scale with depth, model size can easily go to multiple gigabytes. That might not be much in terms of “Big Data”, but it’s still capable of causing ugly latency spikes when implemented poorly. Besides ML performance metrics, the primary concerns are latency and throughput. Thus, the serving solution should be able to [4]:

  • quickly replace a loaded model with another,
  • have multiple models loaded at the same time, in the same process,
  • cope with differences in model size and computational complexity,
  • avoid latency spikes when new models are loaded into RAM,
  • if possible, be optimized for GPUs and TPUs to accelerate inference,
  • scale out inference horizontally, depending on demand.

Serving Before “Model Servers”

Until some three years ago, you basically had to build your ML serving solution yourself. A popular approach was (and still is) using Flask or some other framework to serve requests against the model, some WSGI server to handle multiple requests at once and have it all behind some low-footprint web-server like Nginx.

Fig. 2 – An exemplary serving solution architecture. Source: [8]

However, while initially simple, these solutions are not meant to perform at “ultra large” scale on their own. They have difficulty benefiting from hardware acceleration and can become complex fast. If you needed scale, you had to create your own solution, like Facebook’s “FBLearnerPredictor” or Uber’s “Michelangelo”. Within Google, initially simple solutions would often evolve into sophisticated, complex pieces of software, that scaled but couldn’t be repurposed elsewhere [1].

The Rise of Model Servers

Recent years have seen the creation of various different model serving systems, “model servers”, for general machine learning purposes. They take inspiration from the design principles of web application servers and interface through standard web APIs (REST/RPC), while hiding most of their complexity. Among simpler deployment and customization, model servers also offer machine learning-specific optimizations, e.g. support for Nvidia GPUs or Google TPUs. Most model servers have some degree of interoperability with other machine learning platforms, especially the more popular ones. That said, you may still restrict your options, depending on your choice of platform.

Fig. 3 – Exemplary model server for an image recognition task. Source: [10]

A selection of popular model serving and inference solutions includes:

  • TensorFlow Serving (Google)
  • TensorRT (Nvidia)
  • Model Server for Apache MXNet (Amazon)
  • Clipper
  • MLflow
  • DeepDetect
  • Skymind Intelligence Layer for Deeplearning4j


By far the most battle-tested model serving system out there is Google’s own TensorFlow-Serving. It is used in Google’s internal model hosting service TFS², as part of their TFX general purpose machine learning platform [2]. It drives services from the Google PlayStore’s recommender system to Google’s own, fully hosted “Cloud Machine Learning Engine”. TensorFlow-Serving natively uses gRPC, but it also supports RESTful APIs. The software can be downloaded as a binary, Docker image or as a C++ library.


The core of TensorFlow-Serving is made up of four elements: Servables, Loaders, Sources and Managers. The central element in TensorFlow Serving is the servable [3]. This is where your ML model lives. Servables are objects, that TensorFlow-Serving uses for inference. For example, one servable could correspond to one version of your model. Servables can be simplistic or complicated, anything from lookup-tables to multi-gigabyte deep neural networks. The lifecycles of servables are managed by loaders, which are responsible for loading servables to RAM and unloading them again. Sources provide the file system, where saved models are stored. They also provide a list of the specific servables, that should be loaded and used in production, the aspired versions. Managers are the broadest class. Their job is to handle the full life cycle of servables, i.e. loading, serving and unloading the aspired versions. They try to fulfill the requests from sources with respect the specified version policy.

Fig. 4 – TensorFlow-Serving architecture overview. Source: [3]

When a servable is elevated to an aspired version, its source creates a loader object for it. This object only contains metadata at first, not the complete (potentially large) servable. The manager listens for calls from loaders, that inform it of new aspired versions. According to its version policy, the manager then executes the requested actions, such as loading the aspired version and unloading the previous one. Loading a servable can be temporarily blocked if resources are not available yet. Unloading a servable can be postponed while there are still active requests to it. Finally, clients interface with the TensorFlow-Serving core through the manager. Both requests and responses are JSON objects.

Simple Serving Example

Getting started with a minimal setup is as simple as pulling the tensorflow/serving Docker image and pointing it at the saved model file [16]. Here I’m using a version of ResNet v2, a deep CNN for image recognition, that has been pretrained on the ImageNet dataset. The image below is encoded in Base64 and sent to the manager as a JSON object.

Fig. 5 – Some random image to predict. Source

The prediction output of this model consists of the estimated probabilities for each of the 1000 classes in the ImageNet dataset, and the index of the most likely class.

Fig. 6 – Model output, class 771 corresponds to “running_shoe”.


Implementing and hosting a multi-model serving solution for an industrial-scale web application with millions of users, just for benchmarks, is slightly out of scope for now. However, Google provides some numbers that should give an idea of what you can expect TensorFlow-Serving to do for you.


A strong point of TensorFlow-Serving is multi-tenancy, i.e. serving multiple models in the same process concurrently. The key problem with this is avoiding cross-model interference, i.e. one model’s performance characteristics affecting those of another. This is especially challenging while models are being loaded to RAM. Google’s solution is to provide a separate thread-pool for model-loading. They report that even under heavy load, while constantly switching between models, the 99th percentile inference request latency stayed in the range from ~75 to ~150 milliseconds in their own TFX benchmarks [2].


Google claims that the serving system on its own can handle around 100,000 requests per second per core on a 16 vCPU Intel Xeon E5 2.6 GHz machine [1]. That is however ignoring API overhead and model complexity, which may significantly impact throughput. To accelerate inference on large models with GPUs or TPUs, requests can be batched together and processed jointly. They do not disclose whether this affects request latency. Since late February (TensorFlow-Serving v1.13), TensorFlow-Serving can now work directly in conjunction with TensorRT [14], Nvidia’s high-performance deep learning inference platform, which claims a 40x increase in throughput compared to CPU-only methods [15].

Usage and Adoption

In their paper on TFX (TensorFlow Extended), Google presents their own machine learning platform, that many of its services use [2]. TFX’ serving component, TFS², uses TensorFlow-Serving. As of November 2017, TensorFlow-Serving is handling tens of millions of inferences per second for over 1100 of Google’s own projects [13]. One of the first deployments of TFX is the recommender system for Google Play, which has millions of apps and over a billion active users (over two billion if you count devices). Furthermore, TensorFlow-Serving is also used by companies like IBM, SAP and Cloudera in their respective multi-purpose machine learning and database platforms [2].


Today’s machine learning applications are very much capable of smashing all practical limits: DeepMind’s AlphaGo required 1920 CPUs and 280 GPUs running concurrently in real-time, for inference, for a single “client” [6]. That example might be excessive, but the power of deep ML models does scale with their size and compute complexity. Deep learning models today can become so large that they don’t fit on a single server node anymore (Google claims that they can already serve models up to a size of one terabyte in production, using a technique called model sharding [13]). Sometimes it is worth investing the extra compute power, sometimes you just need to squeeze that extra 0.1 percent accuracy out of your model, but often there are diminishing returns,. To wrap it up, there may be a trade-off between the power of your model versus latency, throughput and runtime cost.


When you serve ML models, your return on investment is largely determined by two factors: How easily you can scale out inference and how fast you can adapt your model to change. Model servers like TensorFlow-Serving address the lifecycle of machine learning models, without making the process disruptive in a productive environment. A good serving solution can reduce both runtime and implementation costs by a significant margin. While building a productive machine learning system at scale has to integrate a myriad different steps from data preparation to training, validation and testing, a scalable serving solution is the key to making it economically viable.

References and Further Reading

  1. Olston, C., Fiedel, N., Gorovoy, K., Harmsen, J., Lao, L., Li, F., Rajashekhar, V., Ramesh, S., and Soyke, J. (2017). Tensorflow-serving: Flexible, high-performance ML serving.CoRR, abs/1712.06139
  2. Baylor, D., Breck, E., Cheng, H.-T., Fiedel, N., Foo, C. Y., Haque, Z., Haykal, S., Ispir, M., Jain, V., Koc, L., Koo, C. Y., Lew, L., Mewald, C., Modi, A. N., Polyzotis, N., Ramesh, S., Roy, S., Whang, S. E., Wicke, M., Wilkiewicz, J., Zhang, X., and Zinkevich, M. (2017). Tfx: A tensorflow-based production-scale machine learning platform. In Proceedings of the 23rd ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, KDD ’17, pages 1387–1395, New York, NY, USA. ACM.
  3. TensorFlow-Serving documentation – (accessed 11.03.2019)
  4. Serving Models in Production with TensorFlow Serving (TensorFlow Dev Summit 2017) – (accessed 11.03.2019)
  5. Difference Inference vs. Training – (accessed 11.03.2019)
  6. Challenges of ML Deployment – (accessed 11.03.2019)
  7. Lessons Learned from ML deployment – (accessed 11.03.2019)
  8. (accessed 11.03.2019)
  9. (accessed 11.03.2019)
  10. (accessed 11.03.2019)
  11. (accessed 11.03.2019)
  12. (accessed 11.03.2019)
  13. (accessed 12.03.2019)
  14. (accessed 12.03.2019)
  15. (accessed 12.03.2019)
  16. (accessed 12.03.2019)
  17. (accessed 12.03.2019)

The Renaissance of column stores

While attending the lecture ‘Ultra Large Scale Systems’ I got introduced into the quite intriguing topic of high-performance data storage systems. One subject which caught my special attention were column-oriented database management systems (column stores) about which I decided to give a presentation. Being quite lengthy and intricate, I realized that the presentation left my colleagues more baffled than informed. So I decided to write a blog post to recapitulate the topic for all those who were left with unanswered questions that day and for all the rest out there who might be interested in such matters. I believe this article, even though depicting a quite technical and specialized topic, is nevertheless of general interest because it shows how a system can be optimized for performance by emphasizing on inherent design characteristics.


So what are column stores and what do we need them for?

This may be the most eminent question that crosses the mind of people who hear the term ‘column stores’ for the first time. Well let me tell you what they aren’t, a euphonic buzzword which, once uttered, will capture the attention of every IT geek in close vicinity. However, a rather matured technology that has been around since the early 70s and which has been going through constant architectonic refinements that allowed it to establish a foothold on the field of data storage systems used for large scale systems or big data management [1, 2]. Nevertheless, because of their quite specific area of application, column stores still cover a rather opaque field of technical innovation. This article, therefore, tries to provide a brief overview of the subject by giving insights into the architecture, design concepts and current technical advancements concerning column stores.

Most modern database management systems (DBMSs) rely on the  N-ary Storage Model (NSM). Here records are contiguously stored starting from the beginning of each disk page while using an offset table at the end of the page to position the start of each tuple (record). Thus, within each page the tuples are stored in sequence until the maximum page length of the storage system has been reached and a new page has to be created (figure 1). Database systems centered on this model show good access times when executing queries that either insert or modify single tuples or that result in a projection of a limited number of complete tuples [3]. The major drawback of this model, however, is its poor cache performance because it often burdens the cache with unnecessary attributes [4]. In contrast, column stores follow an entirely different concept called Decomposition Storage Model (DSM) where tables are vertically fragmented storing each attribute in a separate column (figure 1). The different attribute values for each tuple can then be reassembled by correlating their absolute position within each page. Another approach is to use binary relations based on an artificial key (surrogate) that allows reconnecting the different attributes to generate a partial or complete reconstruction of the initial tuple [5]. The performance advantage of this model can be seen when executing queries that require operations on entire columns. Those include aggregation queries (where only subsets of the entire data are required) or scan operations. Furthermore, since the data composition of each column is very homogeneous with little entropy, much better compression ratios can be reached [6]. This becomes even more accentuated when increasingly larger datasets have to be processed.

Figure 1: Row-based vs. column-based storage models

Diving into the internals

Trying to determine whether to use a row-oriented or a column-oriented storage system will inevitably result in pondering about the pros and cons of the above-mentioned architectures. It is clear that both systems have their strengths and weaknesses and the choice, like so many times, entirely depends on the problem to be solved. This may be elaborated by taking a closer look into the subject using an example. Let’s take the table depicted in figure 1 and imagine a row-oriented database system stored information about company employees in a similar manner. In that case, queries resulting in key lookups and extraction of single but complete records of employees would be executed with high performance by the system. This could be, for example, the search for a record of a specific employee by its ‘ID’ or ‘Name’. This process could even be improved by putting indexes on high cardinality columns like the ‘ID’, for example, which would further speed up the search. Thus, an application or service operating on the database soliciting requests of that classification would definitely benefit from the advantages provided by a row-oriented storage system.

However, what about a request to determine the average age of all male employees stored in the database. This kind of analytical query could in the worst case result in a complete scan of the entire table and would generate a completely different strain on the system [7]. Even though it could be mitigated by the use of composite indexes which, however, are only feasible when the table contains a small number of columns. Latter on the other hand is not the case in many big data storage systems where rather hundreds of columns per table are the norm. Working with composite indexes here will sooner or later produce an immense processing overhead which in the long run would consume substantial system resources [8]. This ultimately means, that for systems containing tables with sizes in the range of several hundred gigabytes, many analytical queries could potentially initiate sequential scans of the entire dataset. For this scenario, column stores represent a better option because the query execution would, by design, be limited to only those attributes required for the final projection. This would spare computation resources by avoiding the necessity to scan large amounts of irrelevant data and, as a result, lead to overall better performance of the system. Consequently, the right choice of the database system should, therefore, be guided by the demands posed by the services operating on it, because they ultimately define the predominant query structure processed by the system.

Query processing models

Figure 2: Row-scanner implementation

To understand the subjects in the sections that follow, some principle design aspects on how row and column stores execute queries have to be elaborated. Thus, when comparing the implementations of query execution between the two systems, fundamental differences become obvious. Capitalizing on the previous example, let’s observe the execution of the following simple SQL query “SELECT Name, Profession FROM Employees WHERE Age > 30”. The expected result from the query would be a list of the names and professions of all employees being older than 30 years. The request leads to low-level database operations where scanning processes on the corresponding table will be performed to gather the necessary datasets. In the center of every query execution are scanners that apply predicates on tuples, generate projections and provide their parent operators with the corresponding output data.

In the case of the row-scanner implementation (figure 2), the execution process is quite straightforward. Here the data is fetched from the storage layer in the form of record batches on which the scanner will perform filter operations. The data will then be forwarded to the parent operator which aggregates the data to assemble the final projection [9]. Now in case of the column store, the process looks considerably different. As illustrated in figure 3, operations are executed based on single columns rather than entire tables. Here the initial scan operation is performed on a single column containing the data on which the predicate has to be applied. Hence, in the first step, values are filtered by submitting them to predicate evaluation, leaving only a subset of the original dataset. However, instead of returning the values directly, only a list of their corresponding column positions is returned. In the steps that follow, the positions are correlated with the columns containing the requested attributes to extract the corresponding values which are then aggregated to assemble the projection [9]. Thus, the example already indicates why analytical queries may perform better on column stores than on row stores. Instead of scanning the entire table to generate an output consisting of only a small subset of all attributes of the extracted records, operations are limited to that subset of attributes from the beginning, resulting in a significant reduction of the operational overhead.

Figure 3: Column-scanner implementation

Bound to be optimized

Simply storing data in the form of columns will not bring the improvements that can be expected from column stores. Actually, with few exceptions, they usually get outperformed by row stores in most scenarios. Consequently, a number of optimization techniques have been adopted over the past years yielding significant performance enhancements. This allowed column stores to be successfully utilized in areas where large datasets have to be handled like, for example, data warehousing,  data mining or data analytics [4]. Therefore, the following section will give a brief overview of a selected number of optimization techniques which have been integrated into many modern column store systems today.


Given the characteristics of columnar data, using compression on such structures seems to be the most obvious approach to reduce disc space usage. Indeed, values from the same column tend to fall into the same domain and, therefore, display low information entropy and more value locality [10]. Those qualities allow compressing one column at a time while even permitting different compression algorithms for individual columns. In addition, if values are sorted within a column, which is common for column store systems, that column will become remarkably compressible  [6]. Another technique is ‘frequency partitioning’ where a column is reorganized in such a way, that each page of the column shows as low information entropy as possible. To accomplish this, certain column stores reorganize columns based on the frequency of values that appear in the column and allow, for example, frequent values to be stored together on the same page [11]. The improvements of such methods are apparent and investigations suggest that while row stores allow average compression ratios of 1:3, column stores usually achieve ratios of 1:10 or better. Finally, in addition to lowering disk space usage, compression also helps to improve query performance. If data is compressed, then less time is spent in I/O operations during query execution because of reduced seek times, increased buffer hit rates and less transfer time of data from disk into memory and from there to the CPU [6, 10].

Dictionary Encoding

This form of compression works fine on data sets composed of a small number of very frequent values. For each value appearing in a column, an entry is created in a dictionary table. The values in the column are then represented by integer values referencing the positions in this table. Furthermore, dictionary encoding can not only be applied to single columns but also to entire blocks [12]. Another advantage of dictionary compression is that it allows working with columns of fixed length if the system keeps all codes of the same width. This can further maximize data processing speeds in systems that rely on vectorized query execution.

Figure 4: Examples of ‘Dictionary’ encoding and ‘Run-Length’ encoding

Run-Length Encoding

The encoding is well suited to compress columns containing repeating sequences of the same value by reducing them to a compact singular representation. Here, the column entries are replaced by triple values describing the original value, its initial start position and its frequency (run-length). Hence, when a column starts with 20 consecutive entries of the value ‘male’ than these entries can be reduced to the triple (‘male’, 1, 20). This compression form works especially well on sorted columns or columns with reasonable-sized runs constituted of the same value [10].

Figure 5: Examples of ‘Bit-Vector’ encoding, ‘Differential’ encoding and ‘Frame of Reference’ encoding

Bit-Vector Encoding

In this type of encoding for each unique value in a column, a bit-string with the same length as the column itself is generated. The string contains only binary entries designating a ‘1’ if the value the string is associated with exists at the corresponding position in the column, or a ‘0’ otherwise. ‘Bit-Vector’ encoding is frequently used when columns have a limited number of unique data values. In addition, there is also the possibility to further compress the bit-vector allowing to use the encoding even on columns containing a larger amount of unique values [10].

Differential Encoding and Frame of Reference Encoding

‘Differential’ encoding expresses values as bit-sized offsets from the previous value. A value sequence beginning with ’10, 8, 6, 12′ for example, can be represented as ’10, -2, -2, 6′. The bit-size for the offset value, however, is fixed and cannot be changed once established. Therefore, special escape codes have to be used to indicate values whose offset cannot be represented with the specified bit-size. The encoding performs well on columns containing sequences of increasing or decreasing values, thus, demonstrating value locality. Those can be inverted lists, timestamps, object IDs or sorted numeric columns. As a variation of the concept, there is also the ‘Frame of Reference’ encoding which works in a very similar way. The main difference here is that the offsets do not refer to the direct predecessor but to a reference value within the set. For example, the previous sequence ’10, 8, 6, 12′ would be represented as ’10, -2, -4, 2’ with ’10’ being the reference value [13].

Operations on compressed data

Figure 6: Layout of a compression block

Performance gains through compression can be maximized when operators are able to directly act on compressed values without the need for prior decompression [14]. This can be achieved through the introduction of buffers that consist of column data in a compressed format providing an API which allows query operators to work directly on the compressed values. Consequently, a component wrapping an intermediate representation for compressed data termed a ‘compression block’ is added to the query executor (figure 6). The methods provided by the API can be utilized by query operators to directly access compressed data without having to decompress and iterate through it [6, 10]. The illustration in figure 7 should exemplify the design by showing how a query execution on compressed data would look like. The query is aimed to determine the number of male and female employees who work as accountants. In the first step, a filter operation is performed on the sorted and ‘Run-length’ encoded ‘Profession’ column by calling the corresponding API method which returns the index positions of those values passing the predicate condition (Profession = ‘Accountant’). The positions can then be used to delimit the corresponding region within the ‘Bit-Vector’ encoded ‘Gender’ column. Finally, the number of males and females working as accountants can be calculated using a corresponding API method that sums up all occurrences of the value ‘1’ within the interval. As seen, for none of the operations any decompression of the scanned data was necessary.

Figure 7: Example depicting query operations on compressed data

Vectorized execution

Figure 8: Tuple-at-a-time vs. vectorized execution

Most of the traditional implementation strategies for the query execution layer are based on the ‘iterator’ or ‘tuple-at-a-time’ model where individual tuples are moved from one operator to another through the query plan tree [10]. Each operator normally provides a next() method which outputs a tuple that can be used as input by a caller operator from further up the execution tree. The advantage of this approach is that the materialization of intermediate results is minimal. There is, however, another alternative called ‘vectorized execution’ where in contrast to the ‘tuple-at-a-time’ model,  each operator returns a vector of N tuples instead of only a single tuple (figure 8). This approach offers several advantages [6]:

  1. Reduction in interpretation overhead by limiting the amount of function calls through the query interpreters.
  2. Improved cache locality by being able to adjust the vector size to the CPU cache.
  3. Better profiling by allowing operators to execute all expression evaluation work in a vectorized fashion (i.e. array-at-a-time), keeping the overhead for measurements of individual operations low.
  4. Taking advantage of the columnar format by reading larger data batches of N tuples at a time allowing array iteration with good loop pipelining techniques, so that operations can repeatedly be executed within one function call.

Early vs. late materialization

One fundamental problem when designing the execution plan of queries for column stores is to determine when the projection of columns should occur. In a column store, information of a logical entity or object is distributed over several column pages on the storage medium. As a result, during the execution of most queries, several attributes of a singular entity have to be accessed. In many cases, however, database outputs are expected to be entity-based and not column-based. Consequently, during every execution, the information scattered over multiple columns has to be reassembled at some point, to form ‘rows’ of information about the entity. This joining of tuples is a process very common for column stores and has been coined with the term ‘materialization’ [15]. In this context, there are principally two design concepts that address the problem of column projection called ‘Early Materialization’ and ‘Late Materialization’. During query execution, most naive column stores first select the columns of relevance, construct tuples from their component attributes, and then execute standard row store operations on the resulting rows. This approach of constructing tuples early in the query plan called ‘Early Materialization’ will in many cases result in better performance for analytical queries when compared to those seen in typical row stores, however, much of the potential of column stores will still be left untouched.

Modern column stores adapted the concept of ‘Late Materialization’ where operations are performed on the basis of single columns as long as possible and projection of columns occurs late in the query plan. From this rises the necessity to work with intermediate ‘position lists’ to join operations that have been conducted on individual columns. This lists can be represented as a simple array of bit strings or as a set of ranges of positions. Those position representations can then be intersected to extract the values of interest which then confluent into the final projection [15]. Thus, the concept of ‘Late Materialization’ offers several advantages that result in significant performance boosts which can be attributed to the following characteristics:

  1. Given specific selection and aggregation operations, it is possible to completely skip the materialization of some tuples.
  2. Decompression of data for the reconstruction of tuples can be avoided which allows the continuous operation on compressed data in memory.
  3. Cache performance can be improved while operating directly on column data because irrelevant attributes for given operations can be omitted.
  4. Efficient CPU usage through operations on highly compressible position representations which, given their structure, are well suited for CPU processing.
Figure 9: Analytical query to be executed using late materialization

In the following, a typical query execution using late materialization as implemented in modern column store systems will be described. Here the query consists of a simple SQL-statement aimed to determine the number of all female employees over 30 years of age ordered by professions (figure 9). In this example, the intermediate lists are expressed in the form of bit-vectors representing the positions of those values that passed the predicates and on which bit-wise ‘AND’ operations can be executed.

The query execution illustrated in figure 10 is a select-project operation where essentially two columns of a table (Employees) are filtered, while subsequently a sum aggregation is performed to generate the projection. Thus, in the first step, the two predicates are applied to the ‘Age’ and ‘Gender’ columns which results in two bit-vectors representing only those values which passed the predicates. These are then intersected by applying a bit-wise ‘AND’ operation and the resulting bit-vector then used to extract the corresponding values from the ‘Profession’ column. In the last step, the results are aggregated to assemble the projection by grouping and summing the values from the previous operation.

Figure 10: Execution of query using late materialization

Virtual IDs

A possible way to structure columns within a column store is to associate individual columns with an identifier like, for example, a numeric primary key. Adding an identifier in such an explicit fashion, however, unavoidably introduces redundancy and increases the amount of data to be stored on the disk. To solve this problem, modern database systems try to avoid additional columns containing solely IDs by substituting them with virtual identifiers which represent the position (offset) of the tuple in the column [5]. The design can be further enhanced by implementing columns composed of fixed-width dense arrays. This allows storing attributes of an individual record at the same position across all the columns of a table. In combination with offsets, the design permits to significantly improve the localization of individual records. A value at the i-th position of a table EMP, for example, could be located and accessed by just calculating ‘startOf(EMP)+i*width(EMP)’.

Database cracking

Sorted columns are a helpful measure to significantly improve the performance of column stores, including the realization of high compression ratios. However, common approaches require a complete sorting of columns in advance, demanding idle time and workload knowledge. More dynamic approaches have brought forward architectures aiming to perform such tasks incrementally by combining them with query execution. The principal motivation is to continuously change the physical data store with every executed query. Sorting is consequently done adaptively in a continuous manner and limited to the accessed sections of a column [16]. Therefore, each query performs a partial reorganization of all processed columns making subsequential access faster. This process is called ‘Database cracking’ which allows using the database system immediately once data is available. It is an interesting approach to adaptive indexing because on every range-selection query, the data is reorganized and compartmentalized using the provided predicates as pivots. In that manner, the optimal performance is achieved incrementally without the prior need to analyze the expected workload, tune the system and create indexes. Thus, figure 11 shows an example of a search request where two consecutive queries search and ‘crack’ a column (‘Age’). The first query subdivides the column into three pieces while the second query further improves the partitioning process. The final result is a column that is partially sorted and comprised of five value ranges. Consequently, the structure of the column data represents a reflection of the query structure and thus constitutes an adaption to the data requirements of the applications (services) accessing the database.

Figure 11: Adaptive indexing of column using database cracking


The column store concept, albeit nearly half a century old, has received considerable attention during the last decade. This is due to the fact that column stores exceed when it comes to performing analytical-style processing of large datasets [9]. This has made them the storage system of choice especially for applications operating with OLAP-like workloads which rely on very complex queries often involving complete datasets. Investigations have shown, however, that substantial improvements of the basic concept of column stores are necessary to truly reap the benefits such an architecture may provide. Therefore, several optimizations have been introduced over the past years, some of which have been discussed here. Those include compression, API-based compression buffers, vectorized processing, late materialization, virtual IDs and database cracking. All of which aim to significantly improve the processing time by tackling performance issues from different angles. When adding such optimization techniques column stores outperform row stores by an order of magnitude on analytical workloads [8]. This also indicates that the architectural design will be of interest even in the years to come because of the ever-growing number of large-scale, data-intensive applications with high workload. Those include scientific data management, business intelligence, data warehousing,  data mining, and decision support systems.

Finally, there are also still directions for future developments that are worth investigating like, for example, hybrid systems that are partially column-oriented. Those could be realized in the form of architectures that store columns grouped by access frequency or that adapt to access patterns allowing to switch between column-oriented and row-oriented table structures when needed. Another issue to be addressed are the loading times of column stores, which still do not perform well when compared with row stores, especially if there are many views to materialize. Thus, studies on possible new algorithms that could alleviate the problem by substantially improve read performance would surely be an interesting field for future investigations.


[1] S. Melnik, A. Gubarev, J. J. Long, G. Romer, S. Shivakumar, M. Tolton and T. Vassilakis, Dremel: Interactive Analysis of Web-scale Datasets, Proceedings of the VLDB Endowment, VLDB Endowment, 2010, Vol. 3(1-2), pp. 330-339.

[2] D. J. Abadi, P. A. Boncz and S. Harizopoulos, Column-oriented database systems, Proceedings of the VLDB Endowment, VLDB Endowment, 2009, Vol. 2(2), pp. 1664-1665.

[3] D. Bößwetter, Spaltenorientierte Datenbanken, Informatik-Spektrum, Springer, 2010, Vol. 33(1), pp. 61-65.

[4] A. El-Helw, K. A. Ross, B. Bhattacharjee, C. A. Lang and G. A. Mihaila, Column-oriented Query Processing for Row Stores, Proceedings of the ACM 14th International Workshop on Data Warehousing and OLAP, ACM, 2011, pp. 67-74.

[5] S. Idreos, F. Groffen, N. Nes, S. Manegold, K. S. Mullender and M. L. Kersten, MonetDB: Two Decades of Research in Column-oriented Database, IEEE Data Engineering Bulletin, 2012, Vol. 35(1), pp. 40-45.

[6] D. J. Abadi, Query execution in column-oriented database systems, Massachusetts Institute of Technology, 2008.

[7] D. J. Abadi, Column Stores for Wide and Sparse Data, CIDR, 2007, pp. 292-297.

[8] D. J. Abadi, S. R. Madden and N. Hachem, Column-stores vs. row-stores: how different are they really?, Proceedings of the 2008 ACM SIGMOD international conference on Management of data, 2008, pp. 967-980.

[9] S. Harizopoulos, V. Liang, D. J. Abadi and S. Madden, Performance tradeoffs in read-optimized databases, Proceedings of the 32nd international conference on very large data bases, 2006, pp. 487-498.

[10] D. Abadi, S. Madden and M. Ferreira, Integrating compression and execution in column-oriented database systems, Proceedings of the 2006 ACM SIGMOD international conference on management of data, 2006, pp. 671-682.

[11] V. Raman, G. Swart, L. Qiao, F. Reiss, V. Dialani, D. Kossmann, I. Narang and R. Sidle, Constant-Time Query Processing, IEEE 24th International Conference on Data Engineering (ICDE ’08), IEEE Computer Society, 2008, pp. 60-69.

[12] P. Raichand, A short survey of data compression techniques for column oriented databases, Journal of Global Research in Computer Science, 2013, Vol. 4(7), pp. 43-46.

[13] J. Goldstein, R. Ramakrishnan and U. Shaft, Compressing relations and indexes, Proceedings 14th International Conference on Data Engineering, IEEE, 1998, pp. 370-379.

[14] O. Polychroniou and K. A. Ross, Efficient lightweight compression alongside fast scans, Proceedings of the 11th International Workshop on Data Management on New Hardware, 2015, pp. 9.

[15] D. J. Abadi, D. S. Myers, D. J. DeWitt and S. R. Madden, Materialization strategies in a column-oriented DBMS, IEEE 23rd International Conference on Data Engineering, 2007, pp. 466-475.

[16] S. Idreos, M. L. Kersten and S. Manegold, Self-organizing Tuple Reconstruction in Column-stores, Proceedings of the 2009 ACM SIGMOD International Conference on Management of Data, ACM, 2009, pp. 297-308.