Open Source Batch and Stream Processing: Realtime Analysis of Big Data

Abstract

Since the beginning of Big Data, batch processing was the most popular choice for processing large amounts of generated data. These existing processing technologies are not suitable to process the large amount of data we face today. Research works developed a variety of technologies that focus on stream processing. Stream processing technologies bring significant performance improvements and new opportunities to handle Big Data. In this paper, we discuss the differences of batch and stream processing and we explore existing batch and stream processing technologies. We also explain the new possibilities that stream processing make possible.

1 Introduction

A huge amount of information is generated everyday by social media, e-mails, sensors, instruments and enterprise applications, to mention a few resources. This amount of data brings a lot of challenges according to volume, velocity and variety. In the past two years, 90% of all data was created and the amount of data will double every two years. This data comes in a variety of formats and types, each of it requires a different way to process the generated data [9].

Batch processing was the most popular choice to process Big Data. The most notable batch processing framework is MapReduce [7]. MapReduce was first implemented and developed by Google. It was used for large-scale graph processing, text processing, machine learning and statistical machine translation. MapReduce can process large amounts of data but is only designed for batch processing. Today’s demands rely on real-time processing of Big Data that will finish in seconds [19]. For this demand, various stream-processing technologies have been developed. In this paper we will focus on Apache Spark Streaming [22] and Apache Flink [6], which are the most famous tools for stream processing [12].

In this work we will explain the concepts of batch processing and stream processing in detail while introducing the most popular frameworks. After that we introduce new opportunities that stream processing provides to face today’s issues where a response is needed in seconds.

2 Related Work

Big Data analysis is an active area of research but comparisons of Big Data analysis concepts are difficult to find. Most research papers focus on comparing stream processing frameworks on performance. In this work we will focus on open source technologies. There are widely used proprietary solutions like Google Millwheel [1], IBM InfoSphere Streams [3] and Microsoft Azure Stream Analytics [8] we won’t discuss in this paper.

Lopez, Lobato and Duarte describe and compare the streaming platforms Apache Flink, Apache Spark Streaming and Apache Storm [15]. The work focuses on processing performance and behaviour when a worker node fails. The results of each platform are analysed and compared.

Shahrivari compares the concepts between batch processing and stream processing [19]. In detail, the work compares the performance of MapReduce and ff Apache Spark Streaming with different experiments.

Unlike the mentioned papers, we will focus on the difference between batch processing and stream processing and discuss the new opportunities of stream processing instead of comparing performance measurements.

3 Batch Processing

Batch jobs run in the background without any interaction from an operator. In Theory a batch job gets executed in a specific time window between the end of a workday and the start of the next workday to process millions of records which will take hours to execute. This time window will increase with availability requirements. Batch processing is still used today in organisations and financial institutions [10].

Individual batch jobs are usually organized into calendar periods. Common batch schedules are daily, weekly, and monthly batches. Weekly and monthly batch schedules are mostly used for technical tasks like backups, integrity checks or disk defragmentation. Functional tasks should be executed on a daily schedule. Typically jobs on a daily basis are, data processing and transferring. Organizing the batch schedule can save effort in the development cycle. To categorize a job, a simple rule of thumb is to determine if it has to do a functional or a technical task. To reduce batch execution time, performing jobs in parallel is a key factor [10].

There are two different architectures of how batch jobs should be executed: As scripts or as services. The major differences are logging and control. Batch jobs that will run as a service usually report their status through log files and can be controlled over a control panel which is provided by the system. Batch jobs that are triggered over the command line, report their progress through streams and an appropriate exit code. The batch scheduler will terminate the job if it’s necessary [10].

3.1 MapReduce

MapReduce is a programming model that enables processing and generating large amounts of data. The model defines two methods: map and reduce.

Figure 1: Pseudo code of counting the number of occurrences of each word in a large selection of documents.

The map function takes a key/value pair as input to generate an intermediate set of key/value pairs. The reduce function takes an intermediate key and intermediate values associated to that key, as input and returns a set of key/value pairs [7]. Figure 1 illustrates the MapReduce programming model of a real-world use case.

4 Stream Processing

Stream processing refers to real-time processing of continuous data [14]. A stream processing system consists of a queue, a stream processor and real time views [16].

In a system without a queue, the stream processor has to process each event directly. This approach cannot guarantee that each event gets processed correctly. If the stream processor dies, there is no way to detect the error. A cluster would be overwhelmed by the incoming amount of data it has to process. A persistent queue helps to address these issues. Writing events to a persistent queue before processing the data will buffer the events and it allows the stream processor to retry an event when it fails [16]. An example for a modern queue system is Apache Kafka [13].

The stream-processor processes incoming events in the queue and then updates the real time views. There are two models of stream-processing that have emerged in the recent years: Record-at-a-time and micro-batched [16].

Record-at-a-time stream processing The record-at-a-time processing model processes tuples independently of each other, updates the internal state and sends out new records in response. This leads to inconsistency, when different nodes process different data that arrive at different times. The model handles recovery through replication which requires twice the amount of hardware. This is not optimal for large clusters [22]. To be scalable with high throughput, the systems run in parallel across the cluster [16].

Micro-batch stream processing The micro-batch stream processing approach processes the tuples as discrete batches. A batch is processed in a strong order until completion before moving on to the next batch. To know if a batch has been processed before, each batch has its own unique identifier that always stays the same on every replay [16].

4.1 Apache Spark Streaming

Apache Spark Streaming is an extension to the Apache Spark cluster computing engine. It was developed to overcome the challenges of the record-at-a-time processing model. Spark Streaming provides a stream programming model for large clusters called discretized streams (D-Streams). In D-Streams, streaming computation will be treated as a series of deterministic batch computations on small time intervals [22].

