How internet giants deliver their data to the world

While attending the lecture “Ultra Large Scale Systems”, I became intrigued by the subject of traffic load balancing in ultra-large-scale systems. Out of this large topic I decided to look at traffic distribution at the frontend in detail and gave a presentation about it as part of the lecture. Because the subject is difficult to grasp once all related factors are considered, several questions remained open. To elaborate on them, I decided to write this blog post and provide a more detailed view of the topic for those interested. I will not discuss traffic load balancing inside an internal infrastructure here; corresponding literature can be found at the end of this article. Despite concentrating only on the frontend part of the equation, this post provides an in-depth look into the workings of traffic load balancing.

Historically grown structures of the internet

The “internet” started in late 1969 as the so-called ARPANET. In those days, it was used exclusively by scientists to connect their mainframe computers. These in turn were interconnected by Interface Message Processors (IMPs), predecessors of modern routers, which enabled network communication through packet switching. However, the underlying protocols were fairly unreliable in heterogeneous environments, because they were designed for a specific transmission medium. In 1974 Vinton Cerf and Robert Kahn, commonly known as the “fathers of the internet”, published their research paper entitled “A Protocol for Packet Network Intercommunication”. The objective of this work was to find a proper solution for connecting such diverse networks, and their findings provided the basis for the internet protocol suite TCP/IP in the following years. The subsequent migration of the entire ARPANET to these protocols was completed in early 1983 and took around six months according to Robert Kahn. Around this time the term “internet” slowly began to spread around the world. In addition, the Domain Name System (DNS) protocol was developed in 1984. From then on it was possible to address computers in a more natural way by using human-readable names instead of IP addresses. Today, all these protocols form the foundation for most connections on the internet.

Generally speaking, today’s internet giants such as Google, Facebook or Dropbox still rely on these earlier developments in how they deliver data to their users. Compared to the past, however, the massive increase in scale makes completely new ideas and strategies necessary in order to meet the requirements. The desire to change the existing protocols is great, but if you consider that just the migration of the ARPANET, with a significantly lower number of connected computers, took six months, then such a fundamental change with several billion devices is quite hard to imagine. In the old days, a single web server was often sufficient for all your users’ needs. You could simply assign an IP address to your web server, associate it with a DNS record and advertise its IP address via the core internet routing protocol BGP. In most cases, you didn’t have to worry about scalability or high availability and relied only on BGP, which takes care of a generic load distribution across redundant network paths and increases the availability of your web server automatically by routing around unavailable infrastructure.

Bigger does not automatically mean better

Today, if you are in the role of an ultra-large-scale system operator such as Dropbox, you have to handle roughly half a billion users, who have meanwhile stored an exabyte of data. On the network layer this means many millions of requests every second and implies terabits of traffic. As you may have already guessed, this huge demand could certainly not be managed by a single web server. Even if you did have a supercomputer that was able to handle all these requests in some way, it would not be advisable to pursue a strategy that relies upon a single point of failure. For the sake of argument, let’s assume you have this unbelievably powerful machine, which is connected to a network that never fails. At this point, please remember the 8 fallacies of distributed computing, which point out that this is only an idealized scenario and will probably never become reality. However, would that configuration be able to meet your needs as the operator? The answer is definitely no. Even such a configuration would still be limited by the physical constraints of the network layer. For instance, the speed of light is a fundamental physical constant that limits the transfer speed in fiber optic cables. This results in an upper bound on how fast data can be served, depending upon the distance it has to travel. Thus, even in an ideal world one thing is absolutely sure: When you’re operating in the context of ultra-large-scale systems, putting all your eggs in one basket is a recipe for total disaster. In other words, a single point of failure is always a bad idea.
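To get a feeling for this physical limit, the following minimal sketch estimates the lowest possible round-trip time over a fiber path. It assumes that light travels through glass fiber at roughly two thirds of its vacuum speed; the function name and the example distances are made up for illustration, and real cable routes are longer than the straight-line distances used here.

```python
# Speed of light in vacuum (km/s) and an assumed slowdown factor in glass fiber.
C_VACUUM_KM_S = 299_792.458
FIBER_VELOCITY_FACTOR = 2 / 3  # assumption: refractive index of about 1.5


def min_rtt_ms(distance_km: float) -> float:
    """Lower bound on the round-trip time over a fiber path of this length."""
    speed_km_s = C_VACUUM_KM_S * FIBER_VELOCITY_FACTOR
    one_way_s = distance_km / speed_km_s
    return 2 * one_way_s * 1000


if __name__ == "__main__":
    # Hypothetical path lengths, chosen only for illustration.
    examples = [("same metro area", 50), ("transatlantic", 6_000), ("half the globe", 20_000)]
    for label, km in examples:
        print(f"{label:>16}: at least {min_rtt_ms(km):6.1f} ms RTT")
```

No amount of server hardware can push a request below this bound, which is why distance to the user matters so much in the following sections.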

As already indicated, all the internet giants of today have thousands and thousands of machines and even more users, many of whom issue multiple requests at a time. The key to facing this challenge is called “traffic load balancing”. It describes the approach for deciding which of the many machines in your datacenters will serve a particular request. Ideally, the overall traffic is distributed across multiple nodes, datacenters and machines in an “optimal” way. But what does the term “optimal” mean in this context? Unfortunately, there’s no definite answer, because the optimal solution depends on various factors, such as:

  • The hierarchical level at which the problem is evaluated, in other words whether the problem exists inside (local) or outside (global) your datacenter.
  • The technical level at which the problem is evaluated, in other words whether it is more likely a hardware or a software issue.
  • The basic type of the traffic you’re dealing with.

Let’s take the role of the ultra-large-scale system operator Facebook and start by reviewing two typical traffic scenarios: a search request for a friend’s profile and an upload request for a short video of your cat doing crazy stuff. In the first case, users expect that their search query will be processed in a short time and that they get their results quickly. That’s why the most important variable for a search request is latency. In the other case, users accept that their video upload takes a certain amount of time, but they also expect that their request will succeed the first time. Therefore, the most important variable for a video upload is throughput. These differing needs of the two requests are crucial to determine the optimal distribution for each request at the global level:

  • The search request should preferably be sent to our nearest datacenter, which could be estimated by the aid of the round-trip time (RTT), because our purpose is to minimize the latency on the request.
  • By contrast, the video upload stream will be routed on a different network path and ideally to a node that is currently underutilized, because our focus is now on maximizing the throughput rather than latency.
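The two rules above can be sketched in a few lines. This is only an illustration under simplified assumptions: the `Datacenter` class, its fields and the example values are hypothetical, and a real system would weigh many more signals than a single RTT or utilization figure.

```python
from dataclasses import dataclass


@dataclass
class Datacenter:
    name: str
    rtt_ms: float       # measured round-trip time from the user
    utilization: float  # fraction of capacity in use, 0.0 to 1.0


def pick_datacenter(datacenters, traffic_type):
    """Return the datacenter that best fits the given traffic type."""
    if traffic_type == "latency":
        # Search-like requests: minimize the round-trip time.
        return min(datacenters, key=lambda dc: dc.rtt_ms)
    if traffic_type == "throughput":
        # Upload-like requests: prefer the least utilized datacenter.
        return min(datacenters, key=lambda dc: dc.utilization)
    raise ValueError(f"unknown traffic type: {traffic_type}")


# Hypothetical measurements for one user in Europe.
datacenters = [
    Datacenter("eu-west", rtt_ms=18, utilization=0.85),
    Datacenter("us-east", rtt_ms=95, utilization=0.40),
    Datacenter("ap-south", rtt_ms=160, utilization=0.55),
]
search_target = pick_datacenter(datacenters, "latency")
upload_target = pick_datacenter(datacenters, "throughput")
print("search ->", search_target.name, "| upload ->", upload_target.name)
```

With these made-up numbers the same user is sent to two different datacenters, depending on what the request needs most.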

On the local level, in turn, it is often assumed that all machines within the datacenter are at equal distance to the user and connected to the same network. Therefore, an optimal load distribution primarily aims at a balanced utilization of the resources and protects any single web server from being overloaded.

There is no doubt that the above example only depicts a quite simplified scenario. Under real conditions many more factors have to be taken into consideration for an optimal load distribution. For instance, in certain cases it is much more important to direct requests to a particular datacenter in order to keep its caches warm, even if that datacenter is slightly farther away. Another example would be avoiding network congestion by routing non-interactive traffic to a datacenter in a completely different region. Traffic load balancing, especially for operators of ultra-large-scale systems, is anything but easy to realize and needs to be refined continuously. Typically, the search for an optimal load distribution takes place at multiple levels, which will be described in the following sections. For the sake of simplicity, it is assumed that HTTP requests are sent over TCP. Traffic load balancing of stateless services such as DNS over UDP is slightly different, but most of the principles described here also apply to stateless services.

First step in the right direction is called “Edge”

To meet the requirements caused by the massive increase in scale, effectively all of today’s internet giants have built up an extensive network of points of presence (PoPs) spread around the world, commonly named “Edge”. By definition, a PoP is a node within a communication system that establishes connections between two or more networks. A well-known example is the internet PoP, which is the local access point that allows users to connect to the wide area network of their internet service provider (ISP). In our context, Edge takes advantage of this principle: Most of you may be familiar with the basic idea of content delivery networks (CDNs), which terminate TCP connections close to the user and thereby improve the user experience through greatly reduced latencies. Using Dropbox as an example, the following figure compares the latency of an HTTP request made directly to a datacenter with the same request made through a PoP:

Direct comparison of a connection between user and datacenter with or without a PoP respectively.

As can be seen, for a direct connection between the user in Europe and the datacenter in the USA the round-trip time (RTT) is significantly higher than when the connection is established through a PoP. This means that by putting the PoP close to the user, it is possible to improve latency by more than a factor of two!

Minimized network congestion as a positive side effect

In addition, users benefit from faster file uploads and downloads, because long transmission paths with high latency are more exposed to network congestion and slow down the growth of the congestion window. Just as a reminder, the congestion window (CWND) is a TCP state variable that limits the amount of data which can be sent into the network before receiving an acknowledgement (ACK). The receiver window (RWND) in turn is a variable which advertises the amount of data that the destination can receive. Together, both of them are used to regulate the data flow within TCP connections to minimize congestion and improve network performance. Commonly, network congestion occurs when packets sent from a source exceed what the destination can handle. If the buffers at the destination get filled up, packets are temporarily stored at both ends of the connection as they wait to be forwarded to the upper layers. This often leads to network congestion associated with packet loss, retransmissions, reduced data throughput and performance degradation, and in the worst case the network may even collapse. During the so-called “slow start”, which is part of the congestion control strategy used by TCP, the CWND size is increased exponentially to reach the maximum transfer rate as fast as possible. It increases as long as TCP confirms that the network is able to transmit the data without errors. However, this procedure is only repeated until the slow start threshold (ssthresh) is reached or a packet loss is detected, and the amount of data in flight never exceeds the advertised window (RWND). Subsequently, the so-called “congestion avoidance” takes place and continues to increase the CWND, now only linearly, as long as no errors occur. This means that the transfer rate of a TCP connection, which is clocked by the rate of incoming ACKs, depends on the round-trip time (RTT) between sender and receiver. However, the adaptive CWND size allows TCP to be flexible enough to deal with arising network congestion, because the path to the destination is probed continuously. It can adjust the CWND to reach an optimal transfer rate with minimal packet loss.
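Because the CWND grows once per round trip, the RTT directly determines how long the ramp-up takes. The following sketch models an idealized slow start without any packet loss. The initial window of 10 segments and the target window are assumptions chosen for illustration, and the function names are hypothetical.

```python
def rtts_to_reach(target_segments: int, initial_cwnd: int = 10) -> int:
    """Count the round trips of an idealized slow start until CWND >= target."""
    cwnd, rounds = initial_cwnd, 0
    while cwnd < target_segments:
        # Every acknowledged segment grows CWND by one segment,
        # so the window doubles once per round trip.
        cwnd *= 2
        rounds += 1
    return rounds


def ramp_up_time_ms(target_segments: int, rtt_ms: float, initial_cwnd: int = 10) -> float:
    """Time spent in slow start before the target window is reached."""
    return rtts_to_reach(target_segments, initial_cwnd) * rtt_ms


if __name__ == "__main__":
    target = 640  # hypothetical window needed to fill the path
    for label, rtt in [("via nearby PoP", 20), ("direct, intercontinental", 150)]:
        print(f"{label:>25}: {ramp_up_time_ms(target, rtt):.0f} ms of slow start")
```

The number of round trips is the same in both cases, so a connection with a 150 ms RTT simply needs 7.5 times longer to reach the same window as one with a 20 ms RTT.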

Let’s keep that in mind and return to our initial thought to see how latency affects the growth of the CWND during file uploads:

Comparison of the impact of latency on connections with or without a PoP respectively.

Using the example of Dropbox, you can see the impact of low latency compared to high latency for connections with and without a PoP respectively. The transfer of the client with lower latency progresses through the so-called “slow start” more quickly, because, as you already know, this procedure depends on the RTT. This client also settles on a higher threshold for the so-called “congestion avoidance”, because its connections have a lower level of packet loss. This is due to the fact that packets spend less time on the public internet and its possibly congested nodes. Once they reach the PoP, the operator can carry them over its own network.

Enhanced approach for congestion avoidance

As you may have already guessed, the congestion control strategy described above is no longer an adequate response to the transfer speeds that are available today. Since the late 1980s the internet has managed network congestion by treating packet loss as the signal to reduce the transfer rate. This worked well for many years, because the small buffers of the nodes were well aligned with the overall low bandwidth of the internet. As a result, the buffers filled up and dropped the packets they could no longer handle at just the right time, namely when the sender had begun transmitting data too fast.

In today’s diverse networks this strategy for congestion control is problematic. With small buffers, on the one hand, packet loss often occurs before congestion. Commodity switches with such small buffers, used in combination with today’s high transfer speeds, can cause loss-based congestion control to deliver extremely bad throughput, because it commonly overreacts: the sending rate is reduced upon packet loss, even when the loss is only caused by temporary traffic bursts. With large buffers, on the other hand, congestion usually occurs before packet loss. At the edge of today’s internet, loss-based congestion control causes the so-called “bufferbloat” problem by repeatedly filling these large buffers in many nodes on the last mile, which results in useless queuing delay.

That’s why a new congestion control strategy, which responds to actual congestion rather than packet loss, is much needed going forward. BBR, short for Bottleneck Bandwidth and Round-trip propagation time and developed by Google, tackles this with a total rewrite of congestion control. Its authors started from scratch with a completely new paradigm: To decide how fast data can be sent over the network, BBR considers how fast the network is actually delivering the data. For that to happen, it uses recent measurements of the network’s delivery rate and round-trip time (RTT) to build a model that includes the maximum bandwidth currently available as well as the minimum recent round-trip delay. This model is then used by BBR to control both how fast data is transmitted and the maximum amount of data that is allowed in the network at any time.
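The core of this model can be sketched as two filters over recent measurements. The following code is a strongly simplified illustration and not the actual BBR implementation: the class name, the single shared sample window and the gain values are assumptions, and the real algorithm additionally cycles through several probing phases.

```python
from collections import deque


class PathModel:
    """Toy model of a network path built from recent ACK measurements."""

    def __init__(self, window: int = 10):
        # Keep only the most recent samples: (delivery_rate_bps, rtt_s).
        self.samples = deque(maxlen=window)

    def on_ack(self, delivery_rate_bps: float, rtt_s: float) -> None:
        self.samples.append((delivery_rate_bps, rtt_s))

    @property
    def bottleneck_bw_bps(self) -> float:
        # Maximum recently observed delivery rate.
        return max(rate for rate, _ in self.samples)

    @property
    def min_rtt_s(self) -> float:
        # Minimum recently observed round-trip time.
        return min(rtt for _, rtt in self.samples)

    def pacing_rate_bps(self, gain: float = 1.0) -> float:
        # Controls how fast data is transmitted.
        return gain * self.bottleneck_bw_bps

    def inflight_cap_bytes(self, gain: float = 2.0) -> float:
        # Controls how much data may be in the network at any time,
        # as a multiple of the bandwidth-delay product.
        bdp_bytes = self.bottleneck_bw_bps * self.min_rtt_s / 8
        return gain * bdp_bytes


model = PathModel()
for rate, rtt in [(80e6, 0.050), (100e6, 0.040), (90e6, 0.060)]:
    model.on_ack(rate, rtt)
print(model.pacing_rate_bps(), model.inflight_cap_bytes())
```

Note that a lost packet does not appear anywhere in this model: the sending rate follows what the path has been observed to deliver, not the loss signal.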

How to detect the optimal PoP locations

After our extensive excursion into network congestion and how it may be avoided, let’s get back to Edge and its network of PoPs. Dropbox, according to the latest information, has roughly 20 PoPs spread across the world. This year it is planned to extend this network by examining the feasibility of further PoPs in Latin America, the Middle East and the Asia-Pacific region. Unfortunately, the procedure for selecting PoP locations, which was easy at first, is becoming more and more complicated. Besides available backbone capacity, peering connectivity and submarine cables, all the existing locations have to be considered too.

The selection of PoPs is still done manually but is already assisted by algorithms. Even with a small number of PoPs it is challenging to choose without assistive software between, for instance, a PoP in Latin America and one in Australia. Regardless of the number of PoPs, the basic question always remains: Which location will benefit the users more? For this purpose, the following short “brute-force” approach helps to solve the problem:

  1. First, the Earth is divided into multiple levels of so-called “S2 regions”, which are based on the S2 Geometry library provided by Google. This makes it possible to work with all your data on a three-dimensional sphere instead of a two-dimensional projection, which results in a more accurate view of the Earth.
  2. Then all existing PoPs are placed on the previously generated map.
  3. Afterwards, the distance to the nearest PoP is determined for all regions, weighted by “population”. In this context the term “population” refers to something like the total number of people in a specific area as well as the number of existing or potential users. This is used for optimization purposes.
  4. Now, an exhaustive search is done to find the “optimal” location for the PoP. In this step, L1 or L2 loss functions are used to compute the score of each placement, which is to be minimized. In doing so, they try to compensate for internet quirks such as the effects of latency on the throughput of TCP connections.
  5. Finally, the newly found location is added to the map as well.
  6. Steps 3 to 5 are repeated for all PoPs until an “optimal” location is found for each of them.

The attentive reader may have already noticed that the above problem can also be solved by more sophisticated methods like Gradient Descent or Bayesian Optimization. This is indeed true, but with roughly 100,000 S2 regions the problem space is relatively small, so that they can just brute-force through it and still achieve a satisfying result instead of running the risk of getting stuck in a local optimum.
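The brute-force steps above can be sketched as follows. This is a minimal illustration and not the tooling used by Dropbox: instead of the S2 library it represents each region by a hypothetical center coordinate with a population weight, uses the great-circle distance as a stand-in for network distance, and all coordinates and weights are made up.

```python
import math

EARTH_RADIUS_KM = 6371.0


def haversine_km(a, b):
    """Great-circle distance between two (latitude, longitude) points."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


def score(pops, regions, power=1):
    """Population-weighted loss: power=1 is an L1 loss, power=2 an L2 loss."""
    return sum(
        weight * min(haversine_km(center, pop) for pop in pops) ** power
        for center, weight in regions
    )


def best_new_pop(existing, candidates, regions, power=1):
    """Exhaustively try every candidate and keep the one with the lowest score."""
    return min(candidates, key=lambda c: score(existing + [c], regions, power))


# Hypothetical regions: ((lat, lon), population weight).
regions = [((52.5, 13.4), 5), ((-23.5, -46.6), 8), ((-33.9, 151.2), 3)]
existing = [(50.1, 8.7)]                         # one PoP in central Europe
candidates = [(-23.5, -46.6), (-33.9, 151.2)]    # Latin America vs. Australia
print(best_new_pop(existing, candidates, regions))
```

With these example weights the candidate in Latin America wins, because it removes the largest population-weighted distance. Repeating the call and appending each result to `existing` mirrors steps 3 to 5.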

The beating heart of the Edge

Now, let’s have a closer look at Global Server Load Balancing (GSLB), which is without doubt the most important part of the Edge. It is responsible for distributing the users across all PoPs. This usually means that a user is sent to the closest PoP that has free capacity and is not under maintenance. The GSLB is called the “most important part” here because if it frequently misroutes users to a suboptimal PoP, it reduces performance and may render your Edge useless. Using the example of Dropbox, commonly used techniques for GSLB are discussed below together with their pros and cons.
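Stripped down to its essence, this rule could look like the following sketch. The `PoP` class, its fields and the load threshold are hypothetical and only serve to illustrate the decision; how distance and load are actually obtained is the hard part, as the following sections show.

```python
from dataclasses import dataclass


@dataclass
class PoP:
    name: str
    distance_km: float           # distance from the user to this PoP
    load: float                  # fraction of capacity in use, 0.0 to 1.0
    in_maintenance: bool = False


def route_user(pops, max_load=0.9):
    """Send the user to the closest PoP that is healthy and has free capacity."""
    eligible = [p for p in pops if not p.in_maintenance and p.load < max_load]
    if not eligible:
        raise RuntimeError("no PoP available")
    return min(eligible, key=lambda p: p.distance_km)


# Hypothetical view of the Edge for one user.
pops = [
    PoP("fra", distance_km=300, load=0.95),                       # closest, but full
    PoP("cdg", distance_km=350, load=0.50, in_maintenance=True),  # being drained
    PoP("ams", distance_km=450, load=0.60),
    PoP("lhr", distance_km=900, load=0.30),
]
print(route_user(pops).name)
```

In this example the two closest PoPs are skipped, one because it is overloaded and one because it is under maintenance.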

Keep things easy with BGP anycast

Anycast is by far the easiest method to implement traffic load balancing, because it relies completely on the Border Gateway Protocol (BGP). Just as a quick reminder, anycast is a basic routing scheme where the group members are addressed by a so-called “one-to-one-of-many” association. This means that packets are routed to any single member of a group of potential receivers, which are all identified by the same destination IP address. The routing algorithm then selects a single receiver from the group based on a metric that relies on the least number of hops. As a result, packets are routed to the topologically nearest member of an anycast group.

Nationwide traffic distribution using anycast.

As can be seen, to use anycast it is sufficient to just start advertising the same IP subnet from all PoPs, and the internet will deliver packets to the “optimal” location as if by magic. Even though you get a rudimentary failover mechanism for free and the setup is quite simple, anycast has a lot of drawbacks, so let’s have a look at these in detail.

Anycast is not the silver bullet

Above, it was mentioned that BGP automatically selects the “optimal” route to a PoP, and from a general point of view this is not incorrect. However, the policy of Dropbox favors network paths that are likely to optimize traffic performance, whereas BGP does not know anything about latency, throughput or packet loss. It relies only on attributes such as path length, which turn out to be inadequate heuristics for proximity and performance. Thus, traffic load balancing based on anycast is optimal in most cases, but it tends to behave poorly at high percentiles if only a small or medium number of PoPs is involved. However, it appears that the probability of routing users to the wrong destination in anycast networks decreases as the number of PoPs grows. It is therefore possible that with an increasing number of PoPs, anycast may eventually outperform the traffic load balancing method based on DNS, which is discussed below.
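The mismatch between path length and performance is easy to demonstrate with a toy example. The sketch below reduces the BGP decision to a single criterion, the length of the AS path, which is a deliberate simplification of the real best-path selection. The routes, AS numbers (taken from the range reserved for documentation) and RTT values are invented.

```python
# Two hypothetical routes towards the same anycast prefix.
routes = {
    "pop-a": {"as_path": ["AS64500", "AS64501"], "rtt_ms": 120},
    "pop-b": {"as_path": ["AS64500", "AS64502", "AS64503"], "rtt_ms": 35},
}


def bgp_choice(routes):
    """Simplified BGP view: prefer the route with the shortest AS path."""
    return min(routes, key=lambda pop: len(routes[pop]["as_path"]))


def latency_choice(routes):
    """What the user would prefer: the route with the lowest round-trip time."""
    return min(routes, key=lambda pop: routes[pop]["rtt_ms"])


print("BGP picks:", bgp_choice(routes))
print("Lowest latency:", latency_choice(routes))
```

Here the shorter AS path leads to the slower PoP, and nothing in the routing decision would ever reveal that.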

Limited traffic steering capabilities

Basically, control over the traffic is very limited with anycast, so that a node may not have enough capacity to forward all the traffic you would like to send over it. Considering that peering capacity is quite limited, rapid growth in demand can quickly overstrain the capacity of an existing interconnection. Short-term peaks in demand due to an event or a holiday, as well as reductions in capacity caused by failures, can lead to variations in demand and available capacity. In any of these cases the decisions BGP makes won’t be affected by these surrounding factors, as it is not aware of capacity problems and related conflicts. You can still do some “lightweight” traffic distribution by using specific BGP parameters in the announcements or by explicitly communicating with other providers to establish a peering connection, but none of this scales. Another pitfall of anycast is that a careful drain of PoPs at times of reduced demand is effectively impossible, since BGP balances packets, not connections. As soon as the routing table changes, packets of all existing TCP connections will immediately be routed to the next best PoP, which does not know these connections and refuses the user’s request with a reset (RST).

The crux of the matter is often hard to find

In general, reasoning about traffic distribution with anycast is not trivial, since it involves the state of internet routing at a given moment. Troubleshooting performance issues with anycast is like looking for a needle in a haystack and usually requires a lot of traceroutes as well as a lot of patience in communicating with the providers involved. It is worth mentioning again that, as with draining PoPs, any connectivity change in the internet can break existing TCP connections to anycast IP addresses. Therefore, it can be quite challenging to troubleshoot such intermittent connection issues, which are caused by routing changes and faulty or misconfigured hardware components.

Light and shadow of load distribution with DNS

Before a client can send an HTTP request, it usually has to look up an IP address using DNS. This provides the perfect opportunity for introducing our next common method to manage traffic distribution, called DNS load balancing. The simplest way here is to return multiple A records for IPv4 or AAAA records for IPv6 in the DNS reply and let the client select an IP address randomly. While the basic idea is quite simple and easy to implement, it still comes with multiple challenges.

The first problem is that you have only little control over the client behavior, because records are selected arbitrarily and each will attract roughly the same amount of traffic. But is there any way to minimize this problem? On paper, you could use an SRV record to predefine specific priorities, but unfortunately these records have not yet been adopted for HTTP.
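A small simulation makes this even split visible. It assumes that every client picks one of the returned addresses uniformly at random, which is a simplification of real client behavior; the function name is hypothetical and the addresses come from a range reserved for documentation.

```python
import random
from collections import Counter


def simulate_clients(addresses, clients, seed=0):
    """Let each simulated client pick one of the returned A records at random."""
    rng = random.Random(seed)
    return Counter(rng.choice(addresses) for _ in range(clients))


# Hypothetical DNS reply containing three A records.
a_records = ["192.0.2.10", "192.0.2.20", "192.0.2.30"]
hits = simulate_clients(a_records, clients=30_000)
for address in a_records:
    print(f"{address}: {hits[address] / 30_000:.1%} of the clients")
```

Each address ends up with about a third of the clients, no matter how much capacity sits behind it or how far away it is from the user.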

Our next potential problem is that the client usually cannot determine the closest IP address. You can bypass this obstacle by using an anycast IP address for the authoritative name servers and take advantage of the fact that DNS queries will then reach the nearest name server. Thus, the server can reply with addresses pointing to the closest datacenter. An improvement on this is to build a map of all networks as well as their approximate geographical locations and to serve corresponding DNS replies. However, this approach implies a much more complex DNS server implementation and ongoing maintenance effort to keep the location mapping up to date.

The devil is often in the detail

Of course, none of the solutions above is easy to realize because of fundamental characteristics of DNS: A user rarely queries authoritative name servers directly. Instead, a recursive DNS server is usually located between a user and the name server. This server proxies the queries of its users and often provides a caching layer. This DNS middleman has the following three implications for traffic management:

  • Recursive resolution of IP addresses
  • Multiple nondeterministic reply network paths
  • Additional caching problems

As you may have already guessed, the recursive resolution of IP addresses is quite problematic, because instead of the user’s IP address an authoritative name server sees the IP address of the recursive resolver. This is a huge limitation, as it only allows replies that are optimized for the location of the resolver rather than the location of the user. To tackle this issue, you can use the EDNS Client Subnet option of the so-called “EDNS0” extension, which includes information about the IP subnet of the client inside the DNS query sent by a recursive resolver. This way, an authoritative name server can return a response that is optimal from the perspective of the user. Although this is not yet an official standard, the biggest DNS resolvers such as Google have decided to support it already.
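How an authoritative name server might use this information is sketched below. This is a simplified illustration rather than a real DNS server: the mapping from networks to PoPs, the PoP names and the function names are hypothetical, and all addresses come from ranges reserved for documentation.

```python
import ipaddress

# Hypothetical map of client networks to the PoP that should serve them.
GEO_MAP = {
    ipaddress.ip_network("198.51.100.0/24"): "pop-fra",
    ipaddress.ip_network("203.0.113.0/24"): "pop-sjc",
}
DEFAULT_POP = "pop-sjc"


def locate(address: str) -> str:
    """Map an IP address to a PoP, falling back to a default."""
    ip = ipaddress.ip_address(address)
    for network, pop in GEO_MAP.items():
        if ip in network:
            return pop
    return DEFAULT_POP


def answer_query(resolver_ip: str, client_subnet: str = None) -> str:
    """Prefer the client subnet from the query, else use the resolver address."""
    if client_subnet is not None:
        subject = str(ipaddress.ip_network(client_subnet).network_address)
    else:
        subject = resolver_ip
    return locate(subject)


# A user in Europe behind a resolver that is located in the USA.
print(answer_query("203.0.113.53"))                     # without client subnet
print(answer_query("203.0.113.53", "198.51.100.0/24"))  # with client subnet
```

Without the client subnet the answer follows the resolver, with it the answer follows the user.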

The difficulty here is not only that the authoritative name server has to return the optimal IP address for a given user’s request, but also that the resolver in between may be responsible for serving thousands or even millions of users spread across multiple regions. For example, a large nationwide ISP might operate name servers for its entire network inside one datacenter, which has interconnections for each area it serves. These name servers would always return the IP address that is optimal from the perspective of the ISP’s datacenter, despite there being better network paths for its users. But what does the term “optimal” mean in the context of DNS load balancing? The most obvious answer to this question is a location that is closest to the user. However, there are additional criteria. First of all, the DNS load balancer has to ensure that the selected datacenter and its network are working properly, because sending user requests to a datacenter that currently has problems would be a bad idea. Thus, it is essential to integrate the authoritative DNS server into the infrastructure monitoring.

The last implication of the DNS middleman is related to caching. Because authoritative name servers cannot flush the caches of a resolver remotely, it is best practice to assign a relatively low time to live (TTL) to DNS records. The TTL effectively sets a lower bound on how quickly DNS changes can theoretically be propagated to users. But the value has to be set very carefully: If the TTL is too short, it results in a higher load on your DNS servers plus increased latency, because clients have to perform DNS queries more often. Moreover, Dropbox concluded that the TTL value in DNS records is fairly unreliable. They used a TTL of one minute for their top level domain (TLD), and in case of changes it took about 15 minutes to drain 90% of the traffic and roughly a full hour in total to drain an additional 5% of the traffic.

DNS load balancing in the field

Despite all of the problems that are shown above, DNS is still a very effective way for traffic load balancing in ultra-large-scale systems.

Nationwide traffic distribution using DNS dependent on geographical location.

Using the example of Dropbox, you can see an approach where each PoP has its own unique unicast IP address space and DNS is responsible for assigning different IP addresses to the users based on their geographical location. Just as a quick reminder, unicast is another basic routing scheme where sender and receiver are addressed by a so-called “one-to-one” association. This means that each destination IP address uniquely identifies a single receiver. This setup gives Dropbox control over traffic distribution and also allows a careful drain of PoPs. It is worth mentioning that reasoning about a unicast setup is much easier and that troubleshooting becomes simpler as well.

Hybrid unicast/anycast combines the best of both worlds

Finally, let’s briefly cover one of the composite approaches to GSLB, which is a hybrid setup combining unicast and anycast. It offers all the benefits of unicast, including DNS replies that correspond to geographical locations, along with the ability to quickly drain PoPs if required.

Nationwide traffic distribution using a combination of unicast and anycast.

As can be seen, with this approach Dropbox announces the unicast IP subnet of each PoP and, simultaneously, one of its IP supernets from all of the PoPs. However, this implies that every PoP needs a configuration that is able to handle traffic destined to any PoP. Inside a PoP this is realized by an L4 load balancer using virtual IP addresses (VIPs). Basically, these VIPs are not assigned to any particular machine and are usually shared across many of them. From the perspective of the user, however, a VIP remains a single, regular IP address. On paper, this practice allows you to hide the number of machines behind a specific VIP as well as other implementation details. It gives Dropbox the ability to quickly switch between unicast and anycast IP addresses as needed and also provides an immediate fallback without waiting for the unreliable TTL to expire. Thereby, a careful drain of PoPs is possible and all the other benefits of DNS load balancing remain as well. All of that comes at a relatively small operational cost in the form of a slightly more complex setup, although it may cause scalability trouble once several thousand VIPs are reached. A positive side effect of this approach is that the configuration of all PoPs becomes clearly more uniform.
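How a single VIP can be shared across many machines is illustrated by the following sketch. It is an assumption about one common technique, hashing connection attributes onto a backend, and not a description of the load balancer Dropbox actually runs. The function name, addresses and backend names are invented.

```python
import hashlib


def pick_backend(vip: str, client_ip: str, client_port: int, backends: list) -> str:
    """Map one connection to one backend so that all its packets stay together."""
    key = f"{vip}|{client_ip}|{client_port}".encode()
    digest = hashlib.sha256(key).digest()
    # Use the first 8 bytes of the hash as a number and fold it onto the pool.
    index = int.from_bytes(digest[:8], "big") % len(backends)
    return backends[index]


# Hypothetical pool of machines behind one virtual IP address.
backends = ["proxy-1", "proxy-2", "proxy-3", "proxy-4"]
first = pick_backend("192.0.2.80", "198.51.100.7", 50123, backends)
again = pick_backend("192.0.2.80", "198.51.100.7", 50123, backends)
print(first, again)
```

The user only ever sees the VIP, while every packet of a connection deterministically reaches the same machine. A simple modulo reshuffles most connections whenever the pool changes, which is why production systems usually prefer some form of consistent hashing.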

Real User Measurement brings light into the darkness

With all the methods for traffic load balancing discussed up until now, one critical problem always remains: None of them takes into account the actual performance that a user experiences. Instead, they rely only on approximations, such as the number of hops in the case of BGP or the assignment of IP addresses based on geographical location in the case of DNS. However, this lack of knowledge can be bridged by using Real User Measurement (RUM). For example, Dropbox collects such detailed performance information from its desktop clients. According to Dropbox, a lot of time was put into the implementation of a measurement framework that helps to estimate the reliability of the Edge from the perspective of a user. The basic procedure is pretty simple: From time to time, a subset of clients checks the availability of the PoPs and reports back the results. Later, this was extended by collecting latency information too, which provides a more detailed view of the internet.

Additionally, they built a separate mapping by joining data from their DNS and HTTP servers. On top of that, they added information about the IP subnet of their clients from the so-called “EDNS0” extension, which is sent inside the DNS query by a recursive resolver. They then combined this with the aggregated latencies, BGP configuration, peering information and utilization data from the infrastructure monitoring to create a detailed mapping of the connection between clients and their Edge. Finally, they compared this mapping to both anycast and the DNS-based assignment of IP addresses by geographical location, and packed it into a so-called “radix tree”, which is a memory-optimized prefix tree, in order to upload it onto a DNS server.
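The lookup that such a structure enables is a longest-prefix match from a client address to a PoP. The sketch below uses a plain binary trie for IPv4 to keep the code short; a real radix tree additionally compresses chains of nodes with a single child to save memory. The class name, prefixes and PoP names are hypothetical.

```python
import ipaddress


class PrefixTrie:
    """Uncompressed binary trie mapping IPv4 prefixes to PoP names."""

    def __init__(self):
        self.root = {}

    def insert(self, prefix: str, pop: str) -> None:
        net = ipaddress.ip_network(prefix)
        bits = format(int(net.network_address), "032b")[: net.prefixlen]
        node = self.root
        for bit in bits:
            node = node.setdefault(bit, {})
        node["pop"] = pop

    def lookup(self, address: str):
        """Return the PoP of the longest matching prefix, or None."""
        bits = format(int(ipaddress.ip_address(address)), "032b")
        node, best = self.root, self.root.get("pop")
        for bit in bits:
            node = node.get(bit)
            if node is None:
                break
            # Remember the most specific match seen so far.
            best = node.get("pop", best)
        return best


trie = PrefixTrie()
trie.insert("198.51.100.0/24", "pop-fra")
trie.insert("198.51.100.128/25", "pop-ams")
print(trie.lookup("198.51.100.5"), trie.lookup("198.51.100.200"), trie.lookup("192.0.2.1"))
```

The more specific /25 entry overrides the covering /24, so parts of a network can be steered to a different PoP than the rest.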

To extend the mapping by extrapolation, they applied the following rules: If all samples for a node end up with the same “optimal” PoP, they conclude that all IP address ranges announced by that node should lead to that PoP. Otherwise, if a node has multiple “optimal” PoPs, they break it down into its announced IP address ranges. For each range, it is then assumed that if all measurements in that range end up at the same PoP, this choice can be extrapolated to the whole range. With this approach they were able to double their map coverage, make it more robust to changes and generate a mapping based on a smaller amount of data.
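The two extrapolation rules can be sketched in a few lines. This is only an illustration under simplifying assumptions: the input format (tuples of node, announced IP range and measured best PoP) and the function name `extrapolate` are hypothetical and not taken from Dropbox.

```python
from collections import defaultdict


def extrapolate(samples):
    """samples: iterable of (node, ip_range, best_pop) tuples.

    Returns (node_map, range_map). A node whose samples all agree on one PoP
    is mapped as a whole; otherwise only its ranges with unanimous samples
    are mapped individually.
    """
    by_node = defaultdict(lambda: defaultdict(set))
    for node, ip_range, pop in samples:
        by_node[node][ip_range].add(pop)

    node_map, range_map = {}, {}
    for node, ranges in by_node.items():
        pops = set().union(*ranges.values())
        if len(pops) == 1:
            # Rule 1: every sample of this node points to the same PoP.
            node_map[node] = next(iter(pops))
        else:
            # Rule 2: fall back to per-range decisions.
            for ip_range, range_pops in ranges.items():
                if len(range_pops) == 1:
                    range_map[ip_range] = next(iter(range_pops))
    return node_map, range_map
```

Ranges with conflicting measurements stay unmapped in this sketch, so they would keep whatever default routing applies.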

Reaching the safe haven

In conclusion, all this back and forth, such as inferring a user’s IP address from its resolver, judging how effective the geolocation-based assignment of IP addresses via DNS is, or assessing the fitness of decisions made by BGP, is no longer necessary once a request arrives at the Edge. From that moment on, you know the IP address of each user and even have a corresponding round-trip time (RTT) measurement. It is then possible to route users at a higher level, for example by embedding a link to a specific PoP in the HTML file. This allows traffic distribution at a finer granularity.

As you may have realized while reading this article, traffic load balancing is a complex problem that becomes really challenging if you want to consider all related factors. In addition to the strategies described above, there are many more traffic distribution algorithms and techniques for reaching high availability in ultra-large-scale systems. If you want to know more about the idea of traffic load balancing, or about the factors to consider when developing a traffic distribution strategy inside the PoPs, please follow the links at the end of this article.

References and further reading

  1. Dropbox traffic infrastructure: Edge network by Oleg Guba and Alexey Ivanov
    https://blogs.dropbox.com/tech/2018/10/dropbox-traffic-infrastructure-edge-network/
  2. Steering oceans of content to the world by Hyojeong Kim and James Hongyi Zeng
    https://code.fb.com/networking-traffic/steering-oceans-of-content-to-the-world/
  3. Site Reliability Engineering: Load Balancing at the Frontend by Piotr Lewandowski and Sarah Chavis
    https://landing.google.com/sre/sre-book/chapters/load-balancing-frontend/
  4. Directing traffic: Demystifying internet-scale load balancing by Laura Nolan
    https://opensource.com/article/18/10/internet-scale-load-balancing
  5. What is CWND and RWND by Amos Ndegwa
    https://blog.stackpath.com/glossary/cwnd-and-rwnd/
  6. TCP BBR congestion control comes to GCP – your Internet just got faster by Neal Cardwell et al.
    https://cloud.google.com/blog/products/gcp/tcp-bbr-congestion-control-comes-to-gcp-your-internet-just-got-faster
  7. What Are L1 and L2 Loss Functions? by Amit Shekhar
    https://letslearnai.com/2018/03/10/what-are-l1-and-l2-loss-functions.html

Consensus protocols – A key to cluster management?

Today, applications require increasing robustness and scalability, since otherwise they collapse under the load of a vast number of users. Cluster managers like Kubernetes, Nomad or Apache Marathon are central to this resilience and scalability. A closer look inside cluster managers reveals consensus protocols to be the crucial point. This blog post gives an overview of the most common consensus protocols and their workflows. Furthermore, the concept of a consensus protocol is questioned and potential improvements of the consensus protocols in cluster management are discussed.

List of figures

  1. Container Management Platforms Preferences
  2. Distributed System
  3. Concurrency
  4. State transition
  5. Consensus algorithm
  6. FLP impossibility
  7. Two-phase commit, fault-free execution, phase one
  8. Two-phase commit, fault-free execution, phase two
  9. Two-phase commit, with coordinator failure, phase one
  10. Two-phase commit, with coordinator failure, phase two
  11. Procedure ZooKeeper
  12. Performance test

Motivation

Container management platforms are the preferred choice when it comes to orchestrating containers with high availability, reliability and scalability. The diagram in Figure 1 shows the distribution of container management platforms from 2016 to 2017.

Figure 1: Container Management Platforms Preferences [1]

In comparison to the other platforms, a clear trend towards Kubernetes is evident. In order to understand why such a trend exists, it is necessary to look behind the scenes. Are there differences or similarities between these platforms in terms of consensus protocols? Is this a critical factor for the emergence of this trend? To answer these questions, a number of essential terms need to be addressed first.

Distributed system

A distributed system consists of a set of distinct processes that send messages to each other and coordinate to achieve a common objective.
Even a single computer can be viewed as a distributed system:
memory units, input-output channels and the central control unit also represent separate processes collaborating to complete an objective.
In this blog post the focus is on distributed systems where the processes are spatially distributed across computers [4].

Figure 2: Distributed System [4]

Properties of a distributed system

The following features are particularly important in relation to a distributed system.

1. Concurrency

Nodes or processes running simultaneously require coordination. As illustrated in Figure 3, this concurrency results in different events at certain points in time.

Figure 3: Concurrency [2]

2. Lack of global clock

The order of the events must be determined. However, there is no global clock that can be used to determine the order of events on the computer network.
The following two factors can be considered to determine whether an event happened before another.

  • Messages are sent before they are received
  • Every computer has a sequence of events

This results in a partial ordering of the system's events. To obtain a total ordering, an algorithm is needed that relies on communication between the computers in the system.
If an algorithm relies only on the order of events, abnormal behavior may occur.
Such abnormal behavior can be avoided by synchronizing physical clocks.
The coordination of independent clocks is rather complex and still clock drifts can occur.
The time and sequence of events are fundamental obstacles in a distributed system with spatially distributed processes [4].
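One well-known way to order events without a global clock is a logical clock in the style of Lamport. The post does not name a specific algorithm, so treat the following as an illustrative sketch rather than as the method it refers to. The class name and its methods are chosen freely. It implements exactly the two rules listed above: each computer numbers its own events, and a message is always received "after" it was sent.

```python
class LamportClock:
    """Logical clock: a counter instead of physical time."""

    def __init__(self):
        self.time = 0

    def tick(self):
        """A local event advances the counter."""
        self.time += 1
        return self.time

    def send(self):
        """Sending is an event; the returned timestamp travels with the message."""
        return self.tick()

    def receive(self, msg_time):
        """Receiving jumps past the sender's timestamp, so send < receive."""
        self.time = max(self.time, msg_time) + 1
        return self.time
```

The timestamps respect the partial order of events. To obtain a total order, ties between equal timestamps are commonly broken by a fixed process id.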

3. Independent failure of components

A key aspect is the insight that components can be faulty in a distributed system.
It is impossible to have an error-free distributed system due to the large number of potential failures.
Faults can be divided into the following three groups.

  • Crash-fail: The component stops immediately without warning.
  • Omission: The component sends a message which does not arrive.
  • Byzantine: The component behaves arbitrarily. It sometimes exhibits regular behavior, but may also behave maliciously. This variant is usually disregarded in controlled environments like data centers.

Based on this assumption, protocols should be designed to allow a system to have faulty components, yet achieve a common objective and provide a meaningful service.

Since every system has failures, a major consideration is the ability of the system to survive if its components deviate from normal behavior regardless of whether they are malicious or not.
Basically, a distinction is made between simple fault-tolerance and Byzantine fault-tolerance.

  • Simple fault-tolerance: In systems with simple fault-tolerance, one assumes that components either exactly follow the protocol or fail completely. Arbitrary or malicious behavior is not considered.
  • Byzantine fault-tolerance: In uncontrolled environments, a system with simple fault-tolerance is not particularly useful. In a decentralized system, where components are controlled by independent actors communicating over the public, permissionless Internet, malicious components must also be expected.

The BAR fault-tolerance extends the Byzantine fault-tolerance and defines the following three classes.

  • Byzantine: Components that are malicious
  • Altruistic: Components that always follow the protocol
  • Rational: Components that only follow the protocol when it is convenient [4].

4. Message transmission

Messages are sent either synchronously or asynchronously.

  • Synchronous: In a synchronous system, messages are assumed to be delivered within a fixed time window. Conceptually, synchronous messaging is less complex because it guarantees a response.
    However, this variant is often impracticable in a genuine distributed system, with computers crashing, messages being delayed, or not arriving.
  • Asynchronous: In an asynchronous system, it is assumed that the network may delay messages indefinitely, duplicate them, or deliver them out of order.

Replicated State Machine

A replicated state machine is a deterministic state machine distributed across many computers, though acting as a single state machine. The state machine works even if an arbitrary computer fails.
A valid transaction in a replicated state machine results in a transition to another state.
A transaction represents an atomic operation on the database complying with the ACID principle.

Figure 4: State transition [4]

A replicated state machine is a set of distributed computers all starting with the same initial value.
Each of the processes decides on the next value for each state transition, illustrated in Figure 4.
Achieving consensus implies the collective agreement of all computers on an output value based on the current value. A consistent transaction log thus is obtained for each computer in the system.
A replicated state machine must continuously accept new transactions in the log, even when:

  • Some computers fail,
  • The network fails to send messages reliably,
  • No global clock exists to determine the order of events.

This is the fundamental intent of any consensus algorithm as depicted in Figure 5.

Figure 5: Consensus algorithm [4]

A consensus algorithm achieves consensus when the following three conditions are satisfied:

  • Agreement (or safety): All non-faulty nodes decide on the same output value.
  • Validity: The value to be decided on must have been proposed by a node in the network.
  • Termination (or liveness): All non-faulty nodes eventually decide on an output value.
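As a small illustration, the three conditions can be written down as checks on the outcome of a single consensus round. The function `check_consensus` and its input format are assumptions made for this sketch. Note that termination is a statement about what happens eventually, so a snapshot like this can only tell whether every node has decided yet.

```python
def check_consensus(proposed, decisions):
    """Check the three consensus conditions for one round.

    proposed:  set of values that were proposed by some node.
    decisions: dict mapping each non-faulty node to its decided value,
               or to None if it has not decided (yet).
    """
    decided = [v for v in decisions.values() if v is not None]
    return {
        # Agreement: no two nodes decided on different values.
        "agreement": len(set(decided)) <= 1,
        # Validity: every decided value was actually proposed.
        "validity": all(v in proposed for v in decided),
        # Termination (snapshot): every non-faulty node has decided.
        "termination": len(decided) == len(decisions),
    }
```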

Typically a consensus algorithm has three types of actors in the system.

  1. Proposers, often referred to as leaders or coordinators
  2. Acceptors or followers, who listen to the requests of the proposers and respond
  3. Learners, who learn the output value resulting from the vote

A consensus algorithm generally has the following procedure.

  1. Elect: In this phase the leader is selected. A leader makes the decisions and proposes the next valid output.
  2. Vote: All non-faulty components listen to the proposed value of the leader, validate it and propose it as the next valid value.
  3. Decide: All non-faulty components must come to a consensus on a correct output value, otherwise the procedure is repeated [4].

FLP impossibility

As described above, there are differences between synchronous systems and asynchronous systems. In synchronous environments, messages are delivered within a fixed time frame. In asynchronous environments, there's no guarantee of a message being delivered.

Figure 6: FLP impossibility [3]

Reaching consensus in a synchronous environment is possible because assumptions can be made about the maximum time required to deliver messages. In such a system, different nodes are allowed to take turns proposing new transactions and polling for a majority vote, and any node that does not offer a proposal within the maximum time period is skipped.

In a fully asynchronous system there is no consensus solution that can tolerate one or more crash failures even when only requiring the non-triviality property [4].

The FLP impossibility result stems from the fact that no maximum message delivery time can be assumed in an asynchronous environment. Termination therefore becomes much more difficult, if not impossible, to guarantee. This matters because the termination condition must be satisfied in order to reach consensus, meaning every non-faulty node must decide on an output value [4].

To circumvent the FLP impossibility there are two options.

  • Use synchrony assumptions
  • Use non-determinism

Consensus protocols

Consensus protocols can be divided into two basic approaches: those that use synchrony assumptions and those that use non-determinism.

Approach 1: Use Synchrony Assumptions

If messages are sent asynchronously, termination cannot be guaranteed.
How can it be guaranteed that every non-faulty node will choose a value?
Due to asynchronicity, a consensus cannot be reached within a fixed time window.
This leads to the conclusion that consensus cannot always be reached.
One way to deal with this is to use timeouts. If there is no progress, the system waits until the timeout expires and restarts the process. Consensus algorithms like Paxos and Raft apply this method [4].

Simple fault-tolerance

The following section describes algorithms of the simple fault-tolerance category. These assume that nodes either follow the protocol or fail, whereas Byzantine fault-tolerance algorithms must also cope with nodes that exhibit malicious behavior.

Two-phase commit

The two-phase commit is one of the simplest and most commonly used consensus algorithms. As the name suggests, it consists of two phases.
The first phase is the proposal phase. It involves proposing a value to each participant in the system and collecting their responses, as shown in Figure 7.

Figure 7: Two-phase commit, fault-free execution, phase one [5]

The second phase is the commit-or-abort phase. In this phase, the result of the vote is communicated to all participants in the system, telling them whether to go ahead and decide or to erase the log, as depicted in Figure 8.

The node proposing the values is referred to as the coordinator. The coordinator is not required to be selected by means of a special procedure. Each node may act as a coordinator and thus start a new round of the two-phase commit.

It is important that the nodes do not reach a consensus on what a value should be, but reach a consensus on whether or not to accept it.

Figure 8: Two-phase commit, fault-free execution, phase two [5]

In phase 2, each node decides on the value proposed by the coordinator in phase 1 if and only if the coordinator tells it to. The coordinator sends the same decision to each node, so if one node is instructed to decide on a value, they all are. Therefore the condition for agreement is satisfied.

The two-phase commit always aborts unless every node approves. In either case, the final value has been voted for by at least one node. Thus the condition for validity is satisfied.

Finally, termination is guaranteed if each node makes progress and finally returns its vote to the coordinator, who passes it on to each node. There are no loops in the two-phase commit, so there is no possibility to continue forever [5].
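The fault-free execution described above fits into a few lines. The following is a minimal, single-process sketch under the assumption that nothing crashes and no message is lost. The function `two_phase_commit` and the representation of participants as vote callbacks are invented for illustration.

```python
def two_phase_commit(value, participants):
    """Fault-free two-phase commit.

    participants: dict mapping a node name to a vote function that takes the
    proposed value and returns True (commit) or False (abort).
    Returns the decision each node ends up with.
    """
    # Phase 1: the coordinator proposes the value and collects all votes.
    votes = {name: vote(value) for name, vote in participants.items()}

    # Phase 2: commit only on a unanimous yes, otherwise abort.
    decision = "commit" if all(votes.values()) else "abort"

    # The coordinator sends the same decision to every node (agreement).
    return {name: decision for name in participants}
```

A single "no" vote is enough to abort the round for everyone, which matches the observation that the nodes do not agree on what the value should be, but only on whether to accept it.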

Crashes and failure

To understand the failures, it is necessary to consider each state the protocol may take and contemplate what occurs if either the coordinator or one of the participants crashes in this state.

In phase one, the coordinator could crash before any messages are sent, as illustrated in Figure 9. This is not too troublesome, as it simply means that the two-phase commit is never started.
If a participant node crashes before the protocol starts, no harm results until the proposal message fails to reach the crashed node, so this fault can be dealt with later.

Figure 9: Two-phase commit, with coordinator failure, phase one [5]

If the coordinator crashes after some, but not all, proposal messages have been sent, some nodes have received a proposal and are starting a two-phase commit round, while others are unaware that anything has happened. If the coordinator does not recover for a long time, the nodes that received the proposal will block and wait for a result that may never arrive. These nodes may already have sent their votes back to the coordinator without knowing that it has failed. Therefore, they cannot simply abort the protocol, since the coordinator might wake up again, see their "commit" votes, and start phase 2 of the protocol with a commit message.

Therefore, the protocol is blocked on the coordinator and cannot make any progress. The problem of waiting for a participant to fulfill its part of the protocol cannot be completely resolved.

To counteract this, another participant can take over the coordinator's work once the coordinator is determined to have crashed. If a timeout occurs at a node, it can be forced to complete the protocol the coordinator started.
As in a phase 1 message, this node can contact all other participants and discover their votes. However, this requires persistence in each node.

It is also possible that only one node knows the result of the transaction. This happens if the coordinator fails in phase 2 before all nodes have been told whether to commit or abort, as shown in Figure 10.

Figure 10: Two-phase commit, with coordinator failure, phase two [5]

However, if another node crashes before the recovery node can end the protocol, the protocol cannot be restored to its original state.

The same applies to phase 2. If the coordinator crashes before the commit message has reached all participants, a recovery node is required to take over the protocol and safely complete it.

The worst-case scenario is when the coordinator is itself a participant and grants itself a vote on the result of the protocol. If it then crashes, both the coordinator and a participant are down, so the protocol remains blocked as a result of a single fault [5].

Paxos

The Paxos protocol consists of the following phases.

Phase 1: Prepare request

The proposer picks a new proposal number n and sends a prepare request (prepare, n) to all acceptors.
When an acceptor receives a prepare request (prepare, n), it responds with a promise (ack, n, n', v') or (ack, n, ⊥, ⊥). An acceptor only responds with a promise if n is greater than the number of any prepare request it has responded to before.
In its response, the acceptor reports the number n' and the value v' of the highest-numbered proposal it has accepted, if any. Otherwise, it replies with ⊥.

Phase 2: Accept request

When the proposer has received responses from a majority of acceptors, it sends an accept request (accept, n, v) with the number n and the value v to the acceptors. The number n is the number from the phase 1 prepare request.
The value v is the value of the highest-numbered proposal among the responses, or a value of the proposer's own choosing if no response contains one.
If an acceptor receives an accept request (accept, n, v), it accepts the proposal unless it has already responded to a prepare request with a number greater than n.

Phase 3: Learning phase

Whenever an acceptor accepts a proposal, it sends (accept, n, v) to all learners.
A learner that receives (accept, n, v) from a majority of acceptors decides v and sends (decide, v) to all other learners. Learners that receive (decide, v) decide on the value v as well.
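The acceptor side of the first two phases can be sketched as a small state machine. This is a simplified single-decree illustration, not a complete Paxos implementation: networking, learners and retries are omitted, `None` stands in for ⊥, and the names `Acceptor` and `choose_value` are chosen freely.

```python
class Acceptor:
    def __init__(self):
        self.promised = -1     # highest prepare number responded to so far
        self.accepted = None   # (n', v') of the highest accepted proposal

    def on_prepare(self, n):
        """Phase 1: promise only if n is newer than anything seen before."""
        if n <= self.promised:
            return None        # stale prepare request, no promise
        self.promised = n
        n_acc, v_acc = self.accepted if self.accepted else (None, None)
        return ("ack", n, n_acc, v_acc)

    def on_accept(self, n, v):
        """Phase 2: accept unless a higher-numbered prepare was promised."""
        if n < self.promised:
            return None
        self.promised = n
        self.accepted = (n, v)
        return ("accept", n, v)


def choose_value(acks, own_value):
    """Proposer rule: adopt the value of the highest-numbered accepted
    proposal among the acks, or fall back to the proposer's own value."""
    accepted = [(n_acc, v_acc) for (_, _, n_acc, v_acc) in acks if n_acc is not None]
    return max(accepted, key=lambda p: p[0])[1] if accepted else own_value
```

The important detail is in `choose_value`: a proposer that learns about an already accepted value has to keep proposing that value, which is what prevents two different values from being decided.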

Every distributed system contains faults. To cope with a failed proposer, Paxos simply delays the decision: a proposer starts again in phase 1 with a new number, even if previous attempts have not yet been completed.

Paxos is difficult to understand because many implementation details are deliberately left open.
Questions such as "When can one be certain that a proposer has failed?" or "Do synchronized clocks have to be used to set the timeouts?" are examples of such details.
Leader election, failure detection and log management are also purposely left open to allow greater flexibility. However, exactly these design decisions are the biggest disadvantages of Paxos [4].

Leader election in Paxos might be realized by a simple algorithm like the bully algorithm.
A node starts an election by sending its server id to all other nodes. A node that receives an id sends a response containing its own id.
Next, the node checks whether all responses carry lower ids than its own. If this is the case, the node is the new leader.
If the id of a node is higher than the received id, that node starts its own election.
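Condensed into code, the election boils down to "the reachable node with the highest id wins". The sketch below is a strongly simplified, sequential illustration: message passing and timeouts are left out, only one higher node takes over at a time, and the function name `bully_election` is hypothetical.

```python
def bully_election(starter, alive_ids):
    """Return the id of the leader once `starter` begins an election.

    alive_ids: ids of all nodes that currently respond to messages.
    """
    candidate = starter
    while True:
        higher = [i for i in alive_ids if i > candidate]
        if not higher:
            # No node with a higher id answered: the candidate is the leader.
            return candidate
        # A node with a higher id takes over and starts its own election.
        candidate = min(higher)
```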

The procedure of performing multiple Paxos decisions consecutively based on a log is called Multi-Paxos.

Raft

Unlike Paxos, Raft was designed with a focus on understandability.
Raft introduced the concept of shared timeouts to deal with termination. If an error occurs in Raft, a restart is performed. Nodes wait at least one timeout period before they try to declare themselves leader again. This guarantees progress.

The shared state in Raft is typically represented by a replicated log data structure. Like Paxos, Raft requires a majority of servers to be available in order to operate correctly. In general, Raft consists of the following three elements.

  • Leader election: If the current leader fails, a new leader must be elected. The leader is responsible for accepting client requests and managing the replicated logs of the other servers. Data always flows from the leader to the other servers.
  • Log replication: The leader synchronizes the logs of all other servers by replicating its own log.
  • Safety: If a server has applied a log entry at a particular index, no other server may apply a different log entry for that index.

Raft servers can have the following three states.

  • Leader: Typically only one leader exists; all other servers in the cluster are followers. Client requests received by followers are forwarded to the leader.
  • Follower: A follower is passive. It only responds to requests from leaders or candidates, or forwards client requests.
  • Candidate: A server in this state wants to be elected as the new leader.

If a candidate wins an election to be leader, this leader remains for an arbitrary period of time, referred to as the term. Each term is identified by a term number, which is incremented after each term. Each server must persistently store the current term number.

Raft uses the remote procedure calls RequestVote and AppendEntries.
RequestVote is used by candidates during elections.
AppendEntries is used by leaders to replicate log entries and as a heartbeat [6].

Leader election

Leaders periodically send heartbeats to the followers. A leader election is triggered when a follower does not receive a heartbeat from the leader for a certain period of time.
The follower then becomes a candidate and increments its term number.
It now sends RequestVote messages to all other participants in the cluster, resulting in one of the following three outcomes.

  • The candidate receives the majority of votes and becomes leader.
  • The candidate receives an AppendEntries message from another server claiming to be leader. It must check whether the received term number is greater than its own. If its own term number is greater, the server remains in candidate state and the AppendEntries message is rejected. Otherwise, the server switches back to the follower state.
  • Several servers became candidates at the same time and the vote did not produce a clear majority. In this case the candidates time out and a new election starts [6].

Log replication

Client requests can initially be regarded as write-only. Each request consists of a command, which is ideally executed by the replicated state machines of all servers. A leader that receives a new client request adds it to its log in the form of a new entry. Each log entry contains a client-specified command, an index identifying its position in the log, and the term number to maintain a logical order.
To ensure consistency, a new log entry must be replicated to the followers.
The leader keeps sending AppendEntries messages until all followers have safely replicated the entry. Once a majority of the servers have replicated the entry, it can be considered committed, together with all previous entries.
The leader stores the highest index of committed entries. This index is sent to the followers with every AppendEntries message, so they can tell which entries to apply to their state machines.
If two entries in different logs have the same index and the same term number, they store the same command and all preceding log entries are identical.
If a follower cannot find a matching position for the log entry based on the index and the term number when receiving an AppendEntries message, it rejects that message.
Through this mechanism the leader can be certain that, after a successful response to an AppendEntries request, the follower's log is identical to its own log.

In Raft, inconsistencies are resolved by overwriting the follower logs. First, the leader tries to find the last index at which its log matches the follower's log. Once found, the follower deletes all newer entries and appends the leader's entries [6].
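The consistency check and the overwriting of a follower log can be sketched as follows. This is a simplified illustration, not the full AppendEntries handler from the Raft paper: term checks on the message itself, the commit index and persistence are omitted, and the follower always truncates after the match point. The function name and the representation of a log as a list of (term, command) tuples are assumptions.

```python
def append_entries(log, prev_index, prev_term, entries):
    """Follower side of a simplified AppendEntries call.

    log:        list of (term, command) tuples; index 1 is the first entry.
    prev_index: index of the entry that directly precedes the new entries.
    prev_term:  term number the leader has stored at prev_index.
    Returns (success, new_log).
    """
    # Consistency check: the follower must hold the preceding entry
    # with the same term, otherwise it rejects the message.
    if prev_index > 0:
        if len(log) < prev_index or log[prev_index - 1][0] != prev_term:
            return False, log

    # Match found: drop all newer entries and append the leader's entries.
    return True, log[:prev_index] + list(entries)
```

After a rejection, the leader would retry with a smaller `prev_index` until the check succeeds, which is how it finds the last matching position.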

Safety

Raft ensures that the leader of a term has all committed entries of all previous terms in its log. This is necessary for all logs to be consistent and for the state machines to execute the same commands.
During a leader election, the RequestVote message contains information about the candidate's log. If a voter's log is more up to date than the log of the candidate sending the RequestVote message, the voter does not vote for that candidate.
Which log is more up to date is determined by the term number of the last entry and the length of the log: the log whose last entry has the higher term number is more up to date and, if the term numbers are equal, the longer log is [6].
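The comparison a voter performs is short enough to write out. A minimal sketch, with a freely chosen function name and parameters:

```python
def log_is_up_to_date(candidate_last_term, candidate_len, voter_last_term, voter_len):
    """True if the candidate's log is at least as up to date as the voter's."""
    if candidate_last_term != voter_last_term:
        # The term number of the last entry decides first.
        return candidate_last_term > voter_last_term
    # Same last term: the longer (or equally long) log wins.
    return candidate_len >= voter_len
```

A voter only grants its vote if this returns True, so a candidate with a short log from a newer term still beats a long log from an older term.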

ZooKeeper Atomic Broadcast protocol (ZAB)

Like Raft, the ZooKeeper Atomic Broadcast protocol (ZAB) achieves high availability by distributing data across multiple nodes. This allows clients to read data from any node. Client writes, in contrast, are forwarded to the leader. An important design criterion is that each state change is applied incrementally on top of the previous state. This results in an implicit dependency on the order of the state changes. Besides guaranteeing in-order replication, ZAB defines procedures for leader election and recovery of faulty nodes.

What Raft calls a term is called the epoch of a leader in ZAB. An epoch is likewise identified by a number generated by the leader.
The epoch number must be larger than all previous epoch numbers.
A transaction represents a state change that the leader propagates to its followers.
Furthermore, analogous to the index in Raft, the leader generates a sequence number, which starts at 0 and is incremented.
The epoch number and the sequence number are important to ensure the order of the state changes.
Analogous to the replicated log in Raft, each follower in ZAB has a history queue. In this queue all incoming transactions are committed in the order received.

In order for ZAB to be executed correctly, the following prerequisites must be met.

  • Replication guarantees reliable delivery, total and causal order.
    • Reliable delivery: If a transaction is committed by one server, it will eventually be committed by all servers.
    • Total order: If a transaction A is committed by a server prior to a transaction B, all servers must commit transaction A prior to transaction B.
    • Causal order: If a transaction A is committed by a server and that server then sends another transaction B, transaction A must be ordered before transaction B. If a transaction C is sent after B, C must be ordered after B.
  • Transactions are replicated as long as the majority of nodes are available.
  • If a node fails and restarts, it must catch up on the missed transactions.

The general procedure outlined below is depicted in Figure 11.

When a leader receives an update from a client, it generates a transaction with a sequence number and the epoch number. It then sends this transaction to its followers. A follower adds the transaction to its history queue and confirms this to the leader with an ACK. Once the leader has received ACKs from a majority of the nodes, it sends a COMMIT for this transaction. A follower that receives a COMMIT commits the transaction, unless earlier transactions in its history queue are still waiting for their COMMIT. In that case, the follower waits for the COMMITs of those earlier transactions before committing.
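The follower's part of this procedure, including the in-order commit, might look like the sketch below. It is an illustration only and not ZooKeeper code: transaction ids are modelled as (epoch, sequence number) tuples, and the names `Follower` and `has_quorum` are made up.

```python
class Follower:
    def __init__(self):
        self.history = []     # proposed transaction ids in received order
        self.committed = []   # transaction ids committed so far
        self.pending = set()  # COMMITs that arrived ahead of earlier ones

    def on_propose(self, txn_id):
        """Store the proposal in the history queue and acknowledge it."""
        self.history.append(txn_id)
        return ("ACK", txn_id)

    def on_commit(self, txn_id):
        """Commit strictly in history order; wait for earlier COMMITs."""
        self.pending.add(txn_id)
        while len(self.committed) < len(self.history):
            nxt = self.history[len(self.committed)]
            if nxt not in self.pending:
                break  # an earlier transaction still lacks its COMMIT
            self.pending.remove(nxt)
            self.committed.append(nxt)


def has_quorum(acks, cluster_size):
    """The leader may send COMMIT once a majority has acknowledged."""
    return acks > cluster_size // 2
```

If the COMMIT for transaction (1, 1) arrives before the one for (1, 0), the follower holds it back and commits both in order as soon as the earlier COMMIT shows up.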

Figure 11: Procedure ZooKeeper [9]

If the leader crashes, the nodes run a recovery protocol, both to agree on a common consistent state before resuming regular operation and to establish a new leader for broadcasting state changes.

Since nodes can fail and be restored, multiple leaders can emerge over time, and the same node may take on a given role more than once.
The life cycle of a node is defined by the following four phases. Each node performs an iteration of the protocol. At any time, a node can cancel the current iteration and start a new one by transitioning to phase 0.
Phases 1 and 2 are especially important for agreeing on a consistent state when recovering from a failure [7].

0. Leader election phase

Nodes are initialized in this phase. No particular leader election protocol is prescribed, as long as the protocol terminates and, with high probability, chooses a node that is available and that a majority of the nodes have voted for. After the leader election terminates, a node stores its vote in local volatile memory. If a node n voted for node n0, then n0 is called the prospective leader for n. A prospective leader only becomes an established leader at the beginning of phase 3, when it also becomes the primary process [8].

1. Discovery phase

In this phase the followers communicate with their prospective leader, so that the leader can collect information about the latest transactions its followers have accepted. The intent of this phase is to determine the most recent sequence of accepted transactions among a majority of nodes. In addition, a new epoch is established so that previous leaders cannot make new proposals. Since a majority of followers have accepted all changes from the previous leader, at least one follower has all of these changes in its history queue, meaning the new leader will have them too.

2. Synchronization phase

The synchronization phase completes the recovery part of the protocol and synchronizes the replicas in the cluster using the updated history of the leader from the discovery phase. The leader proposes transactions from its history to its followers. The followers acknowledge the proposals if their own history is behind the leader's history. If the leader receives acknowledgements from a majority of the nodes, it sends a commit message to them. At this point, the leader is established and no longer just a prospective leader.

3. Broadcast phase

If no crashes occur, the nodes remain in this phase indefinitely and send transactions as soon as a ZooKeeper client sends a write request.

To detect failures, ZAB employs periodic heartbeat messages between followers and their leader. If a leader does not receive heartbeats from the majority of its followers within a certain timeout, it resigns leadership and switches to leader election in phase 0. A follower also switches to phase 0 if it does not receive heartbeats from its leader within a timeout [7].
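
The phase transitions and the timeout rule described above can be pictured as a small state machine. The following is a minimal sketch, not ZooKeeper's actual implementation: the class `ZabNode`, its method names, the timeout value and the choice to count the leader itself towards the majority are all assumptions made for illustration.

```python
from enum import Enum


class Phase(Enum):
    ELECTION = 0
    DISCOVERY = 1
    SYNCHRONIZATION = 2
    BROADCAST = 3


class ZabNode:
    """Toy model of a node's phase handling (hypothetical, not ZooKeeper code)."""

    def __init__(self, cluster_size, timeout=2.0):
        self.cluster_size = cluster_size
        self.timeout = timeout        # assumed heartbeat timeout in seconds
        self.phase = Phase.ELECTION
        self.last_heartbeat = {}      # peer id -> time of the last heartbeat

    def advance(self):
        """Move to the next phase; BROADCAST is kept until a failure occurs."""
        if self.phase is not Phase.BROADCAST:
            self.phase = Phase(self.phase.value + 1)

    def on_heartbeat(self, peer, now):
        self.last_heartbeat[peer] = now

    def check_leader(self, now):
        """Leader view: return to phase 0 without a majority of live nodes."""
        alive = sum(1 for t in self.last_heartbeat.values()
                    if now - t <= self.timeout)
        # Assumption: the leader counts itself towards the majority.
        if alive + 1 <= self.cluster_size // 2:
            self.phase = Phase.ELECTION

    def check_follower(self, leader, now):
        """Follower view: return to phase 0 if the leader has gone silent."""
        last = self.last_heartbeat.get(leader)
        if last is None or now - last > self.timeout:
            self.phase = Phase.ELECTION
```

In this sketch the caller passes the current time explicitly, which keeps the timeout logic easy to test without real clocks or network traffic.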

Byzantine Algorithms

A Byzantine fault-tolerant protocol should be able to achieve a common objective even when some nodes behave maliciously.
The paper "The Byzantine Generals Problem" by Leslie Lamport, Robert Shostak and Marshall Pease provided the first proof for a solution of the Byzantine Generals Problem: it showed that a system with x Byzantine nodes must have at least 3x + 1 nodes in total to reach consensus.
The reasoning is as follows. If x nodes are faulty, the system must still function correctly after coordinating with only n – x nodes, since the x Byzantine nodes may never respond.
However, the x silent nodes may merely be slow, in which case up to x of the n – x responding nodes can be Byzantine. For the non-faulty responders to outnumber the faulty ones, n – x – x > x is required. Therefore n > 3x, that is, at least n = 3x + 1 nodes are needed.
However, the algorithm from this paper only works in synchronous environments.
Byzantine algorithms for asynchronous environments, such as DLS and PBFT, are significantly more complex [4].
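
To make the bound concrete, the arithmetic above can be written down in a few lines. This is only an illustration of the inequality n – x – x > x; the function names are made up for this post.

```python
def honest_outnumber_faulty(n, x):
    """True if, among n - x responders, the honest ones outnumber x faulty ones."""
    return n - x - x > x


def min_cluster_size(x):
    """Smallest n that tolerates x Byzantine nodes."""
    return 3 * x + 1


def max_byzantine_faults(n):
    """Largest x a cluster of n nodes can tolerate, i.e. the largest x with n >= 3x + 1."""
    return max((n - 1) // 3, 0)
```

For example, a cluster of four nodes tolerates one Byzantine node, while three nodes tolerate none.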

DLS Algorithm

The DLS algorithm was presented by Dwork, Lynch and Stockmeyer in the paper "Consensus in the Presence of Partial Synchrony" as a significant advancement of Byzantine algorithms.
It defines models for reaching consensus in a partially synchronous system. Partial synchrony lies between a synchronous and an asynchronous system and is described by the following two variants.

  • There are fixed timeouts until messages are delivered, though these are unknown in advance. The aim is to reach consensus independently of the actual timeouts.
  • The timeouts are known, yet they only apply at an unknown time. The goal is to design a system capable of achieving consensus regardless of when this time occurs.

The DLS algorithm proceeds in rounds, each divided into a "trying" phase and a "lock-release" phase.

  1. Each round has a proposer and starts with each node announcing the value it considers correct.
  2. The proposer proposes a value if at least n – x nodes have announced this value.
  3. When a node receives the proposed value from the proposer, the node must lock on this value and broadcast this information to the network.
  4. If the proposer learns from x + 1 nodes that they have locked on the value, it commits the value as the final value.
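
The two thresholds in these steps can be sketched as follows. This is a heavily simplified illustration that only checks the counts n – x and x + 1 for a single round; it ignores message passing, timeouts and the lock-release phase, and the function `dls_round` and its parameters are hypothetical.

```python
from collections import Counter


def dls_round(announced_values, n, x, lock_acks):
    """Return the value committed in this round, or None if no value is committed.

    announced_values: values the nodes announced in step 1
    n: total number of nodes, x: number of tolerated Byzantine nodes
    lock_acks: number of nodes that reported locking on the proposal (step 3)
    """
    if not announced_values:
        return None
    value, count = Counter(announced_values).most_common(1)[0]
    if count < n - x:        # step 2: too few nodes announced this value
        return None
    if lock_acks < x + 1:    # step 4: too few nodes locked on the proposal
        return None
    return value
```

With n = 4 and x = 1, a value announced by three nodes and locked by two of them is committed, whereas a 2:2 split produces no proposal in that round.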

For the first time, DLS introduced the terms safety and liveness, which are equivalent to agreement and termination.

In addition to the DLS algorithm, there is also the Practical Byzantine Fault-Tolerance (PBFT) algorithm, available for use in asynchronous environments. However, due to the limited scope of this blog post, it is not discussed here [4].

Approach 2: Non-Deterministic

Besides making synchrony assumptions, the FLP impossibility can also be circumvented by using non-deterministic algorithms.

Nakamoto Consensus (Proof of Work)

In traditional consensus algorithms, a function f(x) is defined such that a proposer and a set of acceptors must all coordinate and communicate to decide on the next value.
Therefore, these algorithms often scale poorly, since each node must be aware of and communicate with every other node in the network. With Nakamoto Consensus, nodes do not agree on a value directly; instead, f(x) works in such a way that all nodes agree on the probability that the value is correct.
Instead of electing a leader and coordinating with all nodes, consensus is reached on which node solves a computational puzzle the fastest. Nakamoto Consensus assumes that nodes will expend computational effort for the chance to decide the next block. This proof-of-work consensus is simpler than previous consensus algorithms, since it eliminates the complexity of point-to-point connections, leader election and quadratic communication overhead.
However, Nakamoto Consensus costs time and energy, because the computational puzzle must be solved before a new block can be written [4].
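
The idea of such a computational puzzle can be illustrated with a hash search. The sketch below is a simplification and not Bitcoin's actual scheme: it hashes a plain string instead of a block header and expresses the difficulty as a number of leading zero hex digits, both of which are assumptions for illustration.

```python
import hashlib


def mine(block_data, difficulty):
    """Search for a nonce whose SHA-256 digest starts with `difficulty` zero hex digits."""
    prefix = "0" * difficulty
    nonce = 0
    while True:
        digest = hashlib.sha256(f"{block_data}{nonce}".encode()).hexdigest()
        if digest.startswith(prefix):
            return nonce, digest
        nonce += 1


def verify(block_data, nonce, difficulty):
    """Checking a solution takes a single hash, finding one takes many."""
    digest = hashlib.sha256(f"{block_data}{nonce}".encode()).hexdigest()
    return digest.startswith("0" * difficulty)
```

The asymmetry is the point: `mine` has to try many nonces, and each additional zero digit multiplies the expected work by 16, while every other node can check the result with one call to `verify`.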

Proof of Stake and Proof of Authority

While PoW reaches consensus through computing resources and mining power, the mechanisms Proof of Stake (PoS) and Proof of Authority (PoA) proceed differently.
The PoS mechanism selects the participants with the highest stakes as validators, on the assumption that the largest stakeholders have the strongest incentive to ensure that transactions are processed correctly. PoA, in contrast, uses the validator's identity as the only proof of its authority to validate, so no mining is needed [10].

Cluster manager

With the consensus protocols covered, we can now look at how consensus algorithms are used in existing cluster managers.

Cluster Manager                        | Distributed key value store | Consensus Protocol
Google Borg                            | Chubby                      | Paxos
Kubernetes/Openshift                   | Etcd                        | Raft
Nomad                                  | -                           | Raft
Docker Swarm                           | -                           | Raft
Apache Marathon based on Mesos         | Apache Aurora               | ZooKeeper
Kontena                                | Etcd                        | Raft
Amazon Elastic Container Service (ECS) | Consul                      | Raft
Table 1: Overview of consensus protocols in cluster managers

As Table 1 illustrates, there is a strong tendency towards Raft [11-14].
This is presumably due to its good comprehensibility and its detailed implementation guidance compared to Paxos. Paxos is used by Google, but mainly for historical reasons in Borg. ZooKeeper is very similar to Raft and therefore a reasonable choice for Apache Marathon.

Cluster management without a consensus protocol

In contrast to cluster management with a consensus algorithm, the paper "Subordination: Cluster management without distributed consensus" proposes cluster management without one.
In this paper cluster management is achieved by arranging the nodes in the cluster in a hierarchy via subordination. The main goal is to distribute the workload over a large number of nodes, similar to a load balancer. Cluster management without distributed consensus relies on the following conditions.

  • Network configuration changes are infrequent.
  • It is not intended for managing updates of a distributed database.
  • The node performance and node latency must remain stable.
  • Constant network traffic is assumed [15].

Node Mapping

A fundamental idea of this variant of cluster management is node mapping, i.e. the ranking of nodes.
In general, node mapping is defined as a function that maps a node to a number, which allows nodes to be compared with each other.
Following Amdahl's law, the higher the link performance, the higher the speedup.
Instead of including node performance and latency separately in the mapping, the two can be correlated, so that only their ratio enters the mapping [15].
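
As an illustration of what "mapping a node to a number" means, consider the following sketch. It does not reproduce the mapping from the paper; the `Node` fields, their units and the choice of performance divided by latency (higher is better) are assumptions made purely for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    name: str
    performance: float   # hypothetical unit, e.g. operations per second
    latency: float       # hypothetical unit, e.g. seconds


def node_rank(node):
    """Map a node to a single number using only the performance/latency ratio."""
    return node.performance / node.latency


def best_node(nodes):
    """Because every node maps to a number, nodes can simply be compared."""
    return max(nodes, key=node_rank)
```

Once such a function exists, every node can rank its peers locally and arrive at the same ordering without exchanging votes, as long as the measured values stay stable.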

Subordination Tree

In the subordination tree, each node is uniquely identified by a level and an offset, from which a distance between nodes can be calculated.
To select a leader, a node ranks all nodes in the network based on the mapping and calculates their distance. The node with the smallest distance and ranking is selected.
Level and offset are only used for linear topologies (at a switch).
For non-linear topologies, latency is used for the mapping.
Rather than maximizing performance, the main goal of the algorithm is to minimize network traffic per time unit when a leader is chosen and the number of nodes is unknown.
Because network configuration changes are assumed to be infrequent, the algorithm could be executed once when the cluster is installed and its result persisted.
The paper includes a performance test and a subordination tree test.
In the performance test, a full network scan was first performed and a leader determined as a basis for comparison.
The IP mapping was then performed using the node mapping algorithm and a leader was determined again. Figure 12 displays the result [15].

Figure 12: Performance test [15]
Using the node mapping algorithm, the subordination tree is built about 200% faster.

The subordination tree test examined whether the resulting trees reached a stable state. For 100-500 nodes the construction of the subordination tree was repeated in order to obtain meaningful results. Each run was aborted after 30 seconds to set an upper time limit. The results reveal that a stable state was reached well below 30 seconds.

Discussion

In conclusion, consensus algorithms are a central aspect of cluster management and distributed systems. Older protocols like Paxos have revealed their strengths and weaknesses over time. The experience gained has been incorporated into newer protocols such as Raft and ZooKeeper to improve understandability and robustness.
Byzantine consensus algorithms represent a particularly exciting application. Especially in times of increasing Internet crime and organized hacking, malicious behavior of individual nodes in the public Internet must be assumed.
New technologies such as blockchain offer a new perspective on the matter. For instance, a key-value store could possibly be developed using Proof of Stake or Proof of Authority as a consensus algorithm. This key-value store could be used in a public Kubernetes cluster instead of Etcd, which would provide additional protection against Byzantine nodes. On the other hand, there are also arguments against cluster management with a consensus algorithm. The paper "Subordination: Cluster management without distributed consensus" shows a way to manage a cluster without one.
However, this approach has many implicit conditions which cannot be guaranteed in a real distributed system. Another approach supporting this hypothesis is K3s, which shows that cluster management on a small scale is also possible without a consensus algorithm. K3s uses SQLite3 as its database, although Etcd3 can be used instead. Its field of application is IoT, where fixed IPs are often assigned and therefore hardly any changes take place.
Ultimately, there is no generic solution. Currently there seems to be a tendency towards Raft in controlled data centers. Nevertheless, depending on the application, one still has to decide whether to use cluster management with simple consensus, with Byzantine consensus or without consensus.

References

  1. Survey Shows Kubernetes Leading As Orchestration Platform
    https://www.cncf.io/blog/2017/06/28/survey-shows-kubernetes-leading-orchestration-platform/
  2. The Techglider
    Kartik Singhal – https://techglider.github.io/review/time-clocks-and-ordering-of-events-in-a-distributed-system/
  3. A Cursory Introduction To Byzantine Fault Tolerance and Alternative Consensus
    Alexandra Tran – https://medium.com/@alexandratran/a-cursory-introduction-to-byzantine-fault-tolerance-and-alternative-consensus-1155a5594f18
  4. Let's Take a Crack At Understanding Distributed Consensus
    Preethi Kasireddy – https://medium.com/s/story/lets-take-a-crack-at-understanding-distributed-consensus-dad23d0dc95
  5. Consensus Protocols: Two-phase Commit
    Henry Robinson – https://www.the-paper-trail.org/post/2008-11-27-consensus-protocols-two-phase-commit/
  6. Understanding the Raft Consensus Algorithm: an Academic Article Summary
    Shubheksha – https://medium.freecodecamp.org/in-search-of-an-understandable-consensus-algorithm-a-summary-4bc294c97e0d
  7. Architecture Of Zab – Zookeeper Atomic Broadcast Protocol
    Guy Moshkowich – https://distributedalgorithm.wordpress.com/2015/06/20/architecture-of-zab-zookeeper-atomic-broadcast-protocol/
  8. ZooKeeper’s atomic broadcast protocol: Theory and practice, André Medeiros, March 20, 2012
    http://www.tcs.hut.fi/Studies/T-79.5001/reports/2012-deSouzaMedeiros.pdf
  9. A simple totally ordered broadcast protocol, Benjamin Reed, Flavio P. Junqueira
    http://diyhpl.us/~bryan/papers2/distributed/distributed-systems/zab.totally-ordered-broadcast-protocol.2008.pdf
  10. Proof Of Authority: Consensus Model with Identity At Stake.
    POA Network – https://medium.com/poa-network/proof-of-authority-consensus-model-with-identity-at-stake-d5bd15463256
  11. Consensus Protocol
    https://www.nomadproject.io/docs/internals/consensus.html
  12. Raft Consensus in Swarm Mode
    https://docs.docker.com/engine/swarm/raft/
  13. The Mesos Replicated Log
    https://mesos.apache.org/documentation/latest/replicated-log-internals/
  14. The Developer Friendly Container & Microservices Platform
    Kontena, Inc – https://www.kontena.io/
  15. Gankevich, Ivan & Tipikin, Yury & Gaiduchok, Vladimir. (2015). Subordination: Cluster management without distributed consensus. 639-642. 10.1109/HPCSim.2015.7237106.

Large Scale Deployment for Deep Learning Models with TensorFlow Serving


Introduction

“How do you turn a trained model into a product, that will bring value to your enterprise?”

In recent years, serving has become a hot topic in machine learning. With the ongoing success of deep neural networks, there is a growing demand for solutions that address the increasing complexity of inference at scale. This article will explore some of the challenges of serving machine learning models in production. After a brief overview of existing solutions, it will take a closer look at Google’s TensorFlow-Serving system and investigate its capabilities. Note: Even though they may be closely related, this article will not deal with the training aspect of machine learning, only inference.

Inference and Serving

Before diving in, it is important to differentiate between the training and inference phases, because they have completely different requirements.

  • Training is extremely compute-intensive. The goal here is to maximize the number of compute operations in a given time. Latency is not of concern.
  • Inference costs only a fraction of the computing power that training does. However, it should be fast. When you query the model, you want the answer immediately. Inference must be optimized for latency and throughput.

There are two ways to deploy a model for inference. Which one to use largely depends on the use case. First, you can push the entire model to client devices and have them do inference there. Lots of ML features are already baked into our mobile devices this way. This works well for some applications, e.g. Face-ID or activity detection on phones, but falls flat for many other, large-scale industrial applications. You probably won’t have latency problems, but you are limited to the client’s compute power and local information. Alternatively, you can serve the model yourself. This is suitable for industrial-scale applications, such as recommender systems, fraud detection schemes, intelligent intrusion detection systems and so forth. Serving allows for much larger models, direct integration into your own systems and the direct control and insights that come with it.

Serving Machine Learning at Scale

Of course, it’s never that easy. In most “real-world” scenarios, there isn’t really such a thing as a “finished ML model”. Consider the “Cross-industry standard process for data mining”:

Fig. 1 – Back to the basics: the six phases of CRISP-DM. Source

It might be ancient, but it describes a key concept for successful data mining/machine learning: It is a continuous process [12]. Deployment is part of this process, which means: You will replace your productive models, and you will do it a lot! This will happen for a number of reasons:

  • Data freshness: The ML model is trained on historical data. This data can go stale quickly, because new patterns constantly appear in the real world. Model performance will deteriorate, and you must replace the model with one that was trained on more recent data, before performance drops too low.
  • Model revision: With time, retraining the model might just not be enough to keep the performance up. At this point you need to revise the model architecture itself, perhaps even start from scratch.
  • Experiments: Perhaps you want to try another approach to a problem. For that reason you want to load a temporary, new model, but not discontinue your current one.
  • Rollbacks: Something went wrong, and you need to revert to a previous version.

Version control and lifecycle management aren’t exactly new ideas. However, here they come with a caveat: Since artificial neural networks are essentially “clunky, massive databases” [5], loading and unloading them can have an impact on performance. For reference, some of the most impactful deep models of recent years are a few hundred megabytes in size (AlexNet: 240MB, VGG-19: 574MB, ResNet-200: 519MB). But as model performance tends to scale with depth, model size can easily grow to multiple gigabytes. That might not be much in terms of “Big Data”, but it’s still capable of causing ugly latency spikes when handled poorly. Besides ML performance metrics, the primary concerns are latency and throughput. Thus, the serving solution should be able to [4]:

  • quickly replace a loaded model with another,
  • have multiple models loaded at the same time, in the same process,
  • cope with differences in model size and computational complexity,
  • avoid latency spikes when new models are loaded into RAM,
  • if possible, be optimized for GPUs and TPUs to accelerate inference,
  • scale out inference horizontally, depending on demand.

Serving Before “Model Servers”

Until some three years ago, you basically had to build your ML serving solution yourself. A popular approach was (and still is) using Flask or some other framework to serve requests against the model, some WSGI server to handle multiple requests at once and have it all behind some low-footprint web-server like Nginx.

Fig. 2 – An exemplary serving solution architecture. Source: [8]

However, while initially simple, these solutions are not meant to perform at “ultra large” scale on their own. They have difficulty benefiting from hardware acceleration and can become complex fast. If you needed scale, you had to create your own solution, like Facebook’s “FBLearnerPredictor” or Uber’s “Michelangelo”. Within Google, initially simple solutions would often evolve into sophisticated, complex pieces of software, that scaled but couldn’t be repurposed elsewhere [1].

The Rise of Model Servers

Recent years have seen the creation of various model serving systems, “model servers”, for general machine learning purposes. They take inspiration from the design principles of web application servers and interface through standard web APIs (REST/RPC), while hiding most of their complexity. Besides simpler deployment and customization, model servers also offer machine learning-specific optimizations, e.g. support for Nvidia GPUs or Google TPUs. Most model servers have some degree of interoperability with other machine learning platforms, especially the more popular ones. That said, you may still restrict your options, depending on your choice of platform.

Fig. 3 – Exemplary model server for an image recognition task. Source: [10]

A selection of popular model serving and inference solutions includes:

  • TensorFlow Serving (Google)
  • TensorRT (Nvidia)
  • Model Server for Apache MXNet (Amazon)
  • Clipper
  • MLflow
  • DeepDetect
  • Skymind Intelligence Layer for Deeplearning4j

TensorFlow-Serving

By far the most battle-tested model serving system out there is Google’s own TensorFlow-Serving. It is used in Google’s internal model hosting service TFS², as part of their TFX general purpose machine learning platform [2]. It drives services from the Google PlayStore’s recommender system to Google’s own, fully hosted “Cloud Machine Learning Engine”. TensorFlow-Serving natively uses gRPC, but it also supports RESTful APIs. The software can be downloaded as a binary, Docker image or as a C++ library.

Architecture

The core of TensorFlow-Serving is made up of four elements: Servables, Loaders, Sources and Managers. The central element in TensorFlow-Serving is the servable [3]. This is where your ML model lives. Servables are the objects that TensorFlow-Serving uses for inference. For example, one servable could correspond to one version of your model. Servables can be simple or complicated, anything from lookup tables to multi-gigabyte deep neural networks. The lifecycles of servables are managed by loaders, which are responsible for loading servables into RAM and unloading them again. Sources provide access to the file system where saved models are stored. They also provide a list of the specific servables that should be loaded and used in production, the aspired versions. Managers are the broadest class. Their job is to handle the full life cycle of servables, i.e. loading, serving and unloading the aspired versions. They try to fulfill the requests from sources with respect to the specified version policy.

Fig. 4 – TensorFlow-Serving architecture overview. Source: [3]

When a servable is elevated to an aspired version, its source creates a loader object for it. This object only contains metadata at first, not the complete (potentially large) servable. The manager listens for calls from loaders that inform it of new aspired versions. According to its version policy, the manager then executes the requested actions, such as loading the aspired version and unloading the previous one. Loading a servable can be temporarily blocked if resources are not available yet. Unloading a servable can be postponed while there are still active requests to it. Finally, clients interface with the TensorFlow-Serving core through the manager. When the REST API is used, both requests and responses are JSON objects.
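
The interplay of loaders and the manager can be sketched in a few lines. Keep in mind that TensorFlow-Serving itself is written in C++ and that the classes `Loader` and `Manager` below are hypothetical stand-ins, not its API. The sketch assumes one possible version policy: load the new version first, switch over, then unload the old one.

```python
class Loader:
    """Holds only metadata until load() is called (hypothetical stand-in)."""

    def __init__(self, name, version):
        self.name = name
        self.version = version
        self.loaded = False

    def load(self):
        self.loaded = True    # a real loader would read the model into RAM here

    def unload(self):
        self.loaded = False   # a real loader would free the memory here


class Manager:
    """Serves one version per servable name and swaps versions on request."""

    def __init__(self):
        self.serving = {}     # servable name -> loader that currently serves it

    def on_aspired_version(self, loader):
        previous = self.serving.get(loader.name)
        loader.load()                         # load the new version first ...
        self.serving[loader.name] = loader    # ... route lookups to it ...
        if previous is not None:
            previous.unload()                 # ... then release the old version

    def lookup(self, name):
        return self.serving[name]
```

Loading before unloading keeps the model available during the swap at the price of temporarily holding two versions in memory. The opposite order would save memory but leave a short gap without a loaded model.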

Simple Serving Example

Getting started with a minimal setup is as simple as pulling the tensorflow/serving Docker image and pointing it at the saved model file [16]. Here I’m using a version of ResNet v2, a deep CNN for image recognition, that has been pretrained on the ImageNet dataset. The image below is encoded in Base64 and sent to the manager as a JSON object.

Fig. 5 – Some random image to predict. Source

The prediction output of this model consists of the estimated probabilities for each of the 1000 classes in the ImageNet dataset, and the index of the most likely class.

Fig. 6 – Model output, class 771 corresponds to “running_shoe”.
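
For reference, a request like the one described above could be assembled as follows. This is a minimal sketch using only the Python standard library. It assumes the container exposes the REST API on port 8501 and that the model was registered under the name "resnet"; both the host and the model name are placeholders to adapt to your own setup.

```python
import base64
import json
import urllib.request


def build_predict_request(image_bytes, host="localhost", port=8501, model="resnet"):
    """Build (but do not send) a predict request for the REST endpoint."""
    payload = {
        "instances": [{"b64": base64.b64encode(image_bytes).decode("ascii")}]
    }
    url = f"http://{host}:{port}/v1/models/{model}:predict"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def most_likely_class(probabilities):
    """Index of the largest probability, i.e. the predicted class."""
    return max(range(len(probabilities)), key=probabilities.__getitem__)
```

Passing the returned object to `urllib.request.urlopen` would send it to a running server. The exact layout of the JSON response depends on the signature of the exported model, so it is not parsed here.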

Performance

Implementing and hosting a multi-model serving solution for an industrial-scale web application with millions of users, just for benchmarks, is slightly out of scope for now. However, Google provides some numbers that should give an idea of what you can expect TensorFlow-Serving to do for you.

Latency

A strong point of TensorFlow-Serving is multi-tenancy, i.e. serving multiple models in the same process concurrently. The key problem with this is avoiding cross-model interference, i.e. one model’s performance characteristics affecting those of another. This is especially challenging while models are being loaded to RAM. Google’s solution is to provide a separate thread-pool for model-loading. They report that even under heavy load, while constantly switching between models, the 99th percentile inference request latency stayed in the range from ~75 to ~150 milliseconds in their own TFX benchmarks [2].

Throughput

Google claims that the serving system on its own can handle around 100,000 requests per second per core on a 16 vCPU Intel Xeon E5 2.6 GHz machine [1]. That figure, however, ignores API overhead and model complexity, which may significantly impact throughput. To accelerate inference on large models with GPUs or TPUs, requests can be batched together and processed jointly. They do not disclose whether this affects request latency. Since late February 2019 (TensorFlow-Serving v1.13), TensorFlow-Serving can work directly in conjunction with TensorRT [14], Nvidia’s high-performance deep learning inference platform, which claims a 40x increase in throughput compared to CPU-only methods [15].
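
The batching idea mentioned above can be illustrated as follows. This is a simplified sketch rather than TensorFlow-Serving's batching implementation: `make_batches`, `run_batched` and the `predict_batch` callback are hypothetical names, and a real scheduler would also bound how long a request may wait for its batch to fill.

```python
def make_batches(requests, max_batch_size):
    """Group pending requests into batches of at most max_batch_size."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    return [requests[i:i + max_batch_size]
            for i in range(0, len(requests), max_batch_size)]


def run_batched(requests, max_batch_size, predict_batch):
    """Call the model once per batch instead of once per request."""
    results = []
    for batch in make_batches(requests, max_batch_size):
        results.extend(predict_batch(batch))   # one accelerator call per batch
    return results
```

Fewer, larger calls make better use of a GPU or TPU, but a request that arrives first may have to wait for the rest of its batch, which is the latency trade-off left open above.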

Usage and Adoption

In their paper on TFX (TensorFlow Extended), Google presents their own machine learning platform, that many of its services use [2]. TFX’ serving component, TFS², uses TensorFlow-Serving. As of November 2017, TensorFlow-Serving is handling tens of millions of inferences per second for over 1100 of Google’s own projects [13]. One of the first deployments of TFX is the recommender system for Google Play, which has millions of apps and over a billion active users (over two billion if you count devices). Furthermore, TensorFlow-Serving is also used by companies like IBM, SAP and Cloudera in their respective multi-purpose machine learning and database platforms [2].

Limitations

Today’s machine learning applications are very much capable of smashing all practical limits: DeepMind’s AlphaGo required 1920 CPUs and 280 GPUs running concurrently in real-time, for inference, for a single “client” [6]. That example might be excessive, but the power of deep ML models does scale with their size and compute complexity. Deep learning models today can become so large that they don’t fit on a single server node anymore (Google claims that they can already serve models up to a size of one terabyte in production, using a technique called model sharding [13]). Sometimes it is worth investing the extra compute power, sometimes you just need to squeeze that extra 0.1 percent accuracy out of your model, but often there are diminishing returns. To wrap it up, there may be a trade-off between the power of your model on the one hand and latency, throughput and runtime cost on the other.

Conclusion

When you serve ML models, your return on investment is largely determined by two factors: How easily you can scale out inference and how fast you can adapt your model to change. Model servers like TensorFlow-Serving address the lifecycle of machine learning models, without making the process disruptive in a productive environment. A good serving solution can reduce both runtime and implementation costs by a significant margin. While building a productive machine learning system at scale has to integrate a myriad different steps from data preparation to training, validation and testing, a scalable serving solution is the key to making it economically viable.

References and Further Reading

  1. Olston, C., Fiedel, N., Gorovoy, K., Harmsen, J., Lao, L., Li, F., Rajashekhar, V., Ramesh, S., and Soyke, J. (2017). Tensorflow-serving: Flexible, high-performance ML serving. CoRR, abs/1712.06139
  2. Baylor, D., Breck, E., Cheng, H.-T., Fiedel, N., Foo, C. Y., Haque, Z., Haykal, S., Ispir, M., Jain, V., Koc, L., Koo, C. Y., Lew, L., Mewald, C., Modi, A. N., Polyzotis, N., Ramesh, S., Roy, S., Whang, S. E., Wicke, M., Wilkiewicz, J., Zhang, X., and Zinkevich, M. (2017). Tfx: A tensorflow-based production-scale machine learning platform. In Proceedings of the 23rd ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, KDD ’17, pages 1387–1395, New York, NY, USA. ACM.
  3. TensorFlow-Serving documentation – https://www.tensorflow.org/tfx/guide/serving (accessed 11.03.2019)
  4. Serving Models in Production with TensorFlow Serving (TensorFlow Dev Summit 2017) – https://www.youtube.com/watch?v=q_IkJcPyNl0 (accessed 11.03.2019)
  5. Difference Inference vs. Training – https://blogs.nvidia.com/blog/2016/08/22/difference-deep-learning-training-inference-ai/ (accessed 11.03.2019)
  6. Challenges of ML Deployment – https://www.youtube.com/watch?v=JKxIiSfWtjI (accessed 11.03.2019)
  7. Lessons Learned from ML deployment – https://www.youtube.com/watch?v=-UYyyeYJAoQ (accessed 11.03.2019)
  8. https://hackernoon.com/a-guide-to-scaling-machine-learning-models-in-production-aa8831163846 (accessed 11.03.2019)
  9. https://medium.com/@maheshkkumar/a-guide-to-deploying-machine-deep-learning-model-s-in-production-e497fd4b734a (accessed 11.03.2019)
  10. https://medium.com/@vikati/the-rise-of-the-model-servers-9395522b6c58 (accessed 11.03.2019)
  11. https://blog.algorithmia.com/deploying-deep-learning-cloud-services/ (accessed 11.03.2019)
  12. https://the-modeling-agency.com/crisp-dm.pdf (accessed 11.03.2019)
  13. https://ai.googleblog.com/2017/11/latest-innovations-in-tensorflow-serving.html (accessed 12.03.2019)
  14. https://developer.nvidia.com/tensorrt (accessed 12.03.2019)
  15. https://medium.com/tensorflow/optimizing-tensorflow-serving-performance-with-nvidia-tensorrt-6d8a2347869a (accessed 12.03.2019)
  16. https://medium.com/tensorflow/serving-ml-quickly-with-tensorflow-serving-and-docker-7df7094aa008 (accessed 12.03.2019)
  17. https://www.slideshare.net/shunyaueta/tfx-a-tensor-flowbased-productionscale-machine-learning-platform (accessed 12.03.2019)

The Renaissance of column stores

While attending the lecture ‘Ultra Large Scale Systems’ I was introduced to the quite intriguing topic of high-performance data storage systems. One subject which caught my special attention was column-oriented database management systems (column stores), about which I decided to give a presentation. As the presentation turned out quite lengthy and intricate, I realized that it left my colleagues more baffled than informed. So I decided to write a blog post to recapitulate the topic for all those who were left with unanswered questions that day and for everyone else out there who might be interested in such matters. I believe this article, even though it covers a quite technical and specialized topic, is nevertheless of general interest because it shows how a system can be optimized for performance by focusing on inherent design characteristics.

Introduction

So what are column stores and what do we need them for?

This may be the most obvious question that crosses the mind of people who hear the term ‘column stores’ for the first time. Well, let me tell you what they aren’t: a euphonic buzzword which, once uttered, will capture the attention of every IT geek in close vicinity. Rather, they are a mature technology that has been around since the early 70s and has gone through constant architectural refinements, which allowed it to establish a foothold in the field of data storage systems for large-scale systems and big data management [1, 2]. Nevertheless, because of their quite specific area of application, column stores still cover a rather opaque field of technical innovation. This article, therefore, tries to provide a brief overview of the subject by giving insights into the architecture, design concepts and current technical advancements concerning column stores.

Most modern database management systems (DBMSs) rely on the N-ary Storage Model (NSM). Here records are stored contiguously starting from the beginning of each disk page, while an offset table at the end of the page is used to locate the start of each tuple (record). Thus, within each page the tuples are stored in sequence until the maximum page length of the storage system has been reached and a new page has to be created (figure 1). Database systems centered on this model show good access times when executing queries that either insert or modify single tuples or that result in a projection of a limited number of complete tuples [3]. The major drawback of this model, however, is its poor cache performance because it often burdens the cache with unnecessary attributes [4]. In contrast, column stores follow an entirely different concept called the Decomposition Storage Model (DSM), where tables are vertically fragmented and each attribute is stored in a separate column (figure 1). The different attribute values for each tuple can then be reassembled by correlating their absolute position within each page. Another approach is to use binary relations based on an artificial key (surrogate) that allows reconnecting the different attributes to generate a partial or complete reconstruction of the initial tuple [5]. The performance advantage of this model can be seen when executing queries that require operations on entire columns. Those include aggregation queries (where only subsets of the entire data are required) or scan operations. Furthermore, since the data within each column is very homogeneous and has little entropy, much better compression ratios can be reached [6]. This becomes even more pronounced as larger and larger datasets have to be processed.
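
The difference between the two storage models can be illustrated with plain Python data structures. This is a toy sketch with made-up employee data: it ignores disk pages and offset tables, and it uses run-length encoding as just one example of a compression scheme that benefits from homogeneous columns.

```python
NAMES = ("id", "name", "age", "gender")

# NSM: each tuple is stored as one contiguous record (hypothetical data).
ROWS = [
    (1, "Alice", 34, "f"),
    (2, "Bob", 45, "m"),
    (3, "Carol", 29, "f"),
]


def to_columns(rows, names):
    """DSM: store every attribute in its own column."""
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


def reconstruct(columns, position):
    """Reassemble a tuple by reading the same position in every column."""
    return tuple(column[position] for column in columns.values())


def run_length_encode(values):
    """Compress runs of equal values into (value, count) pairs."""
    encoded = []
    for value in values:
        if encoded and encoded[-1][0] == value:
            encoded[-1][1] += 1
        else:
            encoded.append([value, 1])
    return [tuple(pair) for pair in encoded]
```

Calling `to_columns(ROWS, NAMES)` yields one list per attribute, and `reconstruct` rebuilds a full record from the shared position, which mirrors the positional reassembly described above.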

Figure 1: Row-based vs. column-based storage models
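
To make the difference between the two storage models more tangible, here is a minimal Python sketch. The employee records, the attribute names and the helper `reconstruct` are made up for illustration and are not taken from figure 1 or from any real system; pages and offset tables are left out entirely.

```python
# Hypothetical employee records (ID, Name, Age, Profession), for illustration only.
rows = [
    (1, "Alice", 34, "Accountant"),
    (2, "Bob", 28, "Engineer"),
    (3, "Carol", 45, "Accountant"),
]

# NSM-like layout: complete tuples are stored one after another.
nsm_page = list(rows)

# DSM-like layout: one array per attribute; position i belongs to tuple i.
attribute_names = ("ID", "Name", "Age", "Profession")
dsm_columns = {
    name: [row[i] for row in rows] for i, name in enumerate(attribute_names)
}

def reconstruct(columns, position):
    """Reassemble one tuple by reading the same position in every column."""
    return tuple(columns[name][position] for name in attribute_names)

# An aggregate over a single attribute only touches one column in the DSM layout.
average_age = sum(dsm_columns["Age"]) / len(dsm_columns["Age"])
```

Fetching one complete record is a single lookup in `nsm_page` but needs one access per column in `dsm_columns`, whereas the average age only reads the `Age` array.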

Diving into the internals

Trying to determine whether to use a row-oriented or a column-oriented storage system will inevitably result in weighing the pros and cons of the above-mentioned architectures. It is clear that both systems have their strengths and weaknesses, and the choice, as so often, depends entirely on the problem to be solved. This can be illustrated with an example. Let’s take the table depicted in figure 1 and imagine that a row-oriented database system stored information about company employees in a similar manner. In that case, queries resulting in key lookups and the extraction of single but complete employee records would be executed with high performance by the system. This could be, for example, the search for the record of a specific employee by their ‘ID’ or ‘Name’. This process could be improved even further by putting indexes on high-cardinality columns such as the ‘ID’, which would speed up the search. Thus, an application or service that mostly issues requests of this kind would definitely benefit from the advantages provided by a row-oriented storage system.

However, what about a request to determine the average age of all male employees stored in the database? This kind of analytical query could, in the worst case, result in a complete scan of the entire table and would put a completely different strain on the system [7]. This could be mitigated by the use of composite indexes, which, however, are only feasible when the table contains a small number of columns. That is not the case in many big data storage systems, where hundreds of columns per table are the norm. Working with composite indexes here will sooner or later produce an immense processing overhead which in the long run would consume substantial system resources [8]. This ultimately means that for systems containing tables with sizes in the range of several hundred gigabytes, many analytical queries could potentially initiate sequential scans of the entire dataset. For this scenario, column stores represent a better option because the query execution would, by design, be limited to only those attributes required by the query. This would save computing resources by avoiding the need to scan large amounts of irrelevant data and, as a result, lead to better overall performance of the system. The choice of the database system should therefore be guided by the demands of the services operating on it, because they ultimately define the predominant query structure processed by the system.

Query processing models

Figure 2: Row-scanner implementation

To understand the sections that follow, some principal design aspects of how row and column stores execute queries have to be explained. When comparing the implementations of query execution between the two systems, fundamental differences become obvious. Building on the previous example, let’s observe the execution of the following simple SQL query: “SELECT Name, Profession FROM Employees WHERE Age > 30”. The expected result of the query is a list of the names and professions of all employees older than 30 years. The request leads to low-level database operations where scans of the corresponding table are performed to gather the necessary data. At the center of every query execution are scanners that apply predicates to tuples, generate projections and provide their parent operators with the corresponding output data.

In the case of the row-scanner implementation (figure 2), the execution process is quite straightforward. Here the data is fetched from the storage layer in the form of record batches on which the scanner will perform filter operations. The data will then be forwarded to the parent operator which aggregates the data to assemble the final projection [9]. Now in case of the column store, the process looks considerably different. As illustrated in figure 3, operations are executed based on single columns rather than entire tables. Here the initial scan operation is performed on a single column containing the data on which the predicate has to be applied. Hence, in the first step, values are filtered by submitting them to predicate evaluation, leaving only a subset of the original dataset. However, instead of returning the values directly, only a list of their corresponding column positions is returned. In the steps that follow, the positions are correlated with the columns containing the requested attributes to extract the corresponding values which are then aggregated to assemble the projection [9]. Thus, the example already indicates why analytical queries may perform better on column stores than on row stores. Instead of scanning the entire table to generate an output consisting of only a small subset of all attributes of the extracted records, operations are limited to that subset of attributes from the beginning, resulting in a significant reduction of the operational overhead.

Figure 3: Column-scanner implementation
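
The column-scanner steps described above can be sketched in a few lines of Python for the example query “SELECT Name, Profession FROM Employees WHERE Age > 30”. The column contents and the helper names `scan_positions` and `fetch` are assumptions made for this illustration, not the interface of an actual column store.

```python
# Hypothetical columns of the 'Employees' table; position i belongs to employee i.
names = ["Alice", "Bob", "Carol", "Dave"]
ages = [34, 28, 45, 30]
professions = ["Accountant", "Engineer", "Accountant", "Clerk"]

def scan_positions(column, predicate):
    """Step 1: evaluate the predicate on one column and return positions, not values."""
    return [pos for pos, value in enumerate(column) if predicate(value)]

def fetch(column, positions):
    """Step 2: extract the values at the given positions from another column."""
    return [column[pos] for pos in positions]

# WHERE Age > 30 touches only the 'Age' column.
positions = scan_positions(ages, lambda age: age > 30)

# SELECT Name, Profession touches only those two columns, at the matching positions.
result = list(zip(fetch(names, positions), fetch(professions, positions)))
```

Only three of the table's columns are ever read, and the requested columns are accessed only at the positions that passed the predicate.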

Bound to be optimized

Simply storing data in the form of columns will not bring the improvements that can be expected from column stores. In fact, with few exceptions, such unoptimized column stores get outperformed by row stores in most scenarios. Consequently, a number of optimization techniques have been adopted over the past years, yielding significant performance enhancements. This allowed column stores to be used successfully in areas where large datasets have to be handled, for example data warehousing, data mining or data analytics [4]. The following section therefore gives a brief overview of a selected number of optimization techniques which have been integrated into many modern column store systems.

Compression

Given the characteristics of columnar data, using compression on such structures seems to be the most obvious approach to reduce disk space usage. Indeed, values from the same column tend to fall into the same domain and therefore display low information entropy and more value locality [10]. Those qualities allow compressing one column at a time while even permitting different compression algorithms for individual columns. In addition, if values are sorted within a column, which is common for column store systems, that column will become remarkably compressible [6]. Another technique is ‘frequency partitioning’, where a column is reorganized in such a way that each page of the column shows as low an information entropy as possible. To accomplish this, certain column stores reorganize columns based on the frequency of the values that appear in the column and allow, for example, frequent values to be stored together on the same page [11]. The improvements of such methods are apparent, and investigations suggest that while row stores allow average compression ratios of 1:3, column stores usually achieve ratios of 1:10 or better. Finally, in addition to lowering disk space usage, compression also helps to improve query performance. If data is compressed, less time is spent on I/O operations during query execution because of reduced seek times, increased buffer hit rates and less transfer time of data from disk into memory and from there to the CPU [6, 10].

Dictionary Encoding

This form of compression works fine on data sets composed of a small number of very frequent values. For each value appearing in a column, an entry is created in a dictionary table. The values in the column are then represented by integer values referencing the positions in this table. Furthermore, dictionary encoding can not only be applied to single columns but also to entire blocks [12]. Another advantage of dictionary compression is that it allows working with columns of fixed length if the system keeps all codes of the same width. This can further maximize data processing speeds in systems that rely on vectorized query execution.

Figure 4: Examples of ‘Dictionary’ encoding and ‘Run-Length’ encoding
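
As a minimal sketch of dictionary encoding as described above, the following Python functions build the dictionary table in order of first appearance and replace each value by its integer code. The function names and the sample column are chosen for illustration only; real systems additionally pack the codes into a fixed bit width.

```python
def dictionary_encode(column):
    """Return (dictionary, codes): each value is replaced by its dictionary position."""
    dictionary = []   # position -> original value
    code_of = {}      # original value -> position
    codes = []
    for value in column:
        if value not in code_of:
            code_of[value] = len(dictionary)
            dictionary.append(value)
        codes.append(code_of[value])
    return dictionary, codes

def dictionary_decode(dictionary, codes):
    """Look every code up in the dictionary to restore the original column."""
    return [dictionary[code] for code in codes]

# Hypothetical 'Profession' column with few distinct, frequently repeated values.
column = ["Accountant", "Engineer", "Accountant", "Clerk", "Engineer"]
dictionary, codes = dictionary_encode(column)
```

Here `dictionary` holds three strings while the column itself shrinks to five small integers.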

Run-Length Encoding

This encoding is well suited to compress columns containing repeating sequences of the same value by reducing them to a compact singular representation. Here, the column entries are replaced by triples describing the original value, its start position and its frequency (run-length). Hence, when a column starts with 20 consecutive entries of the value ‘male’, then these entries can be reduced to the triple (‘male’, 1, 20). This form of compression works especially well on sorted columns or columns with reasonably sized runs of the same value [10].
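
A minimal Python sketch of this scheme, using the same (value, start position, run-length) triples with 1-based start positions as in the example above. The function names are chosen for illustration.

```python
def rle_encode(column):
    """Collapse runs of equal values into (value, start, length) triples."""
    runs = []
    for position, value in enumerate(column, start=1):  # 1-based positions
        if runs and runs[-1][0] == value:
            run_value, start, length = runs[-1]
            runs[-1] = (run_value, start, length + 1)    # extend the current run
        else:
            runs.append((value, position, 1))            # open a new run
    return runs

def rle_decode(runs):
    """Expand the triples back into the original column."""
    column = []
    for value, _start, length in runs:
        column.extend([value] * length)
    return column

gender = ["male"] * 20 + ["female"] * 5
runs = rle_encode(gender)
```

The 25 entries of the sample column are reduced to two triples.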

Figure 5: Examples of ‘Bit-Vector’ encoding, ‘Differential’ encoding and ‘Frame of Reference’ encoding

Bit-Vector Encoding

In this type of encoding, a bit-string with the same length as the column itself is generated for each unique value in the column. The string contains a ‘1’ at every position where the value it is associated with appears in the column, and a ‘0’ otherwise. ‘Bit-Vector’ encoding is frequently used when columns have a limited number of unique data values. In addition, the bit-vectors themselves can be compressed further, which allows the encoding to be used even on columns containing a larger number of unique values [10].
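
The following Python sketch illustrates the idea with plain lists of 0 and 1. This is a simplification for readability; an actual implementation would pack the bits into machine words. The function name and the sample column are assumptions.

```python
def bitvector_encode(column):
    """Build one bit-vector per distinct value, each as long as the column."""
    vectors = {value: [0] * len(column) for value in set(column)}
    for position, value in enumerate(column):
        vectors[value][position] = 1
    return vectors

gender = ["male", "female", "female", "male"]
vectors = bitvector_encode(gender)
```

Every position is set in exactly one of the vectors, so the original column can be restored by checking which vector holds the ‘1’ at each position.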

Differential Encoding and Frame of Reference Encoding

‘Differential’ encoding expresses values as bit-sized offsets from the previous value. A value sequence beginning with ‘10, 8, 6, 12’, for example, can be represented as ‘10, -2, -2, 6’. The bit-size for the offset value, however, is fixed and cannot be changed once established. Therefore, special escape codes have to be used to indicate values whose offset cannot be represented with the specified bit-size. The encoding performs well on columns containing sequences of increasing or decreasing values, which thus demonstrate value locality. Those can be inverted lists, timestamps, object IDs or sorted numeric columns. As a variation of the concept, there is also the ‘Frame of Reference’ encoding, which works in a very similar way. The main difference here is that the offsets do not refer to the direct predecessor but to a reference value within the set. For example, the previous sequence ‘10, 8, 6, 12’ would be represented as ‘10, -2, -4, 2’ with ‘10’ being the reference value [13].
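
Both encodings can be sketched in Python as follows. The sketch assumes a non-empty column, uses the first value as the reference for ‘Frame of Reference’ (as in the example above), and leaves out the fixed bit-size and the escape codes mentioned in the text. The function names are chosen for illustration.

```python
def delta_encode(values):
    """Differential encoding: first value, then offsets from the direct predecessor."""
    return [values[0]] + [cur - prev for prev, cur in zip(values, values[1:])]

def delta_decode(encoded):
    values = [encoded[0]]
    for offset in encoded[1:]:
        values.append(values[-1] + offset)
    return values

def for_encode(values):
    """Frame of Reference: first value as reference, then offsets from that reference."""
    reference = values[0]
    return [reference] + [value - reference for value in values[1:]]

def for_decode(encoded):
    reference = encoded[0]
    return [reference] + [reference + offset for offset in encoded[1:]]

sequence = [10, 8, 6, 12]
deltas = delta_encode(sequence)   # offsets relative to the previous value
frame = for_encode(sequence)      # offsets relative to the reference value 10
```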

Operations on compressed data

Figure 6: Layout of a compression block

Performance gains through compression can be maximized when operators are able to act directly on compressed values without the need for prior decompression [14]. This can be achieved by introducing buffers that hold column data in a compressed format and provide an API which allows query operators to work directly on the compressed values. Consequently, a component wrapping an intermediate representation for compressed data, termed a ‘compression block’, is added to the query executor (figure 6). The methods provided by the API can be used by query operators to directly access compressed data without having to decompress and iterate through it [6, 10]. The illustration in figure 7 exemplifies the design by showing what a query execution on compressed data looks like. The query aims to determine the number of male and female employees who work as accountants. In the first step, a filter operation is performed on the sorted and ‘Run-length’ encoded ‘Profession’ column by calling the corresponding API method, which returns the index positions of those values passing the predicate condition (Profession = ‘Accountant’). The positions can then be used to delimit the corresponding region within the ‘Bit-Vector’ encoded ‘Gender’ column. Finally, the number of males and females working as accountants can be calculated using a corresponding API method that sums up all occurrences of the value ‘1’ within the interval. As seen, none of the operations required any decompression of the scanned data.

Figure 7: Example depicting query operations on compressed data
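
The query described above can be sketched in Python as follows. The two helper functions stand in for the API methods of a compression block; their names, the 0-based run positions and the sample data are assumptions for this illustration and do not reproduce figure 7.

```python
# Sorted, run-length encoded 'Profession' column as (value, start, length),
# with 0-based start positions so they can be used directly for slicing.
profession_runs = [("Accountant", 0, 3), ("Clerk", 3, 1), ("Engineer", 4, 2)]

# Bit-vector encoded 'Gender' column: one bit-vector per distinct value.
gender_bits = {
    "male":   [1, 0, 1, 0, 1, 1],
    "female": [0, 1, 0, 1, 0, 0],
}

def positions_of(runs, wanted):
    """Return the half-open position range [start, end) of a value's run
    without expanding the run into individual values."""
    for value, start, length in runs:
        if value == wanted:
            return start, start + length
    return 0, 0  # value not present: empty range

def count_ones(bits, start, end):
    """Count the set bits inside the given position range."""
    return sum(bits[start:end])

start, end = positions_of(profession_runs, "Accountant")
counts = {gender: count_ones(bits, start, end) for gender, bits in gender_bits.items()}
```

Neither column is ever expanded back into its original list of strings; the filter works on the run triples and the count works on the bits.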

Vectorized execution

Figure 8: Tuple-at-a-time vs. vectorized execution

Most of the traditional implementation strategies for the query execution layer are based on the ‘iterator’ or ‘tuple-at-a-time’ model, where individual tuples are moved from one operator to another through the query plan tree [10]. Each operator normally provides a next() method which outputs a tuple that can be used as input by a calling operator further up the execution tree. The advantage of this approach is that the materialization of intermediate results is minimal. There is, however, an alternative called ‘vectorized execution’ where, in contrast to the ‘tuple-at-a-time’ model, each operator returns a vector of N tuples instead of only a single tuple (figure 8). This approach offers several advantages [6]:

  1. Reduction in interpretation overhead by limiting the number of function calls made by the query interpreter.
  2. Improved cache locality by being able to adjust the vector size to the CPU cache.
  3. Better profiling by allowing operators to execute all expression evaluation work in a vectorized fashion (i.e. array-at-a-time), keeping the overhead for measurements of individual operations low.
  4. Taking advantage of the columnar format by reading larger data batches of N tuples at a time allowing array iteration with good loop pipelining techniques, so that operations can repeatedly be executed within one function call.
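
The difference between the two models can be sketched with Python generators, where each item produced by a child operator corresponds to one next() call. The operator names and the vector size of 16 are assumptions for this illustration; the sketch only counts calls and does not demonstrate the cache or pipelining effects listed above.

```python
def next_tuple(column):
    """Tuple-at-a-time child operator: every next() call yields a single value."""
    for value in column:
        yield value

def next_vector(column, vector_size):
    """Vectorized child operator: every next() call yields up to vector_size values."""
    for start in range(0, len(column), vector_size):
        yield column[start:start + vector_size]

def sum_tuple_at_a_time(column):
    total = calls = 0
    for value in next_tuple(column):
        calls += 1          # one call into the child operator per tuple
        total += value
    return total, calls

def sum_vectorized(column, vector_size=16):
    total = calls = 0
    for vector in next_vector(column, vector_size):
        calls += 1          # one call into the child operator per vector
        total += sum(vector)  # the inner loop runs over a whole array at once
    return total, calls

ages = list(range(20, 60))  # 40 hypothetical values
```

For the 40 sample values, the tuple-at-a-time variant needs 40 calls into the child operator, whereas the vectorized variant needs only 3.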

Early vs. late materialization

One fundamental problem when designing the execution plan of queries for column stores is to determine when the projection of columns should occur. In a column store, the information about a logical entity or object is distributed over several column pages on the storage medium. As a result, during the execution of most queries, several attributes of a single entity have to be accessed. In many cases, however, database outputs are expected to be entity-based and not column-based. Consequently, during every execution, the information scattered over multiple columns has to be reassembled at some point to form ‘rows’ of information about the entity. This joining of tuples is a very common process in column stores and is referred to as ‘materialization’ [15]. In this context, there are principally two design concepts that address the problem of column projection, called ‘Early Materialization’ and ‘Late Materialization’. During query execution, most naive column stores first select the columns of relevance, construct tuples from their component attributes, and then execute standard row store operations on the resulting rows. This approach of constructing tuples early in the query plan, called ‘Early Materialization’, will in many cases result in better performance for analytical queries than that seen in typical row stores. However, much of the potential of column stores will still be left untouched.

Modern column stores have adopted the concept of ‘Late Materialization’, where operations are performed on the basis of single columns for as long as possible and the projection of columns occurs late in the query plan. This gives rise to the need to work with intermediate ‘position lists’ in order to join the results of operations that have been conducted on individual columns. These lists can be represented as a simple array, as bit strings or as a set of ranges of positions. Those position representations can then be intersected to extract the values of interest, which then flow into the final projection [15]. Thus, the concept of ‘Late Materialization’ offers several advantages that result in significant performance boosts, which can be attributed to the following characteristics:

  1. Given specific selection and aggregation operations, it is possible to completely skip the materialization of some tuples.
  2. Decompression of data for the reconstruction of tuples can be avoided which allows the continuous operation on compressed data in memory.
  3. Cache performance can be improved while operating directly on column data because irrelevant attributes for given operations can be omitted.
  4. Efficient CPU usage through operations on highly compressible position representations which, given their structure, are well suited for CPU processing.

Figure 9: Analytical query to be executed using late materialization

In the following, a typical query execution using late materialization as implemented in modern column store systems will be described. Here the query consists of a simple SQL-statement aimed to determine the number of all female employees over 30 years of age ordered by professions (figure 9). In this example, the intermediate lists are expressed in the form of bit-vectors representing the positions of those values that passed the predicates and on which bit-wise ‘AND’ operations can be executed.

The query execution illustrated in figure 10 is a select-project operation where essentially two columns of a table (Employees) are filtered, while subsequently a sum aggregation is performed to generate the projection. Thus, in the first step, the two predicates are applied to the ‘Age’ and ‘Gender’ columns which results in two bit-vectors representing only those values which passed the predicates. These are then intersected by applying a bit-wise ‘AND’ operation and the resulting bit-vector then used to extract the corresponding values from the ‘Profession’ column. In the last step, the results are aggregated to assemble the projection by grouping and summing the values from the previous operation.

Figure 10: Execution of query using late materialization
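
The execution steps described above can be sketched in Python as follows. The column contents are made up for illustration and do not reproduce figure 10; the bit-vectors are again plain lists of 0 and 1.

```python
from collections import Counter

# Hypothetical columns of the 'Employees' table; position i belongs to employee i.
ages = [34, 28, 45, 52, 31, 25]
genders = ["female", "male", "female", "male", "female", "female"]
professions = ["Accountant", "Engineer", "Accountant", "Clerk", "Engineer", "Clerk"]

def predicate_bits(column, predicate):
    """Apply a predicate to a single column and return a bit-vector of matches."""
    return [1 if predicate(value) else 0 for value in column]

# Step 1: each predicate is evaluated on its own column.
age_bits = predicate_bits(ages, lambda age: age > 30)
gender_bits = predicate_bits(genders, lambda gender: gender == "female")

# Step 2: the position lists are intersected with a bit-wise AND.
combined = [a & g for a, g in zip(age_bits, gender_bits)]

# Step 3: only now are values extracted, and only from the 'Profession' column.
selected = [prof for prof, bit in zip(professions, combined) if bit]

# Step 4: group and count to assemble the final, ordered projection.
result = sorted(Counter(selected).items())
```

No complete employee tuple is ever constructed; the only values that are materialized are the professions at the positions that passed both predicates.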

Virtual IDs

A possible way to structure columns within a column store is to associate individual columns with an identifier such as a numeric primary key. Adding an identifier in such an explicit fashion, however, unavoidably introduces redundancy and increases the amount of data to be stored on disk. To solve this problem, modern database systems try to avoid additional columns containing solely IDs by substituting them with virtual identifiers which represent the position (offset) of the tuple in the column [5]. The design can be further enhanced by implementing columns composed of fixed-width dense arrays. This allows storing the attributes of an individual record at the same position across all the columns of a table. In combination with offsets, the design makes locating individual records significantly faster. A value at the i-th position of a table EMP, for example, could be located and accessed by just calculating ‘startOf(EMP)+i*width(EMP)’.
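
The offset calculation can be illustrated with a small Python sketch that stores a column as a dense array of fixed-width integers in a byte buffer. The 4-byte little-endian width, the sample values and the helper `value_at` are assumptions made for this example.

```python
import struct

WIDTH = 4  # assumed fixed width of every value in bytes (32-bit integer)

# Hypothetical 'Age' column stored as a dense array without any explicit ID column.
ages = [34, 28, 45, 52]
column_bytes = b"".join(struct.pack("<i", age) for age in ages)

def value_at(buffer, start_of_column, i, width=WIDTH):
    """Locate the i-th value purely by arithmetic: start + i * width."""
    offset = start_of_column + i * width
    return struct.unpack_from("<i", buffer, offset)[0]

third_age = value_at(column_bytes, 0, 2)  # virtual ID 2 -> byte offset 8
```

The position `i` acts as the virtual ID: it is never stored, yet it identifies the same record in every fixed-width column of the table.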

Database cracking

Sorted columns are a helpful measure to significantly improve the performance of column stores, including the realization of high compression ratios. However, common approaches require a complete sorting of columns in advance, demanding idle time and workload knowledge. More dynamic approaches have brought forward architectures that aim to perform such tasks incrementally by combining them with query execution. The principal idea is to continuously change the physical data store with every executed query. Sorting is consequently done adaptively in a continuous manner and limited to the accessed sections of a column [16]. Each query therefore performs a partial reorganization of all processed columns, making subsequent access faster. This process is called ‘Database cracking’ and allows the database system to be used immediately once data is available. It is an interesting approach to adaptive indexing because on every range-selection query, the data is reorganized and compartmentalized using the provided predicates as pivots. In that manner, optimal performance is approached incrementally without the prior need to analyze the expected workload, tune the system and create indexes. Figure 11 shows an example where two consecutive queries search and ‘crack’ a column (‘Age’). The first query subdivides the column into three pieces while the second query further refines the partitioning. The final result is a column that is partially sorted and comprised of five value ranges. Consequently, the structure of the column data reflects the query structure and thus constitutes an adaptation to the data requirements of the applications (services) accessing the database.

Figure 11: Adaptive indexing of column using database cracking
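
The following Python sketch shows the principle for range queries of the form low <= value < high. It is a simplified illustration under several assumptions: the class and method names are made up, the sample values do not reproduce figure 11, and pieces are rebuilt as new lists, whereas the approach described in [16] reorganizes the column in place and handles further details.

```python
import bisect

class CrackedColumn:
    """A column copy that is partitioned a little further by every range query."""

    def __init__(self, values):
        self.values = list(values)  # physically reorganized over time
        self.pivots = []            # sorted pivot values seen so far
        self.bounds = []            # bounds[k]: index of the first value >= pivots[k]

    def _crack(self, pivot):
        """Partition the piece containing `pivot` and return the split index."""
        k = bisect.bisect_left(self.pivots, pivot)
        if k < len(self.pivots) and self.pivots[k] == pivot:
            return self.bounds[k]  # already cracked on this pivot
        lo = self.bounds[k - 1] if k > 0 else 0
        hi = self.bounds[k] if k < len(self.bounds) else len(self.values)
        piece = self.values[lo:hi]
        smaller = [v for v in piece if v < pivot]
        larger = [v for v in piece if v >= pivot]
        self.values[lo:hi] = smaller + larger
        split = lo + len(smaller)
        self.pivots.insert(k, pivot)
        self.bounds.insert(k, split)
        return split

    def range_query(self, low, high):
        """Return all values v with low <= v < high, cracking on both bounds."""
        start = self._crack(low)
        end = self._crack(high)
        return self.values[start:end]

age = CrackedColumn([45, 28, 34, 52, 25, 31, 60, 39])
first = age.range_query(30, 40)   # cracks the column into three pieces
second = age.range_query(27, 50)  # refines it into five pieces
```

After the two queries the column holds `[25, 28, 34, 31, 39, 45, 52, 60]`: the pieces are ordered relative to each other, while the values inside a piece remain unsorted until a later query cracks it further.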

Conclusion

The column store concept, albeit nearly half a century old, has received considerable attention during the last decade. This is due to the fact that column stores excel when it comes to performing analytical-style processing of large datasets [9]. This has made them the storage system of choice, especially for applications with OLAP-like workloads which rely on very complex queries often involving complete datasets. Investigations have shown, however, that substantial improvements of the basic concept of column stores are necessary to truly reap the benefits such an architecture may provide. Therefore, several optimizations have been introduced over the past years, some of which have been discussed here. Those include compression, API-based compression buffers, vectorized processing, late materialization, virtual IDs and database cracking, all of which aim to significantly improve the processing time by tackling performance issues from different angles. With such optimization techniques added, column stores outperform row stores by an order of magnitude on analytical workloads [8]. This also indicates that the architectural design will be of interest in the years to come because of the ever-growing number of large-scale, data-intensive applications with high workloads. Those include scientific data management, business intelligence, data warehousing, data mining, and decision support systems.

Finally, there are still directions for future development that are worth investigating, for example hybrid systems that are partially column-oriented. Those could be realized in the form of architectures that store columns grouped by access frequency or that adapt to access patterns, allowing them to switch between column-oriented and row-oriented table structures when needed. Another issue to be addressed is the loading time of column stores, which still does not compare well with that of row stores, especially if there are many views to materialize. Thus, studies on possible new algorithms that could alleviate the problem by substantially improving load performance would surely be an interesting field for future investigations.

References

[1] S. Melnik, A. Gubarev, J. J. Long, G. Romer, S. Shivakumar, M. Tolton and T. Vassilakis, Dremel: Interactive Analysis of Web-scale Datasets, Proceedings of the VLDB Endowment, VLDB Endowment, 2010, Vol. 3(1-2), pp. 330-339.

[2] D. J. Abadi, P. A. Boncz and S. Harizopoulos, Column-oriented database systems, Proceedings of the VLDB Endowment, VLDB Endowment, 2009, Vol. 2(2), pp. 1664-1665.

[3] D. Bößwetter, Spaltenorientierte Datenbanken, Informatik-Spektrum, Springer, 2010, Vol. 33(1), pp. 61-65.

[4] A. El-Helw, K. A. Ross, B. Bhattacharjee, C. A. Lang and G. A. Mihaila, Column-oriented Query Processing for Row Stores, Proceedings of the ACM 14th International Workshop on Data Warehousing and OLAP, ACM, 2011, pp. 67-74.

[5] S. Idreos, F. Groffen, N. Nes, S. Manegold, K. S. Mullender and M. L. Kersten, MonetDB: Two Decades of Research in Column-oriented Database, IEEE Data Engineering Bulletin, 2012, Vol. 35(1), pp. 40-45.

[6] D. J. Abadi, Query execution in column-oriented database systems, Massachusetts Institute of Technology, 2008.

[7] D. J. Abadi, Column Stores for Wide and Sparse Data, CIDR, 2007, pp. 292-297.

[8] D. J. Abadi, S. R. Madden and N. Hachem, Column-stores vs. row-stores: how different are they really?, Proceedings of the 2008 ACM SIGMOD international conference on Management of data, 2008, pp. 967-980.

[9] S. Harizopoulos, V. Liang, D. J. Abadi and S. Madden, Performance tradeoffs in read-optimized databases, Proceedings of the 32nd international conference on very large data bases, 2006, pp. 487-498.

[10] D. Abadi, S. Madden and M. Ferreira, Integrating compression and execution in column-oriented database systems, Proceedings of the 2006 ACM SIGMOD international conference on management of data, 2006, pp. 671-682.

[11] V. Raman, G. Swart, L. Qiao, F. Reiss, V. Dialani, D. Kossmann, I. Narang and R. Sidle, Constant-Time Query Processing, IEEE 24th International Conference on Data Engineering (ICDE ’08), IEEE Computer Society, 2008, pp. 60-69.

[12] P. Raichand, A short survey of data compression techniques for column oriented databases, Journal of Global Research in Computer Science, 2013, Vol. 4(7), pp. 43-46.

[13] J. Goldstein, R. Ramakrishnan and U. Shaft, Compressing relations and indexes, Proceedings 14th International Conference on Data Engineering, IEEE, 1998, pp. 370-379.

[14] O. Polychroniou and K. A. Ross, Efficient lightweight compression alongside fast scans, Proceedings of the 11th International Workshop on Data Management on New Hardware, 2015, pp. 9.

[15] D. J. Abadi, D. S. Myers, D. J. DeWitt and S. R. Madden, Materialization strategies in a column-oriented DBMS, IEEE 23rd International Conference on Data Engineering, 2007, pp. 466-475.

[16] S. Idreos, M. L. Kersten and S. Manegold, Self-organizing Tuple Reconstruction in Column-stores, Proceedings of the 2009 ACM SIGMOD International Conference on Management of Data, ACM, 2009, pp. 297-308.

Queueing Theory and Practice – OR: Crash Course in Queueing

What this blog entry is about

This entry is based on the paper “The Essential Guide to Queueing Theory” written by Baron Schwartz at VividCortex, a company that develops database monitoring tools.
The paper provides a somewhat opinionated overview of queueing theory in a relatively easy-to-understand form. It draws many connections to everyday situations where queueing applies and then provides a couple of notations and formulas as a first approach to actual calculations.
This blog entry summarizes that paper but focuses on queueing in the context of ultra-large-scale web services and adds examples and information from my own knowledge and some light research.

The goal of this blog entry is to provide fellow computer science and media programmers who consider working in the field of web servers with an overview of queueing that might be helpful at some point to understand and maybe even make infrastructure and design decisions for large-scale systems.

Note that some notations do not exactly match the paper from Schwartz because there seems to be no consensus between it and other sources either. Instead, the variants that best fit this summary have been chosen.

Queueing and intuition

The paper sums up the issue as “Queueing is based on probability. […] Nothing leads you astray faster than trusting your intuition about probability. If your intuition worked, casinos would all go out of business, and your insurance rates would seem reasonable.”
Due to the skills required for survival in prehistoric times as well as most experiences we have in everyday life today, the human mind is best suited for linear thinking. For example, the Neanderthal knew that if he gathered twice as many edible fruits, he would be able to eat from them twice as long (just imagine yourself today with snacks in the supermarket : ) ).

An exception to this is movement prediction where we know intuitively that a thrown object will fly a parabolic path. This part already comes to a limit though when we think for example of the famous “curved free kick” (Keyword: Magnus effect).
However when we try to think theoretically about things in our mind alone, we tend to resort solely to linear proportions between two values.

As we will see and calculate later, in queueing theory, relations are not just parabolic but unpredictably non-linear. A value can grow steadily for a long while until reaching a certain point and then leap rapidly towards infinity.
Let’s look at an example: You have a small web-server providing a website that allows the user to search and display images from a database.

Depending on how broad the search is, preparing and sending the search results to the user takes a random amount of time between 1 and 3 seconds – on average that means 2 seconds. You also expect 25 users per minute on average.
Now the question is, how long will the user have to wait on average?

Intuitively (at least if you were not a software dev ; ) ) you might say: barely over two seconds. After all, handling 25 requests of 2s average each requires just 50 seconds and thus only 5/6 (83.3%) of the system’s time.
Unfortunately the reality would be a website with 5 seconds of wait time on average and far higher outliers.

The reason is that with random processing durations as well as random arrival times, requests will naturally overlap while idle time is inevitably wasted. Whenever the server has no request to handle, those seconds are entirely lost. When multiple requests occur simultaneously later, it is not possible to “use up” that spare idle time from earlier. Instead, time continues to flow and the requests have to be enqueued and wait.
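
This effect can be checked with a small simulation of the example above. The sketch below assumes a single server that handles requests in order of arrival, exponentially distributed gaps between arrivals (one common way to model “random” arrivals) and service times drawn uniformly between 1 and 3 seconds. The function name, the number of simulated requests and the seed are arbitrary choices.

```python
import random

def simulate_average_wait(arrivals_per_minute=25, service_min=1.0, service_max=3.0,
                          num_requests=200_000, seed=1):
    """Simulate a single first-come-first-served server and return the
    average time (in seconds) a request spends waiting before service starts."""
    rng = random.Random(seed)
    rate = arrivals_per_minute / 60.0   # arrivals per second
    arrival = 0.0                       # arrival time of the current request
    server_free_at = 0.0                # time at which the server becomes idle
    total_wait = 0.0
    for _ in range(num_requests):
        arrival += rng.expovariate(rate)        # random gap since the last arrival
        start = max(arrival, server_free_at)    # wait if the server is still busy
        total_wait += start - arrival
        server_free_at = start + rng.uniform(service_min, service_max)
    return total_wait / num_requests

average_wait = simulate_average_wait()
```

Under these assumptions the average wait should come out in the neighborhood of the 5 seconds mentioned above, even though the server is idle for roughly one sixth of the time.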

The following graph shows the relation between “Residence Time” which is the whole time between arriving with a request and leaving with it (aka the site loading time) and “Utilization” of the server.

“Hockey Stick Graph” visualizing the relation between residence time and server utilization

We see a non-linear graph that is sometimes called a “hockey stick graph” due to its distinctive shape. It shows clearly that the tipping point is somewhere around 80%. Past this level of utilization, the average wait time skyrockets.

The Residence Time and Utilization are only two of several values that we need to define to be able to talk on a common ground about queueing theory.

The common ground

To establish a common vocabulary and set of abbreviations, see the following table.

| Metric | Unit | Symbol | Description |
|---|---|---|---|
| Arrival Rate | Requests per time | A | The frequency of new requests to the system. |
| Queue Length | Waiting requests | Q | How many requests are waiting in a queue on average. |
| Requests (in concurrency) | Requests | R | Total requests currently waiting or being serviced. |
| Wait time | Time | W | Time requests spend waiting in a queue. |
| Service time | Time | St | Time a request needs to be serviced on average. For example, how long it takes to assemble the website data. |
| Residence time (latency) | Time | Rt | Total time from placing the request to returning the output. If data transfer delays are ignored, this is W + St. |
| Utilization | Fraction | U | Fraction of time the servers are busy, i.e. the ratio of busy time to total time. |

Basic formulas

Many of the parameters listed above are related to each other through a handful of basic formulas that are mostly trivial once you understand them.
The first and most common one is referred to as “Little’s Law”, as it was formulated and proved in 1961 by John D.C. Little:

R = A * Rt

Where R is the number of total requests in the system (in queue and currently serviced), A the arrival rate and Rt the total time of a request from arrival to having been serviced.
The relation is fairly straightforward as it simply says: the longer requests take in the system and the more often they occur, the more requests will accumulate in the system.
This relationship can be resolved for the queue length as well:

Q = A * W -> Queue length = arrival rate * wait-time in queue

Another important formula is the Utilization Law:

U = A * St -> Utilization = arrival rate * service time of a request

Logical: the more requests arrive and the longer they need to be serviced, the higher the utilization will be on average. To calculate this for multiple servers, just divide the utilization by the number of servers. Of course, this is a theoretical approach only, as real web servers will inevitably incur overhead through load balancing.

Finally, another important formula states that the residence time Rt, which is the latency of a request, equals the service time divided by the idle fraction of the server (1 – utilization).

Rt = St / (1-U)
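
The three formulas can be chained in a few lines of Python. This is only a sketch with made-up numbers (50 requests per second, 10 ms service time, one server); the function names are illustrative and not taken from any library.

```python
def utilization(arrival_rate, service_time, servers=1):
    """Utilization Law: U = A * St, divided by the number of servers."""
    return arrival_rate * service_time / servers


def residence_time(service_time, util):
    """Rt = St / (1 - U); only meaningful while the server is not saturated."""
    if not 0 <= util < 1:
        raise ValueError("utilization must be in [0, 1)")
    return service_time / (1 - util)


def littles_law(arrival_rate, time_spent):
    """Little's Law: items = rate * time (R = A * Rt, or Q = A * W)."""
    return arrival_rate * time_spent


# Assumed example values, not measurements.
A = 50.0     # arrival rate in requests per second
St = 0.010   # service time in seconds

U = utilization(A, St)        # fraction of time the server is busy
Rt = residence_time(St, U)    # latency per request in seconds
R = littles_law(A, Rt)        # requests in the system on average
W = Rt - St                   # time spent waiting in the queue
Q = littles_law(A, W)         # requests waiting in the queue on average

print(f"U={U:.2f} Rt={Rt * 1000:.1f}ms R={R:.2f} W={W * 1000:.1f}ms Q={Q:.2f}")
```

With these assumed inputs the server is busy half of the time, and a request already spends as long waiting as it spends being serviced.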

However, those formulas do not yet allow you to predict much about how a real system will behave, because they require you to know at least one crucial value such as the average queue length, the wait time in the queue or the utilization.
To compute any of those, a more advanced set of formulas is needed. But before we can calculate something for a system, we first need to decide how to describe such a system.

Kendall’s notation

One way to do exactly that and describe approximately how a service system has been designed is “Kendall’s notation”. It distinguishes systems by their major parameters.
Up to six parameters, each denoted by a letter, are separated by slashes:

A/T/S/P/R/D

Unfortunately the exact letters for every parameter differ a lot between sources. Therefore this blog entry describes a variant that appeared sensible to the author. Most sources use exactly the same order of parameters though.
Of all parameters, the first three are the most significant, as the others can keep their default values for many systems.

A

The first parameter describes how the service requests arrive. Most commonly it is one of the following letters:

  • M: Markovian or memoryless; the requests occur randomly (there’s no memory of the last request). The time between requests then follows an exponential distribution.
  • G: General; an arbitrary distribution about which nothing specific is assumed.
  • Mx: Random occurrence of x requests at once.
  • D: Degenerate distribution; a deterministic or fixed time between requests.
  • Ek: Erlang distribution with k as the shape parameter.

T

T describes the service time distribution. The same letters as for (A) are common.

S

S describes the number of servers that serve requests in parallel.

P

P denotes the number of places in the system, counting both the servers and the queues. When this limit is reached, new requests are denied.
If omitted, this parameter is ‘infinite’ (INF).

R

R is the number of possible requesters. This is only relevant if the number of requesters is relatively low because if a significant fraction is already in queues, new requests naturally become more scarce.
If omitted, this parameter is ‘infinite’ (INF).

D

D describes the de-queueing behavior. The following abbreviations are common:

  • FIFO: First in first out
  • LIFO: Last in first out
  • SIRO: Service in random order
  • PNPN: Service order based on a priority value for every request

If omitted, this parameter is ‘FIFO’ by default.

Example

Following Kendall’s notation, a possible variant of a web service system is:

M/M/(s)/(b)/INF/FIFO

Where (s) is the number of requests that can be processed in parallel and (b) is the number of places for requests in memory in total.
A service system described this way expects requests at random intervals*, each requiring a random amount of time to process. It expects an infinite number of potential requesters, because even for the largest system it is not feasible to handle a significant fraction of all people with internet access at the same time**. It uses FIFO queues.

* Of course a system can make predictions such as higher demand at certain times of the day, but that is not immediate enough because it is not a correlation between two requests. However, for some types of sites (like social media for example) it can be assumed that after a user has accessed the site for the first time, subsequent requests will follow as they keep using it. This type of behavior cannot be modelled with Kendall’s notation.

** However for services available only to special, registered users the number may be limited.
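
To make the defaults concrete, the following sketch parses a descriptor in the variant used in this post (A/T/S/P/R/D) and fills in INF, INF and FIFO when the last parameters are omitted. The class and function names are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass


@dataclass
class KendallSpec:
    arrival: str                # A: arrival behavior, e.g. "M", "D", "Ek"
    service: str                # T: service time distribution
    servers: str                # S: number of parallel servers
    places: str = "INF"         # P: places in the system (servers + queues)
    requesters: str = "INF"     # R: number of possible requesters
    discipline: str = "FIFO"    # D: de-queueing behavior


def parse_kendall(text):
    """Split a descriptor like 'M/M/4/100' and apply the defaults for the rest."""
    parts = [part.strip() for part in text.split("/")]
    if not 3 <= len(parts) <= 6:
        raise ValueError("expected between three and six parameters")
    return KendallSpec(*parts)


print(parse_kendall("M/M/1"))
print(parse_kendall("M/M/4/100/INF/FIFO"))
```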

Predicting the Behavior of single-server designs

A major point of those relatively theoretical approaches to queueing systems is the possibility to make certain predictions about their behavior in practice. The two numbers of interest are queue length and wait time.

In the context of the service of a large scale system, the queue length is relevant to determine the required memory for the system (or every particular machine).
The wait time on the other hand is significant for the user’s satisfaction with the whole service.
Indirectly, the results of those predictions determine the required performance of the system: it has to avoid a utilization that results in high waiting times (tending towards infinity, as seen in the “hockey stick graph” earlier) and usually in long queues as well.

First it makes sense to look at the formula generating said graph for a queue that has the Kendall notation M/M/1. That means it has random arrival and service times, infinite requesters and queue memory, and uses the FIFO principle. All of that runs on a single server.
The formula is:

Rt = St / (1 – U)

Where Rt is the total wait time (“residence time”), St the service time and U the utilization of the server.
Following this, the residence time is proportional to 1/(1-U). That’s often referred to as the stretch factor because it describes how much the real total wait time is stretched compared to the time needed to process a request after it left the queue.

Try out the formula here with this interactive online tool (U has been used as x and Rt as y).
You will notice that the lower St is, the higher the utilization can climb before the residence time becomes critical. As a rule of thumb the following can be deduced: halving the idle capacity doubles the average response time!

Using the graph formula together with “Little’s Law” mentioned earlier, it is possible to compute further values related to the queue:

R = U / (1-U)

Where R is the average number of customers currently in the system (in queue and currently serviced).

Q = U² / (1 – U)

Where Q is the average queue length.

Finally, the average time W that requests wait in a queue before being serviced can be computed as:

W = U*St / (1-U)

Where St is the service time and U the utilization of the server.
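
The M/M/1 formulas above fit into one small function. The sketch below uses the symbols from the table at the start of this section and assumed input values; it also prints the stretch factor for a few utilization levels to show the hockey stick shape numerically.

```python
def mm1_metrics(arrival_rate, service_time):
    """Average values for an M/M/1 queue, using the formulas from the text."""
    u = arrival_rate * service_time          # Utilization Law
    if u >= 1:
        raise ValueError("the server is saturated, the queue grows without bound")
    return {
        "U": u,
        "Rt": service_time / (1 - u),        # residence time
        "R": u / (1 - u),                    # requests in the system
        "Q": u * u / (1 - u),                # requests waiting in the queue
        "W": u * service_time / (1 - u),     # wait time in the queue
    }


# Assumed example: 80 requests per second, 10 ms service time.
for name, value in mm1_metrics(80.0, 0.010).items():
    print(f"{name} = {value:.3f}")

# Stretch factor 1 / (1 - U): halving the idle capacity doubles it.
for u in (0.5, 0.75, 0.875, 0.9375):
    print(f"U = {u:.4f} -> stretch factor {1 / (1 - u):.0f}")
```

The results are consistent with Little's Law: R equals A * Rt and Q equals A * W for any valid input.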

Predicting the behavior of multi-server designs

The “correct method”: The Erlang formulas

Agner Erlang, a pioneer in the field of telecommunications, formulated a series of equations for his field, for example one to predict how many telephone lines would be needed to carry an expected volume of calls.
A modern form of the formula involves the unit nowadays known as “Erlang”, which describes the service demand. The demand in Erlangs equals the concurrency in the optimal case and therefore the minimum number of servers required to handle all requests.

A practical example is a backbone telephone line. Naturally one telephone line can “service” 60 minutes of talk in one hour. That results in exactly 1 Erlang.
Now if the real requests sum up to 600 1-minute calls in one hour, that results in 600 minutes of talk and therefore 10 Erlangs.
In practice of course a backbone with 10 lines and said demand would mean that calls would often need to wait for a free line.

This is where Erlang’s formula ‘C’ comes into play:

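P(wait) = ((A^M / M!) * (M / (M – A))) / ((sum of A^k / k! for k = 0 to M – 1) + (A^M / M!) * (M / (M – A)))

Erlang C, where A is the request load in Erlangs and M is the number of servers.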

This massive, internally iterative formula calculates the probability that a new request has no free line and therefore has to wait in a queue.
A is the request load (the current demand) in Erlangs and M is the number of total servers (or telephone lines in this example).

Wolfram Alpha lets you see this formula in action.

As expected, for the edge case of 10 Erlangs of demand and 10 available servers, the probability that a new request has to wait is practically 100%, because the probability that all calls coincidentally line up exactly one after another is negligibly low.
With 11 servers or telephone lines, which allows more overlapping, the result of the Erlang C formula is already down to about 68%.
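
The two numbers above can be checked with a short script. Instead of evaluating the factorials directly, this sketch uses the well-known recursion for Erlang B and converts the result to Erlang C, which is a swapped-in but equivalent way of computing the same probability. Treating every load at or above the number of servers as "always waits" is an assumption of this sketch.

```python
def erlang_b(load, servers):
    """Blocking probability, built up iteratively one server at a time."""
    b = 1.0
    for n in range(1, servers + 1):
        b = load * b / (n + load * b)
    return b


def erlang_c(load, servers):
    """Probability that a new request has to wait in the queue."""
    if load >= servers:
        return 1.0  # demand meets or exceeds capacity, every request waits
    b = erlang_b(load, servers)
    return servers * b / (servers - load * (1.0 - b))


print(f"10 Erlangs, 10 lines: {erlang_c(10, 10):.0%}")
print(f"10 Erlangs, 11 lines: {erlang_c(10, 11):.0%}")
print(f"10 Erlangs, 15 lines: {erlang_c(10, 15):.0%}")
```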

Applying “Little’s Law” it is again possible to derive different formulas to compute desired values, however Erlang’s formulas are not easy to apply and very unintuitive. For this reason, an approximation has been found.

The approximated method

For the case that there is one waiting queue but Xs servers, a modification of the earlier basic formula can be used:

Rt = St / (1-U^Xs)

Where St is still the service time and U the utilization.
Because the number of servers is applied as an exponent to the utilization, the formula equals the earlier formula in the case of 1 server. For other cases it underestimates the total request time by at most about 10%. More information can be found in Chapter 2 of “Analyzing Computer System Performance With Perl::PDQ” (Springer, 2005) by Neil Gunther.
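
To get a feeling for the size of the error, the approximation can be compared with the exact residence time of an M/M/m queue. The exact value below is derived from Erlang C with the standard textbook relation Rt = St + P(wait) * St / (Xs * (1 - U)); this relation and all function names are additions of this sketch, not part of the sources cited above.

```python
def approx_residence_time(service_time, utilization, servers):
    """Approximation from the text: Rt = St / (1 - U^Xs)."""
    return service_time / (1.0 - utilization ** servers)


def exact_residence_time(service_time, utilization, servers):
    """Exact M/M/m residence time via Erlang C (requires U < 1)."""
    load = utilization * servers               # demand in Erlangs
    b = 1.0
    for n in range(1, servers + 1):            # Erlang B recursion
        b = load * b / (n + load * b)
    p_wait = servers * b / (servers - load * (1.0 - b))   # Erlang C
    wait = p_wait * service_time / (servers * (1.0 - utilization))
    return service_time + wait


for servers in (1, 2, 4, 16):
    approx = approx_residence_time(1.0, 0.8, servers)
    exact = exact_residence_time(1.0, 0.8, servers)
    print(f"{servers:2d} servers: approx {approx:.3f}, exact {exact:.3f}")
```

For one and two servers both functions agree; for more servers the approximation stays slightly below the exact value.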

One common queue or one queue per server?

Using the formulas, the answer to this question is always that a single queue is more efficient. The logical reason becomes clear if we keep in mind that processing requests also takes a random amount of time. This can result in a situation where one server is occupied with a large request while another server has already handled its whole queue and now has to idle.
This is why server infrastructure tends to use “load balancers” that maintain a queue of user requests and spread them across servers.
However, because transmitting requests from the balancer to a server takes time too, the servers usually hold queues themselves so that they never run out of work.
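
The effect can also be observed in a small simulation. The sketch below compares one shared FIFO queue with one queue per server, where each request is assigned to a randomly chosen server. The random assignment, the load of 3.2 Erlangs on 4 servers and the sample size are assumptions of this example; a real load balancer would choose more cleverly.

```python
import heapq
import random


def simulate(servers, arrival_rate, mean_service, requests, shared_queue, seed=1):
    """Return the average residence time of an M/M/m style system."""
    rng = random.Random(seed)
    free_at = [0.0] * servers    # time at which each server becomes idle
    now = 0.0
    total = 0.0
    for _ in range(requests):
        now += rng.expovariate(arrival_rate)           # random arrival
        service = rng.expovariate(1.0 / mean_service)  # random service time
        if shared_queue:
            # FIFO over one queue: the next request takes the earliest free server.
            start = max(now, heapq.heappop(free_at))
            heapq.heappush(free_at, start + service)
        else:
            # One queue per server: the request is bound to a random server.
            i = rng.randrange(servers)
            start = max(now, free_at[i])
            free_at[i] = start + service
        total += start + service - now
    return total / requests


shared = simulate(4, 3.2, 1.0, 20000, shared_queue=True)
separate = simulate(4, 3.2, 1.0, 20000, shared_queue=False)
print(f"shared queue:     {shared:.2f} service times")
print(f"queue per server: {separate:.2f} service times")
```

At 80% utilization the shared queue should come out well ahead, because no server idles while requests are waiting elsewhere.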

Nevertheless, sophisticated algorithms are required for load balancers to ensure a stable system especially under unusually high load or when servers drop out. This topic is handled in other blog entries.

You can experiment with the Erlang C formula modified to compute the residence time depending on the number of servers here.
This shows how a higher number of servers also allows for better utilization of the whole system before the wait time rises too much.

Problems and Limitations

Certain precautions have to be taken before the computations above can be used in a reliable way.

Exact measurements

The two key values of the system, service time St and current utilization U, need to be known accurately. This can be tricky in server environments where random network and infrastructure delays are added under certain circumstances or where servicing can require waiting on other, secondary services (common nowadays when building serverless websites).
One quality measure of modern server management systems is their ability to determine these values, especially the utilization of the system.

Ensuring exponential distribution

The formulas above assume exponentially distributed service times. While this is typical, it has to be verified that the actual service times follow an exponential distribution closely enough.

Cross-dependency

Modern websites are often built “serverless” and require assembling data from different sources. It is possible to assume an abstraction and only view the desired layer of services (like only the outer one that delivers to the users, or for example the system inside that delivers only the images to the other processes that assemble the website). However, things become less predictable when a sub-service uses a resource that is currently needed by a different request.

Conclusions

For those reasons, the authors of the major paper this blog entry is based on suggest that the queueing theory described above is most suited for everyday and physical problems. There it can be applied to relatively simple decisions. Additionally it can be used to find possible causes of problems when an actual system does not behave as assumed. In the end it mainly helps to adjust people’s personal intuition, which is often wrong due to the non-linearity of relations in queueing problems.

Personally I find that knowing about queueing theory and having experimented with the formulas can indeed open one’s eyes to what is actually predictable and what is not. Furthermore, together with modern observation tools for server systems, I would definitely suggest trying to apply the concepts and formulas to verify whether a certain system is working in an optimal way or not. Last but not least they can form a starting point when beginning to design a server infrastructure.

Writing High Performance Code on Modern Hardware

Today, with modern hardware combined with optimized, high-performance code, it is an easy task to process more than 500 million images per day on a single machine. Small improvements in the underlying implementations can have an extremely large impact on the execution time and are therefore fundamentally important for handling the huge amount of data in modern large scale systems. Furthermore, we can dramatically reduce the costs of our infrastructure and stay competitive by optimizing our implementations. To show how this is possible and what optimizations we can use in our everyday programmer life, let us have a look at the following imaginary task:

The Task

Our example task is to calculate the average color of an image as fast as possible. The task is inspired by various applications, like Google Images, which contain many images and show the average color of the underlying image before all data is available on the device. This can be used to hide the latency and to give a more fluid feeling to the application. At first glance, there is nothing special about it, but if we think about how many images are uploaded to modern large scale systems and how much computing power we would need to process them, things get pretty interesting. The calculation of the average color of an image is a good example of where we can apply fundamental optimizations to our code. To compute the average color, we have to process every pixel of an image, which can be a very expensive task, especially if we have to deal with high resolution images. Unfortunately for the developers who have to deal with these high resolution images, modern devices have high resolution cameras and displays, which increase the computational costs. To get the correct average color, we would have to square the individual values and calculate the average color based on the squared values. But in our case, it is enough to simply add the values from every color channel together without squaring them, and divide the sum by the number of pixels. This will give us a slightly darker average color, which is no problem for our task, and furthermore we will get performance improvements based on this simplification, as we will find out in the following sections.
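
The difference between the two averages is easy to see on a tiny input. The following pure-Python sketch is not the benchmarked implementation; it assumes the pixels arrive as a flat sequence of interleaved RGBA bytes and treats all four channels the same way.

```python
import math


def average_color_simple(rgba):
    """Plain mean per channel: add everything up, divide by the pixel count."""
    count = len(rgba) // 4
    sums = [0, 0, 0, 0]
    for i in range(0, len(rgba), 4):      # one pass, pixel by pixel
        for c in range(4):
            sums[c] += rgba[i + c]
    return tuple(s / count for s in sums)


def average_color_squared(rgba):
    """Mean of the squared values per channel, followed by a square root."""
    count = len(rgba) // 4
    sums = [0, 0, 0, 0]
    for i in range(0, len(rgba), 4):
        for c in range(4):
            sums[c] += rgba[i + c] ** 2
    return tuple(math.sqrt(s / count) for s in sums)


# One black and one white pixel, both fully opaque.
pixels = bytes([0, 0, 0, 255, 255, 255, 255, 255])
print(average_color_simple(pixels))
print(average_color_squared(pixels))
```

For the black and white pair the simple mean lands on 127.5 per color channel, while the squared variant gives roughly 180, which illustrates why the simplified result is darker.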

Choosing a Language

Today, many programming languages exist, each with its own advantages and disadvantages, and it has become a challenge to choose the perfect language for a given task. If we want to write high-performance code, things get even more complicated, because every language has different optimization techniques and we can find different test cases where it beats other languages. To test which language is the right one for our purpose, I implemented the basic calculation of the average color with the languages I commonly use: C++, Java and Python with NumPy. Although I already expected that good old C++ would win this fight, I was surprised by the results:

Calculation time in % when calculating the average color value with different programming languages

One thing should be noted: these results can only be seen as a basic overview of how much improvement we can get if we choose different languages for specific tasks. These differences should not be transferred to a general comparison between the performances of these languages! The real improvements always depend on the specific task, and in this case the basic C++ implementation is already twice as fast as the Java solution and more than ten times faster than Python, without any optimizations!

Know your Data

For further improvements, we have to take a look at our data, because then we can find ways to optimize our memory accesses. In most cases, 8 bits per color channel (red, green, blue) are used to store our data. Additionally, an alpha value is stored as a fourth color channel to represent the transparency of our picture. So all in all, we have 4 color channels per pixel, each containing a number between 0 and 255. Our pixels are stored as in the plain PPM file format, where the RGBA values of all pixels are listed one after another. If we calculate one channel after another on this layout, performance suffers from inefficient memory access. If we use libraries without knowing exactly how they store the data, we can easily end up with inefficient memory access without even noticing. A hypothetical API could have stored our images in a different way, with the color channels in four separate arrays. This could be useful in some cases, but if we now have a function to access a single pixel, we have created a memory-inefficient access. With such a memory-inefficient access, where we calculate one channel after another, the calculation time increases drastically:

Calculation time in % with bad or efficient memory access

We, as programmers, are more familiar with integer or float data types, which are in most cases represented by 32 bits. We use them basically everywhere when we have enough available memory, even if we could use smaller data types. We do not care about the small decrease of our memory footprint, but reduced memory consumption is not everything we can get from more suitable data types. They can also give us additional performance improvements:

Calculation time in % with different datatypes

Now our compiler has additional information about the data and can apply more or even better optimizations. With this small change, we reduce the calculation time by more than 40%, simply by storing our data in a char with 8 bits instead of an integer with 32 bits!

Know your Hardware

If we know our hardware, we can use further optimization techniques to get the best out of our system. Modern CPUs come with many cores and therefore, with huge performance gains. Additionally, they can use a technique that is called vectorization, whereby our hardware can make multiple calculations in fewer steps and, if this is not enough, we can also utilize the raw computing power of modern GPUs.

Vectorization

Vectorization uses special hardware registers that make calculations faster. These registers are limited in size, often 128 bits or 256 bits, and we can fill them with our data. In our case, we add 4D vectors together. Normally, we have to make four additions, one for each element of the vector, but if we use vectorization, this can be done in a single step. First, I implemented a basic SIMD (Single Instruction Multiple Data) vector calculation where we can add two 4D vectors, each stored in 128 bits, together in a single step. But this simple approach increased rather than reduced the calculation time. How can this be? Our compiler does a great job of optimizing our code and already tries to use vectorization automatically! This is especially visible in the performance improvements we got by using 8 bits to store our data: the compiler could detect this and add more values together in a single step with automatic vectorization. It was not an easy task to implement a faster vectorization solution, but we can still get some improvements by using AVX2 (Advanced Vector Extensions) instructions with 256-bit registers. We could store 32 8-bit values in these registers, but because we need more bits to store our sum, this representation is not enough. The next bigger data type has 16 bits, which lets us add 16 values of 16 bits each in a single step. With 16 bits we can sum 256 values without losing data, as long as we do not square the values, and with this knowledge we can again get performance improvements:
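
The arithmetic behind the 256-value limit can be checked quickly: 256 values of at most 255 add up to 65,280, which still fits into the 16-bit maximum of 65,535. The sketch below imitates a single 16-bit lane in plain Python (it is not SIMD code) and flushes the lane into a wide accumulator after every block; the function name and the block sizes are chosen for illustration only.

```python
U16_MAX = 0xFFFF      # largest value a 16-bit lane can hold
SAFE_BLOCK = 256      # block size from the text: 256 * 255 = 65280 <= 65535


def sum_with_u16_lane(values, block):
    """Sum 8-bit values through a wrapping 16-bit lane, flushed once per block."""
    total = 0                                  # wide accumulator, cannot overflow
    for start in range(0, len(values), block):
        lane = 0
        for v in values[start:start + block]:
            lane = (lane + v) & U16_MAX        # wraps around like a 16-bit register
        total += lane                          # flush before the lane can overflow
    return total


worst_case = bytes([255]) * 10000              # brightest possible channel values
print(sum_with_u16_lane(worst_case, SAFE_BLOCK), "expected", sum(worst_case))
print(sum_with_u16_lane(worst_case, 1024), "(lane overflowed, result is wrong)")
```

Squaring would break this scheme immediately, since a single squared value of 255 is 65,025 and two of them no longer fit into 16 bits.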

Calculation time in % with vectorization techniques

Multiprocessors

Modern CPUs are multiprocessors: they gain performance through parallelization instead of through ever higher clock rates, which have become nearly impossible to achieve. By using multiprocessors, we can distribute the work over multiple cores and fully utilize the CPU. For our task, we use six threads corresponding to six hardware cores, where every thread calculates the average color of an individual image. Because multiple threads do not access the same data, we are free of race conditions, which makes our life easier. With six hardware cores, we would expect to process six times more images, but starting and waiting for threads also consumes time, so we end up with an implementation that is 4.5 times faster than the single-threaded version.
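
The structure of this approach, one image per worker and no shared data, can be sketched in Python with the standard library. This is not the benchmarked C++ code: the helper names are made up, and in CPython the global interpreter lock keeps pure-Python loops like this one from running truly in parallel, so the sketch shows the work distribution rather than the speedup.

```python
from concurrent.futures import ThreadPoolExecutor


def average_color(rgba):
    """Plain per-channel mean of a flat, interleaved RGBA byte sequence."""
    count = len(rgba) // 4
    sums = [0, 0, 0, 0]
    for i in range(0, len(rgba), 4):
        for c in range(4):
            sums[c] += rgba[i + c]
    return tuple(s / count for s in sums)


def average_colors_parallel(images, workers=6):
    """Each worker handles whole images, so no two workers touch the same data."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(average_color, images))   # results keep input order


# Two tiny synthetic "images" as stand-ins for real files.
images = [bytes([10, 20, 30, 255]) * 100, bytes([0, 0, 0, 255]) * 50]
print(average_colors_parallel(images))
```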

GPUs

The next step to get more performance is to use GPUs. GPUs are a great choice when it comes to raw calculation performance because of their hardware architecture. To keep it simple, GPUs have far more cores than CPUs, but GPU cores are more lightweight than CPU cores, which means we do not have a thousand individual CPU cores running concurrently on a GPU. But if we are aware of our hardware architecture, they can be executed nearly concurrently and we can get huge performance improvements, especially for calculation-intensive tasks. Many programmers have not even touched GPU programming, but today it is quite easy to get good performance, even without heavy optimization or hardware knowledge. For our task, even a very simple and unoptimized OpenCL solution is better than our optimized multicore C++ implementation. We perform a simple parallel sum on our color vectors, for which we start as many GPU threads as we have pixels in our image. First, every GPU thread loads a single value into local memory, then we calculate the sum of 256 elements and store the average color of these elements on our GPU. We can repeat these steps until we have the average color of our whole image, and that’s basically all we have to do on the GPU side to get a 25% faster solution! Another advantage is that GPUs often scale better with larger data than CPUs. This is very helpful for our high resolution images:
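
The repeated reduction can be imitated on the CPU to see how few passes are needed. The sketch below is plain Python, not OpenCL: the inner loop stands in for the threads of one work group, and it carries sums instead of per-group averages between passes (dividing only once at the end), which is a simplification chosen here so that a partially filled last group does not skew the result.

```python
WORK_GROUP_SIZE = 256   # must be a power of two for the halving loop below


def work_group_sum(chunk, group=WORK_GROUP_SIZE):
    """Tree reduction inside one work group, as done in GPU local memory."""
    local = list(chunk) + [0] * (group - len(chunk))   # pad the last group
    stride = group // 2
    while stride > 0:
        for i in range(stride):          # on a GPU these additions run in parallel
            local[i] += local[i + stride]
        stride //= 2
    return local[0]


def gpu_style_sum(values, group=WORK_GROUP_SIZE):
    """Repeat the per-group reduction until a single value is left."""
    if not values:
        raise ValueError("need at least one value")
    passes = 0
    while len(values) > 1:
        values = [work_group_sum(values[i:i + group], group)
                  for i in range(0, len(values), group)]
        passes += 1
    return values[0], passes


red_channel = [200] * 1000 + [100] * 1000      # synthetic red values
total, passes = gpu_style_sum(red_channel)
print(f"average red = {total / len(red_channel)}, passes = {passes}")
```

Because every pass shrinks the data by a factor of 256, even an image with several million pixels is reduced to a single value in three passes.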

Relative execution time on different resolutions

Conclusion

The choice of programming language and suitable data types can heavily improve our performance without complicated optimizations. This is a simple way to write faster code. Furthermore, we can integrate these easy changes into our everyday work to improve our implementations. If we want further optimizations, we enter the endless space of possible techniques and hardware-dependent optimizations, but even with common techniques we can get great performance improvements. We can use vectorization, multicore CPUs and GPU programming to get the best out of our systems. Especially with GPU programming we can get great results, even without heavy optimization; furthermore, GPU programming has become easier in the past years and is easy to adopt in our systems. With these techniques, it was possible to reduce the calculation time of our example task to less than 1% of that of the simple Python implementation. It is always hard to talk about general optimization techniques, but I hope that the results of our imaginary task give some motivation and some idea of what can be achieved with modern hardware and optimized code:

Calculation time in % for all optimizations

Apache Kafka – The one Stream Processor to rule them all?

If there is one statement that can be made about the current developments in the realm of distributed systems, it would probably be that most developers are turning away from a centralised, monolithic architecture and moving towards a microservice architecture. This type of architecture has proved to be much more flexible and robust for the modern world, where more and more software is offered as a cloud-based solution. By splitting up systems into smaller parts, they can be updated more easily and crashed services can be recovered faster. These services can be containerized with Docker, so quickly putting up and pulling down parts of the infrastructure has become very easy. On most occasions it is simply less work to trash a running software instance and recreate it than to log into the instance and try to fix what is broken.


A Dive into Serverless on the Basis of AWS Lambda

Hypes help to overlook the fact that tech is often reinventing the wheel, forcing developers to update applications and architecture accordingly in painful migrations.

Besides Kubernetes one of those current hypes is Serverless computing. While everyone agrees that Serverless offers some advantages it also introduces many problems. The current trend also shows certain parallels to CGI, PHP and co.

In most cases, however, investigations are limited to the problem of cold boot time. This article will, therefore, explore Serverless functions and their behavior, especially when scaling them out and will provide information on the effects this behavior has on other components of the architecture stack. For example, it is shown how the scaling-out behavior can very quickly kill the database.


Improved Vulnerability Detection using Deep Representation Learning

Today’s software is more vulnerable to cyber attacks than ever before. The number of recorded vulnerabilities has almost constantly increased since the early 90s. The strong competition on the software market, along with many innovative technologies getting released every year, forces modern software companies to spend more resources on development and fewer resources on software quality and testing. In 2017 alone, 14,500 new vulnerabilities were recorded by the CVE (Common Vulnerabilities and Exposures) database, compared to 6,000 in the previous year. This will continue in the years to come. [1]
