In a recent project, some of my fellow students and I developed a simple hosting provider that enables users to start Docker containers on a remote server using a CLI tool.
During this project, we developed a Go-based backend service that exposed a REST API to the client and handled the connection to Terraform via RabbitMQ, authentication via Keycloak, and data persistence using DynamoDB. This resulted in the backend being a crucial service that most functionality relied on. Therefore, it was important for us to enable monitoring of the backend. This included the collection of logs containing information about the current events inside the backend as well as the current health of the instance.
zerolog
As we were using zerolog to handle logging within the application, we encountered a simple issue: we needed the logs stored in a place that Grafana could easily query. While there are systems that would have worked for our Docker Compose setup, we found that pushing the logs to Loki from the application itself was the most usable and sustainable solution.
So we opted to use the hooks feature of zerolog to publish the logs to Loki. First off, what are zerolog hooks?
Zerolog allows us to pass in a struct with a Run function when creating the logger. This Run function is called whenever a log event is emitted. Therefore, hooks can easily be utilized to extend the functionality of zerolog. A simple example of this would be the following:
    package main

    import (
        "time"

        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
    )

    // LokiHook is attached to the global logger; Run is called for every log event.
    type LokiHook struct{}

    // Run adds a static field to each event before it is written.
    func (h LokiHook) Run(e *zerolog.Event, level zerolog.Level, msg string) {
        e.Str("hello", "world")
    }

    func init() {
        log.Logger = log.Hook(LokiHook{})
    }

    func main() {
        for {
            log.Info().Msg("Sample log message")
            time.Sleep(5 * time.Second)
        }
    }
This will produce the following output:
    {"level":"info","time":"2024-02-28T16:18:25+01:00","hello":"world","message":"Sample log message"}
Great! We have the ability to execute a function on a log event. But where do we go from here?
Loki
Well, first of all, it’s important to have a look at how log messages are published to Loki: Loki exposes an API that uses either JSON or protobuf to allow clients to push logs to the Loki instance. In our case, we will focus on the JSON side of things. The endpoint /loki/api/v1/push expects a JSON payload of the following format:
    {
      "streams": [
        {
          "stream": {
            "label": "value"
          },
          "values": [
            ["<unix epoch in nanoseconds>", "<log line>"],
            ["<unix epoch in nanoseconds>", "<log line>"]
          ]
        }
      ]
    }
This means that we simply have to write a zerolog hook that publishes the current log line to Loki this way. That is easy enough, right?
While this is indeed simple to implement, it is easy to make a possibly fatal mistake. With this naive approach, every log event results in an HTTP request. Now imagine a scenario where we want to scale our service horizontally. Given enough instances and users, we have essentially built a tool to DDoS our own Loki instance, so we cannot proceed this way. Additionally, an HTTP request is slow compared to writing a log line. zerolog runs hooks synchronously as part of the logging call, so every log statement would block until Loki has answered. Moving each request into its own goroutine would avoid the blocking, but under enough load those goroutines pile up faster than they finish, and the service eventually runs out of memory.
Implementation
Anyway, enough of the doomsday scenarios, let’s get to fixing:
The most straightforward approach to this problem is the introduction of a buffer. This buffer is filled with the log events that occur, and once it reaches a certain size, the entire batch of log statements is sent at once. Using this tactic, we can drastically reduce the number of requests sent to our Loki instance.
However, tying the log event push to the current size of the buffer has one major drawback. If for some reason there are no log events over a long period of time, this could result in logs being published with extreme delay. As a result of this, Loki would be an inconsistent data source and could be more confusing than helpful when investigating incidents.
This results in the need for a second condition that triggers a push to Loki. Instead of relying on the current size of the buffer, this second threshold simply depends on the time elapsed since the last push. To check both thresholds, we employ a simple goroutine that looks at the time since the last push as well as the current size of the buffer. Additionally, the buffer is split by log level so that the level of the given events can be passed to Loki as a label.
This results in a struct with the following properties and the function bgRun as the base for the goroutine:
    type LokiClient struct {
        PushIntveralSeconds int
        // This will also trigger the send event
        MaxBatchSize int
        Values       map[string][][]string // buffered [timestamp, line] pairs per log level
        LokiEndpoint string
        BatchCount   int
    }

    func (l *LokiClient) bgRun() {
        // Track the last push as a time.Time; time.Now().Second() only yields
        // the second within the current minute (0-59) and wraps around.
        lastRun := time.Now()
        interval := time.Duration(l.PushIntveralSeconds) * time.Second
        isWorking := true
        for {
            // Poll at a short interval instead of spinning in a tight loop.
            time.Sleep(100 * time.Millisecond)
            if time.Since(lastRun) <= interval && l.BatchCount <= l.MaxBatchSize {
                continue
            }
            // Loop over all log levels and send them
            for k := range l.Values {
                if len(l.Values[k]) == 0 {
                    continue
                }
                prevLogs := l.Values[k]
                l.Values[k] = [][]string{}
                err := pushToLoki(prevLogs, l.LokiEndpoint, k)
                if err != nil && isWorking {
                    isWorking = false
                    log.Error().Msgf("Logs are currently not being forwarded to loki due to an error: %v", err)
                }
                if err == nil && !isWorking {
                    isWorking = true
                    // I will not accept PR comments about this log message tyvm
                    log.Info().Msgf("Logs are now being published again. The loki instance seems to be reachable once more! May the logeth collecteth'r beest did bless with our logs")
                }
            }
            lastRun = time.Now()
            l.BatchCount = 0
        }
    }
Based on the aforementioned spec of the Loki API, implementing the HTTP call to Loki is also simple enough:
    import (
        "bytes"
        "context"
        "encoding/json"
        "fmt"
        "net/http"
        "time"
    )

    type lokiStream struct {
        Stream map[string]string `json:"stream"`
        Values [][]string        `json:"values"`
    }

    type lokiLogEvent struct {
        // The tag makes the key match the lowercase "streams" of the push format.
        Streams []lokiStream `json:"streams"`
    }

    /*
    This function contains *no* error handling/logging because this:
    a) should not crash the application
    b) would mean that every run of this creates further logs that cannot be published
    => The error will be returned and the problem will be logged ONCE by the handling function
    */
    func pushToLoki(logs [][]string, lokiEndpoint string, logLevel string) error {
        lokiPushPath := "/loki/api/v1/push"

        data, err := json.Marshal(lokiLogEvent{
            Streams: []lokiStream{
                {
                    Stream: map[string]string{
                        "service": "demo",
                        "level":   logLevel,
                    },
                    Values: logs,
                },
            },
        })
        if err != nil {
            return err
        }

        // Give up quickly so an unreachable Loki instance cannot stall the push loop.
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()

        req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%v%v", lokiEndpoint, lokiPushPath), bytes.NewBuffer(data))
        if err != nil {
            return err
        }
        req.Header.Set("Content-Type", "application/json")

        client := &http.Client{}
        resp, err := client.Do(req)
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        // Treat any non-2xx answer as a failed push.
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
            return fmt.Errorf("loki responded with status %s", resp.Status)
        }
        return nil
    }
And that's almost all the work done! Let's just apply that to our little example from earlier:
    package main

    import (
        "strconv"
        "time"

        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
    )

    var lokiClient LokiClient

    func init() {
        lokiClient = LokiClient{
            PushIntveralSeconds: 10,  // Threshold of 10s
            MaxBatchSize:        500, // Threshold of 500 events
            LokiEndpoint:        "http://localhost:3100",
            BatchCount:          0,
            Values:              make(map[string][][]string),
        }
        go lokiClient.bgRun()
        log.Logger = log.Hook(LokiHook{})
    }

    func main() {
        for {
            log.Info().Msg("Sample log message")
            time.Sleep(1 * time.Second)
        }
    }

    type LokiHook struct{}

    // Run buffers every log event together with its timestamp in nanoseconds.
    // Note: Values and BatchCount are shared with the bgRun goroutine without
    // synchronization; with more than one logging goroutine they need a lock.
    func (h LokiHook) Run(e *zerolog.Event, level zerolog.Level, msg string) {
        lokiClient.Values[level.String()] = append(lokiClient.Values[level.String()], []string{strconv.FormatInt(time.Now().UnixNano(), 10), msg})
        lokiClient.BatchCount++
    }
To see the whole example, check out the repository.
When using large batch sizes, ensure that your Loki instance is configured to allow batches of the given size.
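If raising the limit on the Loki side is not an option, one possible workaround is to split a large batch into several smaller requests on the client. The helper below is a hypothetical sketch (chunk is not part of the example repository) that cuts a slice of [timestamp, line] pairs into pieces of at most size entries; each piece could then be pushed separately.

```go
package main

import "fmt"

// chunk splits entries into consecutive pieces of at most size entries.
// It returns nil when size is not positive.
func chunk(entries [][]string, size int) [][][]string {
	if size <= 0 {
		return nil
	}
	var out [][][]string
	for start := 0; start < len(entries); start += size {
		end := start + size
		if end > len(entries) {
			end = len(entries)
		}
		out = append(out, entries[start:end])
	}
	return out
}

func main() {
	entries := [][]string{
		{"1", "a"}, {"2", "b"}, {"3", "c"}, {"4", "d"}, {"5", "e"},
	}
	for _, part := range chunk(entries, 2) {
		fmt.Println(len(part), part)
	}
	// 2 [[1 a] [2 b]]
	// 2 [[3 c] [4 d]]
	// 1 [[5 e]]
}
```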