To generate an input dataset for an interval, the received data during that interval is stored reliably across the cluster. To generate new datasets as a response, after each interval the datasets are processed via deterministic parallel operations. To avoid replication by using lineage, the new datasets will be stored in resilient distributed datasets (RDDs) [21]. A D-Stream allows users to manipulate grouped RDDs through various operations [22].

Figure 2: Each RDD contains data from a certain time interval [24].

D-Stream provides consistency, fault recovery and integration with batch systems to bring batch processing models to stream processing. Apache Spark Streaming lets users mix together streaming, batch and interactive queries to build integrated systems [22].

4.2 Apache Flink

Apache Flink is a stream-processing framework and an Apache top-level project. The core of Apache Flink is a distributed streaming data-flow engine which is optimized to perform batch and stream analytics [6]. The distributed streaming data-flow engine executes programs called dataflow graphs which can consume and produce data [4].

Dataflow graphs consists of stateful operators and data streams. The stateful operators implement logic of producing or consuming data. Data streams distribute the data between all operators. On execution Dataflow graphs parallelize operators into one or more instances called subtasks and split streams into one or more stream partitions [6].

Figure 3: Code showing the Apache Flink dataflow programming model [23].

Apache Flink is a high-throughput, low-latency streaming engine and optimized for batch execution using a query optimizer [6]. Dataflow graphs are optimized to be executed in a cluster or cloud environment [20].

5 Stream Processing Opportunities

Batch processing is still needed for legacy implementations and data analysis where no efficient algorithms are known [6]. Nevertheless, stream processing offers new opportunities to face issues where the result is needed in seconds instead of hours or days.

5.1 Machine Learning

Machine learning for Big Data is dominated by online machine learning algorithms. In streaming there is a need for scalable learning algorithms that are adaptive and inherently open-ended [11]. This makes online machine learning optimal for stream processing where the algorithm has to adapt new patterns in the data dynamically.

Apache Flink Apache Flink brings together batch processing and stream processing. This makes Apache Flink very suitable for machine learning [11]. Apache Flink provides the machine learning library FlinkML. FlinkML supports the PMML standard for online predictions [5].

Apache Spark Apache Spark provides a distributed machine learning library called MLlib. MLlib provides distributed implementations of learning algorithms that can serve (but not limited to) linear models, naive Bayes, classification and clustering. MLlib can be integrated with other high-level libraries, for example Apache Spark Streaming. Apache Spark Streaming enables the development of online learning algorithms with MLlib on realtime data streams [17].

Detecting cases of fraud is an ongoing area of research. A study from 2016 estimated, that credit card fraud is responsible for over 20 billion dollars in loss worldwide [18]. It is important to detect credit card fraud immediately after a financial transaction has been made. Today, credit card fraud can be detected with supervised or unsupervised machine learning models [2]. For an instant detection, online machine learning on realtime data stream serves the needed technology to face this issue.

6 Conclusion

This paper explains the two data analysis concepts batch processing and stream processing. Since realtime analysis is needed to face the issues of today’s demands, batch processing is still being used for legacy implementations and data analysis where no efficient algorithms are known. Stream processing offers new opportunities to handle big data and response with an immediate result to the user.

References

[1] Tyler Akidau, Alex Balikov, Kaya Bekiroglu, Slava Chernyak, Josh Haberman, Reuven Lax, Sam McVeety, Daniel Mills, Paul Nordstrom, and Sam Whittle. Millwheel: Fault-tolerant stream processing at internet scale. In Very Large Data Bases, pages 734–746, 2013.

[2] Bart Baesens, Veronique Van Vlasselaer, and Wouter Verbeke. Fraud Analytics Using Descriptive, Predictive, and Social Network Techniques: A Guide to Data Science for Fraud Detection. Wiley Publishing, 1st edition, 2015.

[3] Chuck Ballard, Kevin Foster, Andy Frenkiel, Bugra Gedik, Michael P. Koranda, Deepak Senthil, Nathanand Rajan, Roger Rea, Mike Spicer, Brian Williams, and Vitali N. Zoubov. Ibm infosphere streams: Assembling continuous insight in the information revolution. IBM Redbooks publication, 2011.

[4] Ilaria Bartolini and Marco Patella. Comparing performances of big data stream processing platforms with ram 3 s (extended abstract).

[5] Andr´as Bencz´ur, Levente Kocsis, and R´obert P´alovics. Online machine learning in big data streams. 02 2018.

[6] Paris Carbone, Asterios Katsifodimos, † Kth, Sics Sweden, Stephan Ewen, Volker Markl, Seif Haridi, and Kostas Tzoumas. Apache flink TM : Stream and batch processing in a single engine. IEEE Data Engineering Bulletin, 38, 01 2015.

[7] Jeffrey Dean and Sanjay Ghemawat. Mapreduce: A flexible data processing tool. Commun. ACM, 53, 01 2010.

[8] Charles Feddersen. Real-time event processing with microsoft azure stream analytics. Jan 2015.

[9] Mugdha Ghotkar and Priyanka Rokde. Big data: How it is generated and its importance.

[10] Dave Ingram. Design – Build – Run: Applied Practices and Principles for Production-Ready Software Development. Wrox, 2009.

[11] W. Jamil, N-C. Duong, W. Wang, C. Mansouri, S. Mohamad, and A. Bouchachia. Scalable online learning for flink: Solma library. In Proceedings of the 12th European Conference on Software Architecture: Companion Proceedings, ECSA ’18, New York, NY, USA, 2018. Association for Computing Machinery.

[12] J. Karimov, T. Rabl, A. Katsifodimos, R. Samarev, H. Heiskanen, and V. Markl. Benchmarking distributed stream data processing systems. In 2018 IEEE 34th International Conference on Data Engineering (ICDE), pages 1507–1518, April 2018.

[13] Jay Kreps. Kafka : a distributed messaging system for log processing. 2011.

[14] Anuj Kumar. Architecting Data-Intensive Applications. Packt Publishing, 2018.

[15] M. A. Lopez, A. G. P. Lobato, and O. C. M. B. Duarte. A performance comparison of open-source stream processing platforms. In 2016 IEEE Global Communications Conference (GLOBECOM), pages 1–6, Dec 2016.

[16] Nathan Marz and James Warren. Big Data: Principles and best practices of scalable realtime data systems. Manning Publications, 2015.

[17] Xiangrui Meng, Joseph Bradley, Burak Yavuz, Evan Sparks, Shivaram Venkataraman, Davies Liu, Jeremy Freeman, DB Tsai, Manish Amde, Sean Owen, and et al. Mllib: Machine learning in apache spark. J. Mach. Learn. Res., 17(1):1235–1241, January 2016.

[18] David Robertson. The nilson report, issue 1096. Oct 2016.

[19] Saeed Shahrivari. Beyond batch processing: Towards real-time and streaming big data. Computers, 3, 03 2014.

[20] Daniel Warneke and Odej Kao. Nephele: Efficient parallel data processing in the cloud. In Proceedings of the 2nd Workshop on Many-Task Computing on Grids and Supercomputers, MTAGS ’09, New York, NY, USA, 2009. Association for Computing Machinery.

[21] Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauly, Michael J. Franklin, Scott Shenker, and Ion Stoica. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In Presented as part of the 9th USENIX Symposium on Networked Systems Design and Implementation (NSDI 12), pages 15–28, San Jose, CA, 2012. USENIX.

[22] Matei Zaharia, Tathagata Das, Haoyuan Li, Scott Shenker, and Ion Stoffi ica. Discretized streams: An e cient and fault-tolerant model for stream processing on large clusters. In Proceedings of the 4th USENIX Conference on Hot Topics in Cloud Ccomputing, HotCloud’12, pages 10–10, Berkeley, CA, USA, 2012. USENIX Association.

[23] Dataflow Programming Model. https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html

[24] Discretized Streams (DStreams). https://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams

Kubernetes: from Zero to Hero with Kompose, Minikube, k3sup and Helm — Part 1: Design

This is part one of our series on how we designed and implemented a scalable, highly-available and fault-tolerant microservice-based Image Editor. The series covers the various design choices we made and the difficulties we faced during design and development of our web application. It shows how we set up the scaling infrastructure with Kubernetes and what we learned about designing a distributed system and developing a production-grade Kubernetes cluster running on multiple nodes.

Continue reading

How to build fault-tolerant software systems

June 4th, 1996 – Ariane 5 rocket explodes a few seconds after being launched. The disaster was caused by a simple software error [1].

A brief introduction to the fundamental concepts of Erlang and Elixir


Ever since the first electronic systems have been created, engineers and developers have strived to provide solutions to guarantee their robustness and fault-tolerance. Thereof arose the understanding that developing and building a fault-tolerant system is not an easy task, because it requires a deep understanding of how the system should work, how it might fail, and what kinds of errors could occur. Indeed, it became obvious that successful error detection and management are essential for the accomplishment of fault tolerance. That is, once an error has occurred, the system might be able to tolerate it by replacing the offending component, using an alternative means of operation, or raising an exception. However, architectures relying on such approaches exhibited considerable complexity, and thus, resulted in unpredictable and less reliable systems. Consequently, the development of robust and error resistant systems has become an ongoing endeavor for engineers and software developers alike, who evermore intent to develop new approaches to solve this enduring problem inherent to complex systems.

Continue reading

Reproducibility in Machine Learning

The rise of Machine Learning has led to changes across all areas of computer science. From a very abstract point of view, heuristics are replaced by black-box machine-learning algorithms providing “better results”. But how do we actually quantify better results? ML-based solutions tend to focus more on absolute performance improvements (measured by metrics) instead of factors like resilience and reproducibility. On the other hand, ML models have a significantly growing impact on humans. One can argue that the danger is negligible for applications like playing games but with direct impacts like self-driving in production, there comes a responsibility. This responsibility was strengthened not only by laws such as the EU General Data Protection Regulation (GDPR).

Nevertheless, the objective of this post is not to philosophize about the dangers and dark sides of AI. In fact, this post aims to work out common challenges in reproducibility for machine learning and shows programming differences to other areas of Computer Science. Secondly, we will see practices and workflows to create a higher grade of reproducibility in machine learning algorithms.

Background

Having a software engineering background, my first personal experience of programming in machine learning felt like going back in time. Many frameworks are evolved and highly used in practice (TensorFlow, keras, pytorch, …) but other’s are still in the early stages and evolve quickly. This fact shouldn’t be surprising regarding the short history of current ML implementations. However, the definition of frameworks differs from other areas of Computer Science. Tensorflow and others create an abstraction layer for the underlying mathematical operations and indeed simplify processes like training, optimizations and more. But for me, they are closer to a toolkit of operations than a cookbook with best-practices.

Especially scientific results are often implemented with the same toolkit but as a standalone project. For this reason, the grade of reusability of such implementations is often low. Research scientists are interested in the most recent publications but there is no baseline project which can be used for different approaches, models and datasets. It’s more about copying and pasting workflows, downloading datasets and hacking it together. However, the research in ML is now establishing programming paradigms which exist in other parts of computer science for decades. Further, I am thankful to anyone contributing to state-of-the-art implementations in the first place. Thus, we will move the scientific scope into the background from now on.

Taking a more practical approach into account, Jupyter notebooks are often used as a starting point to explore data and different approaches. They are a great tool to evaluate a proof-of-concept and to showcase initial findings. However, notebooks tend to be chaotic with increasing complexity. In certain aspects, we can compare the workflow to creating an MVP in software engineering. You can reuse the created MVP as a setup for the productive application, but you shouldn’t expect a clean and extensible architecture then.

A machine learning workflow

For a better understanding, the following figure shows a typical workflow and the components of development in Data Science:

  1. Load and preprocess data, bring it into an interpretable form for our ML model.
  2. Code a model and implement the block-box magic that empowers AI.
  3. Train, Evaluate and fine-tune the model over days, weeks or months.
ML workflow
https://cloud.google.com/ml-engine/docs/tensorflow/ml-solutions-overview

After an initial implementation and similar to software lifecycles, we have the following steps:

  1. Deploy the program (model) to our dedicated infrastructure (Cloud, local).
  2. Use the model in production.
  3. Monitor the application and its predictions.
  4. Maintain the source code, implement new features and deploy new versions.

Frameworks like TensorFlow provide tools to read data, train models and evaluate them with different metrics. Further, approaches like TensorFlow Serving address the second part of the workflow to deploy models on infrastructure for production. Nonetheless, these tools don’t explicitly address reproducibility issues in ML. For a better understanding, the following section goes one step back by pointing out these challenges.

Challenges in ML reproducibility

In contrast to other fields of computer science, the results are non-deterministic. In other words, the same source code produces different results for the same dataset. Reasons mostly lie in implementation details such as random initializations of parameters or randomly shuffled datasets.

However, the baseline for collaboration in software engineering is a project environment where changes can be reproduced. There is not enough time in this blog post to discuss concepts like Versioning and Continous Integration, but in general, they lead to projects that are less error-agnostic due to automatic testing and deploying. Furthermore, contributors are able to comprehend changes and reproduce it on their environment (if guidelines and rules are followed).

Having the non-determinism in mind, the  objective for ML is a process in which results can be produced having the exact same results. The following aspects address this issue:

  1. Versioning of models: Models should be versioned and any changes be transparent.
  2. “Results without context are meaningless.“ (https://www.pachyderm.io/dsbor.html). For reproducibility, the collection of metadata is essential. Running a model on a dataset and versioning it does not answer questions such as: Where did the data come from? How can we rerun the model with an updated dataset?
  3. A reproducibility flag should enable a mode in which features causing non-deterministic results such as random initialization are disabled.

Coming back to Jupyter notebooks as a baseline for ML projects, they are a nightmare from a versioning and reproducibility perspective. First, notebook cells can run in different orders which makes the results hard to understand. Secondly, the actual source of notebooks is illegible which basically means that one can’t understand changes between versions regarding the source code. So as software developer, just imagine checking out a version and searching for changes in the application because source differences are meaningless.

So, how in practice?

We have gained an intuition for challenges in ML development and learned that versioning and collecting metadata is crucial for reproducibility. We are now answering the question of how to address these issues with in practice.

Data versioning tools

Data version control (DVC) or datmo are open source production tools for model management. In other words, they are versioning tools similar to git but address additional data scientist needs. One fundamental need is the integration of large files from different data sources. Git is not made to handle large files and the source for machine learning data is often on a different source (Public Cloud, Customer’s infrastructure).

Thus, the Git Large File Storage (LFS) replaces large files such as data input files with text pointers inside Git and stores the data on a remote server. Going one step further, we don’t want the data as part of the tool but the entire workflow. For a better understanding, the following figure illustrates a very basic scenario for DVC.

https://dvc.org/doc/use-cases/data-and-model-files-versioning

We publish and check out the code using a remote hub (Gitlab, Github, …) just as usual. On top of this, we use DVC to publish and retrieve data versions using a different hub. This separation of code and data makes it possible to test different program versions on various data versions. This is particularly useful to reproduce a model’s performance over time in production (after collecting new data).

On top of this, the tools address the following features (not exclusively):

  • Language- and Framework-agnostic: Implement projects in different languages (Python, R, Julia, ..) using different frameworks (TensorFlow, PyTorch, …)
  • Infrastructure-agnostic: Deploy models to different environments and infrastructures (Google Cloud, AWS, local infrastructure)

However, the infrastructure-agnostic feature comes with a drawback. DVC or datmo lack pipeline execution features for build pipelines, monitoring or error handling. The philosophy of these tools is to be very generic without running servers. They are slim command line tools without user interfaces.

Pachyderm

In order to come closer to continuous integration, we need deployment pipelines and modular infrastructure. The goal is an automated processes of releasing new versions, testing it on a staging environments and deploying it to customers. The borders between infrastructure and programming are blurring and the same should apply to machine learning. The two keywords that pop up in every (modern) infrastructure are containers (docker) and Kubernetes. Say hi to Pachyderm.

Pachyderm runs on top of Kubernetes which makes it deployable to any service that supports Kubernetes (Google Cloud Platform, AWS, Azure, Local infrastructure). Further, it integrates git-like features to version code as well as data and it shares many of its names (repository, pipeline, …). With Pachyderm, we configure continuous integration pipelines with container images.

Image result for pachyderm data science
https://www.slideshare.net/joshlk100/reproducible-data-science-review-of-pachyderm-data-version-control-and-git-lfs-tools

The above figure shows a baseline workflow in Pachyderm. Assuming that we already created a repository and coded our model, we put a file to our dedicated data storage. We then create pipelines whose configurations are written as JSON-files and can look as follows:

{
  "pipeline": {
    "name": "word-count"
  },
  "transform": {
    "image": "docker-image",
    "cmd": ["/bin", "/pfs/data", "/pfs/out"]
  },
  "input": {
      "pfs": {
        "repo": "data",
        "glob": "/*"
      }
  }
}

By executing the above configuration with the Pachyderm command line interface pachctl create-pipeline -f above_pipeline.json, we run the commands in cmd within the container created from an image defined in image. Further, we use remote storages like S3 or store it on the Pachyderm File System (pfs). PFS is a distributed filesystem which is, until a certain level, comparable to the Hadoop file system (HDFS) where MapReduce-Jobs are replaced by Pachyderm pipelines (see 8).

After creating the pipeline above, Pachyderm will launch worker pods on Kubernetes. These worker pods will remain up and running, such that they are ready to process any data committed to their input repositories.

KubeFlow: Distributed large-scale deployment

As described beforehand, Pachyderm let us create scalable and manageable ML pipelines on Kubernetes. Although Pachyderm can be parallelized in a map/reduce-style way, the pipelines mostly rely on single nodes and non-distributed training (multiple GPUs, but not multiple nodes). Having a different approach in mind, KubeFlow mainly focuses on standards to deploy and manage distributed ML on Kubernetes. It integrates tools like Distributed TensorFlow or TensorFlow Serving and further JupyterHub which improves the process of developing in teams on shared notebooks.

However, KubeFlow (as of now) lacks tools that orchestrate Data Science workflows as seen earlier (Data Preprocessing, Modelling, Training, Deployment, Monitoring, …). It leaves these responsibilities up to the developer. Since this blog post mainly focuses on reproducibility in Machine Learning, KubeFlow does not answer these questions satisfactory. Consequently, further concepts are out-of-scope for this post while not being less exciting for productive and large-scale ML engineering.

Nevertheless, reproducibility and productivity should go hand in hand. For this reason, KubeFlow and Pachyderm can be jointly used in practice. In such a scenario, Pachyderm would provide the reproducibility through pipelines and KubeFlow would bring with the ease of deployment and distributed framework integrations (see 67 for more details).

So what should I use?

After an introduction to tools such as DVC and Pachyderm, one last question remains: Which is the best tool in production? And as always the answer is – it depends. DVC can improve productivity in smaller teams to organize and version projects and link the source code to the data. However, for organizations willing to introduce a workflow, richly-featured tools such as Pachyderm are the way to go. Taking one step further, KubeFlow paves the way for large-scale and distributed applications.

From a different point of view, the discussion behind it could be seen as a discussion about Kubernetes itself. This is fairly more wide-reaching and asks fundamental questions such as: Can we pay someone to set up and maintain the Kubernetes Cluster? Are our applications and workflows complex enough (multiple nodes, not multiple GPUs) to justify the overhead of Kubernetes? Unfortunately, we can’t answer these questions in this blog post.

Wrapping it up

In Machine Learning, programs can have the same meaning and even speak the same language but output different results because context matters and implementations are full of (intended) randomness. Further, development in ML is very sensitive to changes and even small differences can have high impacts on the result. For reproducibility, we have to record the full story and keep track of all changes.

Frameworks such as DVC or Pachyderm help to keep track of not only the code but also the data. Furthermore, they use pipelines to reproduce results and simplify collaborative projects. This increases the reproducibility and corresponds to the responsibility in ML. On top of this, the tools are a first step towards the fulfillment of laws like the GDPR because results can at least be reproduced. However, these solutions are to some extent immature and evolve quickly (however, just like everything else in ML). It is still a long way to go to obtain practices in ML which are comparable to standards in software engineering.

Related Sources and further reading

  1. Collaboration Issues in Data Science (accessed: 25.02.19): https://github.com/iterative/dvc.org/blob/master/static/docs/philosophy/collaboration-issues.md
  2. Hold Your Machine Learning and AI Models Accountable (accessed 25.02.19): https://medium.com/pachyderm-data/hold-your-machine-learning-and-ai-models-accountable-de887177174c
  3. How to Manage Machine Learning Models (accessed: 25.02.19): https://www.inovex.de/blog/how-to-manage-machine-learning-models/
  4. Introducing Kubeflow – A Composable, Portable, Scalable ML Stack Built for Kubernetes (accessed: 26.02.19): https://kubernetes.io/blog/2017/12/introducing-kubeflow-composable/
  5. Machine-Learning im Kubernetes-Cluster (German, accessed: 25.02.19): https://www.heise.de/developer/artikel/Machine-Learning-im-Kubernetes-Cluster-4226233.html
  6. Machine Learning Workflow (accessed: 26.02.18): https://cloud.google.com/ml-engine/docs/tensorflow/ml-solutions-overview
  7. Pachyderm and Kubeflow integration (accessed: 26.02.18): https://github.com/kubeflow/kubeflow/issues/151
  8. Pachyderm File System (PFS, accessed: 26.02.18): https://docs.pachyderm.io/en/v1.3.7/pachyderm_file_system.html
  9. Provenance: the Missing Feature for Rigorous Data Science. Now in Pachyderm 1.1 (accessed 25.02.19): https://medium.com/pachyderm-data/provenance-the-missing-feature-for-good-data-science-now-in-pachyderm-1-1-2bd9d376a7eb
  10. Reproducibility in ML: Why It Matters and How to Achieve It (accessed: 25.02.19): https://determined.ai/blog/reproducibility-in-ml/
  11. Reproducible data science: review of Pachyderm, Data Version Control and GIT LFS tools (slides, accessed: 25.02.19): https://www.slideshare.net/joshlk100/reproducible-data-science-review-of-pachyderm-data-version-control-and-git-lfs-tools
  12. The Data Science – Bill of Rights (accessed: 25.02.19): https://www.pachyderm.io/dsbor.html

Experiences from breaking down a monolith (3)

Written by Verena Barth, Marcel Heisler, Florian Rupp, & Tim Tenckhoff

DevOps

Code Sharing

Building multiple services hold in separated code repositories, we headed the problem of code duplication. Multiple times a piece of code is used twice, for example data models. As the services grow larger, just copying is no option. This makes it really hard to maintain the code in a consistent and transparent way, not to mention the overhead of time required to do so. In the context of this project, this issue was solved by creating an own code library. Yes, a library with an own repository which not directly builds an executable service. But isn’t it much work to always load and update it in all the services?  Yes it is – as long as you are not familiar with scripting. Therefore the build management tool gradle is a big win. It gives you the opportunity to write your own task to be executed, such like the packaging of a java code library as maven package and the upload to a package cloud afterwards. Good thing there is the free package host provider packagecloud.io around, which allows a storage size of 150MB for free. When the library was hosted online, this dependency could easily loaded automatically by the gradle dependency management.

By the use of this approach, the code development process could focus on what it really needs to – the development and not code copying! Also the team did more think about how to design the code to be more flexible, to be able to reuse it in another service. Of course it was an overhead of additional work, but the advantages outweigh. If the library is to be updated, this could achieved by an increment of its version number. Therefore all services can change only the version number and get the new software automatically updated.

CI/CD

To bring development and operations closer together we set up a CI/CD-Pipeline. Because we wanted to have a quick solution to support the development as fast as possible by enabling automated builds, tests and deployments, we had to choose a tool very early. We came up with the alternatives GitLab hosted by our University or setting up Jenkins ourselves. We quickly created the following table of pros and cons and decided to use HdM’s GitLab mainly because it is already set up and contains our code.

Our first pipeline was created ‘quick and dirty’, and it’s main purpose was just to build the projects with Gradle (in case of a Java project), to run it’s tests and to deploy it to our server. In order to improve the pipeline’s performance we wanted to cache the Gradle dependencies which turned out to be not that easy. Building the cache as the official GitLab Docs described it did not work, neither the workaround to set the GRADLE_USER_HOME variable to the directory of our project (which was mentioned very often, e.g. here and here). The cache seemed to be created but was deleted again before the next stage began. We ended up pushig the Gradle Wrapper in our repository as well and using it to build and test our application. Actually it is recommended anyway to execute a build with the Wrapper to ensure a reliable, controlled and standardized execution of the build. To make use of the Wrapper you need to make it executable (see “before_script” command in the code below). Then you’re able to build your project, but with other commands, like “./gradlew assemble” instead of “gradle build”.

image: openjdk:11-jdk-slim-sid

stages:
 - build
 # [..]

before_script:
 - chmod +x gradlew
 - apt-get update -qy

build:
 stage: build
 script:
    - ./gradlew -g /cache/.gradle clean assemble

# [..]

In the end we improved the time needed from almost four to about two and a half minutes.

Having this initial version in use we spent some more time on improving our pipeline. In doing so we found some more pros and cons of the different tools we compared before and a third option to think about.

The main drawbacks we found for our current solution were, that HdM does not allow docker-in-docker (dind) due to security reasons and GitLab container registry is disabled to save storage. In return we read that the docker integration is very powerful in GitLab. The added option GitLab.com could solve both the problems we had with HdM’s GitLab. But we came up with it too late in the project because we were already at solving the issues and didn’t want to migrate all our repositories. Also company-made constraints might always occur and we learned from solving them.

Our GitLab Runner

To solve our dind problem we needed a different GitLab Runner because the shared runners provided by HdM don’t allow docker-in-docker for security reasons. Trying to use it anyway makes the pipeline fail with logs containing something like this:

docker:dind ...
Waiting for services to be up and running...
*** WARNING: Service runner-57fea070-project-1829-concurrent-0-docker-0 probably didn't start properly.
Health check error:
service "runner-57fea070-project-1829-concurrent-0-docker-0-wait-for-service" timeout
Health check container logs:
Service container logs:
2018-11-29T12:38:05.473753192Z mount: permission denied (are you root?)
2018-11-29T12:38:05.474003218Z Could not mount /sys/kernel/security.
2018-11-29T12:38:05.474017136Z AppArmor detection and --privileged mode might break.
2018-11-29T12:38:05.475690384Z mount: permission denied (are you root?) 
*********

To use our own runner there are some possibilities:

  1. Install a runner on a server
  2. Install runners locally
  3. Integrate a Kubernetes cluster and install a runner there

Since we already have a server the first option is the easiest and makes the most sense. There are tutorials you can follow straight forward. First install the runner and then register the runner for each GitLab repository you want to allow to use this runner. The URL and token you need to specify for registration can be found in GitLab under Settings -> CI/CD -> Runners -> Set up a specific Runner manually.  It is also help provided to choose the executor, which needs to be specified on registration.

We chose Docker as executer because it provides all we need and is easy to configure. Now the runner can be started with “gitlab-runner start”. To be able to use docker-in-docker some more configuration is necessary but all changes to the config file “/etc/gitlab-runner/config.toml“ should automatically be detected and applied by the runner. The file should be edited or modified using the “gitlab-runner register” command as described here. For dind the privileged = true is important that’s why it already occurred in the logs above. Finally Docker needs to be installed on the same machine as the runner. The installation is described here. We chose to install using the repository. If you don’t know which command to choose in step 4 of “Set up the repository” you can get the information with “uname -a”. We also had to replace the “$(lsb_release -cs)” with “stretch” as mentioned in the Note. To figure out the parent Debian distribution we used “lsb_release -a“.

Pipeline Setup

Now that we solved our docker-in-docker problem we can set up a CI pipeline that first builds our project using a suitable image and then builds an image as defined in a corresponding Dockerfile.

Each service has its own Dockerfile depending on it’s needs.For the Database service image for example we need to define many environment variables to establish the connection between the database and message broker. You can see it’s Dockerfile below.

FROM openjdk:8-jdk-slim

RUN mkdir /app/
COPY build/libs/bahnalyse-database-service-1.0-SNAPSHOT.jar /app
WORKDIR /app

ENV RABBIT_HOST 172.17.0.2
ENV RABBIT_PORT 5672

ENV INFLUXDB_HOST 172.17.0.5
ENV INFLUXDB_PORT 8086

CMD java -jar bahnalyse-database-service-1.0-SNAPSHOT.jar

The frontend Dockerfile is splitted in two stages: The first stages builds the Angular app in an image which inherits from a node image version 8.11.2 based on the alpine distribution. For serving the application we use the nginx alpine image and move the dist-output of our first node image to the NGINX public folder. We have to copy our nginx configuration file, in which we define e.g. the index file and the port to listen to, into the new image as well. This is how the final frontend Dockerfile looks like:

# Stage 1 - compile Angular app

FROM node:8.11.2-alpine as node

WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build

# Stage 2 -  For serving the application using a web-server

FROM nginx:1.13.12-alpine

COPY --from=node /usr/src/app/dist /usr/share/nginx/html
COPY ./nginx.conf /etc/nginx/conf.d/default.conf

Now let’s look at our gitlab-ci.yml file shown below:

image: docker:stable
 
variables:
  DOCKER_HOST: tcp://docker:2375/
  DOCKER_DRIVER: overlay2
 
services:
  - docker:dind
 
stages:
  - build
  - test
  - package
  - deploy
 
gradle-build:
  image: gradle:4.10.2-jdk8
  stage: build
  script: "gradle build -x test"
  artifacts:
    paths:
      - build/libs/*.jar
 
unit-test:
  image: gradle:4.10.2-jdk8
  stage: test
  script:
    - gradle test
 
docker-build:
  only:
  - master
  stage: package
  script:
  - docker build -t $CI_REGISTRY_IMAGE:latest -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
  - docker login -u token -p $IBM_REGISTRY_TOKEN $CI_REGISTRY 
  - docker push $CI_REGISTRY_IMAGE:latest
  - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
 
server-deploy:
  only:
  - master
  image: kroniak/ssh-client
  stage: deploy    
  script:    
  - echo "$CI_SSH" | tr -d '\r' > pkey
  - chmod 400 pkey    
  - ssh -o stricthostkeychecking=no -i pkey root@bahnalyse.mi.hdm-stuttgart.de "docker login -u token -p $IBM_REGISTRY_TOKEN $CI_REGISTRY && docker-compose pull bahnalysebackend && docker-compose up --no-deps -d bahnalysebackend"

Compared to our first version we now make use of suitable Docker images. This makes the jobs faster and the file clearer. Most of the first parts are taken from this pretty good tutorial, so we’ll keep the explanations short here. At first we specify docker:stable as default image for this pipeline. This overrides the one defined in the runner configuration and can be overridden in every job again. Using the “services” keyword we also add docker-in-docker to this image. The variable DOCKER_HOST is required to make use of dind because it tells docker to talk with the daemon started inside of the service instead of the default “/var/run/docker.sock” socket. Using an overlay storage driver improves the performance. Next we define our stages “build”, “test”, “package” and “deploy” and then the jobs to run in each stage.

The gradle-build job now uses the gradle image with the version matching our requirements. This includes all the dependencies we need to build our jar file with “gradle build”. We use the -x test option here to exclude the tests because we want to run them in a separate stage. This gives a better overview in the GitLab UI because you see what went wrong faster. Using “artifacts” we can store the built jar file to the specified path. There it gets available for other jobs as well as downloadable from the GitLab UI.

In the test stage we simply run our unit tests using “gradle test”. This needs to compile again because we excluded the tests from the jar in our build task.

In the package stage we create a Docker image including our jar file. Using the “only” keyword we specify that this should only happen in the master branch. The first line of the “script” block uses a backend Dockerfile mentioned above in the root directory of the project (specified by the dot at the end of the line) to create the image.

For the following steps to work we need to solve our second problem: the absence of the GitLab Container Registry in HdM’s GitLab. A registry is a storage and content delivery system, holding named Docker images, available in different tagged versions. A common use case in CI/CD is to build the new image in the pipeline, tag it with something unique like a timestamp and as “latest”, push it to a registry and then pull it from there for deployment. There are alternatives to the registry integrated in GitLab we will discuss later. First let’s finish the explanations of the yaml file. We followed the just described use case of the registry. As something unique we chose the commit hash because the images get saved with a timestamp in the registry anyway. It is accessible using the predefined environment variable $CI_COMMIT_SHA. We also defined environment variables for the login credentials to the registry so that they don’t appear in any files or logs. Using environment variables like the name of the image can also help to make the registry easier exchangeable because this file could stay the same and only the variables would need to change. They can be defined in the GitLab UI under Settings -> CI/CD -> Environment variables.

In the deploy stage we used a public image from docker hub that has ssh installed so that we don’t have to always install it in the pipeline what costs time. A more secure solution would be to create such an image ourselves. We login to our server using a ssh key saved in the CI_SSH environment variable. Then run the commands on the server to login to our registry, pull the latest image and start it. To pull and start we use docker-compose. Docker Compose is a tool for defining and running multi-container Docker applications. It is mainly used for local development and single host deployments. It uses a file by default called docker-compose.yml. In this file multiple services can be defined with the Dockerfiles to build them or with the name including registry to get them from as well portmappings and environment variables for each service and dependencies between them. We use the –no-deps option to restart only the service where the image has changed and -d to detach it into the background otherwise the pipeline never stops.

Choosing a Registry

Since we cannot use the registry integrated into GitLab we considered the following alternatives:

  1. Set up our own registry
  2. Use docker hub
  3. Use IBM Cloud Registry (or other cloud provider)

The first approach is described here. Especially making the registry accessible from outside e.g. from our pipeline make this approach much more complicated than the other solutions. So we discarded this one.

Instead we started out using the second approach, docker hub. To login to it the $CI_REGISTRY variable used in the gitlab-ci.yml file should contain “index.docker.io” or it can just be omitted because it is the default for the docker login command. Besides the ease of use the unlimited storage is its biggest benefit. But it has also some drawbacks: You get only one private repository for free. To use this repository for different images makes it necessary to distinguish them using tags what is not really their purpose. Also login is only possible with username and password. So using it from a CI pipeline forces a team member to write its private credentials into GitLab’s environment variables where every other maintainer of this project can read them.

For these reasons we switched to the IBM Cloud Registry. There it is possible to create a user with its own credentials only for the pipeline using the IBM Cloud IAM-Tools or just creating a token to use for the docker login. To switch the registry only the GitLab environment variable $CI_REGISTRY needs to be adjusted to “registry.eu-de.bluemix.net” and the login needs to be updated, too (we changed from a username and password approach to the token one shown in the file above). Also the amount of private repositories is not limited and you get another helpful tool on top: Vulnerability-Checks for all the images. Unfortunately the amount of free storage is limited. Since our images are too big we got access to HdM’s paid account. So to minimize costs we had to ensure that there are not too many images stored in this registry. Since logging in to IBM Cloud’s UI and removing old images manually is very inefficient we added a clean-up job to our pipeline.

The possibilities to such a clean up job work are quite limited. There is no simple docker command for this, like docker login, push or pull. Probably the most docker-native way is would be using the docker REST API as described here. But this is only accessible for private cloud customers at IBM. The other approach described in the mentioned blogpost is deleting from the filesystem what is even less accessible in a cloud registry. So we have to use an IBM Cloud specific solution. Some fellow students of us had the same problem and solved it using the IBM Cloud CLI as described in their blogpost. We were looking for a solution without the CLI-tools for IBM Cloud and found a REST API that could do the job which is documented here. But for authorization you need a valid bearer token for which to receive in a script you need to use the CLI-tools. We chose to use this API anyway and ended up with the following additional job in our gitlab-ci.yml file:

registry-cleanup:
  stage: deploy
  script:
  - apk update
  - apk add curl
  - curl -fsSL https://clis.ng.bluemix.net/install/linux | sh
  - ibmcloud plugin install container-registry
  - apk add jq
  - ibmcloud login --apikey $IBM_API_KEY -r eu-de
  - ibmcloud iam oauth-tokens | sed -e 's/^IAM token:\s*//g' > bearertoken.txt
  - cat bearertoken.txt
  - >-
      curl
      -H "Account: 7e8029ad935180cfdce6e1e8b6ff6910"
      -H "Authorization: $(cat bearertoken.txt)"
      https://registry.eu-de.bluemix.net/api/v1/images
      |
      jq --raw-output
      'map(select(.RepoTags[0] | startswith("registry.eu-de.bluemix.net/bahnalyse/testrepo")))
      | if length > 1 then sort_by(.Created)[0].RepoTags[0] else "" end' > image.txt
  - >-
       if [ -s image.txt ] ;
       then 
       curl -X DELETE
       -H "Account: 7e8029ad935180cfdce6e1e8b6ff6910"
       -H "Authorization: $(cat bearertoken.txt)"
       https://registry.eu-de.bluemix.net/api/v1/images/$(cat image.txt) ;
       else
       echo "nothing to delete" ;
       fi

We run it at deploy stage so it could run in parallel to the actual deploy job if we had more than one runner.

First we install the required tools curl, IBM Cloud CLI and jq. This should be done by creating and using an appropriate image later. Then we login using the CLI-tools and get a bearer token. From the answer we need to cut off the beginning because it is (sometimes) prefixed with “IAM token: “ and then write it into a file. Curl is used to call the REST API with the headers for authorization to set and receive all the images available in our registry. We pipe the output to jq which is a command line tool to parse JSON. We select all the images with the same name as the one we just created. If there are already more than two we sort them by the created timestamp, grab the oldest one and write its name, including the tag, to file. If there are only two or less of these images we create an empty file. The –raw-output option of jq omits the quotes that would be around a JSON output. Finally we check if the file contains an image and delete it via API call if there is one. Somehow the else block, telling that there is nothing to delete, doesn’t really work yet. It is probably something wrong with the spaces, quotes or semicolon, but debugging a shell script defined in a yaml file is horrible so we’ll just live with our less talking pipeline. The yaml format also makes the >- at the beginning of a command necessary, otherwise the yaml is invalid. In our case an error like “(<unknwon>): mapping values are not allowed in this context at line … column …” occurred.

Conclusion

Our aims for the implementation of the application Bahnalyse was to play around with modern technologies and practices. While learning a lot about architectural patterns (like SOA and microservices), cloud providers, containerization and continuous integration, we successfully improved the application’s architecture.

We found out that the pure implementation of architectural principles is hardly possible and rarely makes sense. Although we initially wanted to split our monolith up into several microservices we ended up creating a SOA which makes use of both, a microservice and services which are composed or make use of other services. To put it in a nutshell, we can conclude there might never be a complete roadmap on which architecture or technology fits your needs the best. Further, a microservice architecture is not the universal remedy, it also entails its drawbacks. In most cases you have to evaluate and compare those drawbacks of the different opportunities available and decide which really succeeds your business case.

Outlook

Further points to take a look at would be improving our password management. Currently we save our credentials in GibLab’s environment variables which offers a security risk, because in this way every maintainer working at our project with GitLab is able to see them. We want to avoid this e.g. by outsourcing it to a tool like a Vault by HashiCorp. It is a great mechanism for storing sensitive data, e.g. secrets and credentials.

Another thing to focus on is the further separation of concerns into different microservices. A perfect candidate herefore is the search service of which the frontend makes use of to autocomplete the user’s station name input. It’s independent of any other component and just sends the user input to the VVS API and returns a collection of matching station names.

Finally deploying Bahnalyse to the cloud would be an interesting thing for us to try out. We already figured out which cloud provider fits our needs best in the first part of our blog post series. The next step would be to explore the IBM Cloud Kubernetes service and figure out the differences between deploying and running our application on a server and doing this in the cloud.

Building a fully scalable architecture with AWS

What I learned in building the StateOfVeganism ?

Final setup for the finished project (created with Cloudcraft)

By now, we all know that news and media shape our viewson these discussed topics. Of course, this is different from person to person. Some might be influenced a little more than others, but there always is some opinion communicated.

Considering this, it would be really interesting to see the continuous development of mood communicated towards a specific topic or person in the media.

Continue reading