Conf42 Chaos Engineering 2023 - Online

Streaming Aggregation on Cloud Scale Telemetry

How does one serve telemetry metrics at cloud scale? At Confluent, raw telemetry is flowing in at 5 million metrics per second. This session will showcase how we tackled these challenges with Kafka Streams and Apache Druid.


  • Shay Lin talks about the streaming aggregation of cloud-scale telemetry. Telemetry is the unit of measurement for observability, the foundation of chaos engineering. Observability is often considered the prerequisite of continuous experimentation. The talk covers Kafka reliability in the context of chaos engineering.
  • Offline custom rollup tasks are available in some databases today. Your partition strategy should be highly coherent with how you're aggregating your data. The more dimensions your data set has, the more likely you are to serve a broader set of use cases.


This transcript was autogenerated. To make changes, submit a PR.
Hi everyone, this is Shay Lin. In this talk, I'm going to talk about the streaming aggregation of cloud-scale telemetry. First, we are going to talk about why we are even talking about telemetry, and in what context. Then we are going to touch upon the evolution of the telemetry serving platform at Confluent, and finally deep dive into the architecture. Telemetry is the unit of measurement for observability, the foundation of chaos engineering. As we all know, observability is here to establish measurements for the knowns. It helps us to discover the unknown unknowns. And finally, it is often considered the prerequisite of continuous experimentation. So we're going to talk about Kafka reliability in the context of chaos engineering. For those who aren't familiar with Apache Kafka, the Kafka brokers are the compute units within a Kafka cluster. By nature, brokers handle read and write requests. Simple. They also handle data replication and the partitioning strategy. As simple as it sounds, if we zoom into any given broker, there are quite a few things happening. There are your network threads, there are your I/O threads, there's caching, there's disk I/O. Everything is happening. So now that we've mastered our knowledge of Apache Kafka, let's try and do a global deployment. As an SRE, if you look at this graph, you're scared. Your backend engineer or product manager approaches you and says, hey Shay, can we roll out this feature tomorrow, please? And your boss asks, what if us-west-2 is down? Those are the questions you have to answer in order to support continuous rollout and experimentation of your products. The first thing an SRE would do is to start instrumenting telemetry. It means having your clients send out telemetry and having your brokers send out telemetry, so you have your basic elements coming in. It would probably look something like this: different elements coming from everywhere, all into a big giant data store. So what do we do about it? After a while, we quickly find out.
The usage patterns are as follows. What's the total number of network bytes in the last hour? We're looking for facts. We're also looking for trends. For example, what's the CPU usage in the last seven days? We also try to use this data for diagnostics. And more importantly, things also start to become complex in terms of attribution. You want to see whether there are potential correlations between the CPU usage trend and fetch requests. Think of a typical Datadog dashboard. Now that we have a data store running and all the telemetry flowing in, things start to make sense a little bit. Based on the data that you are looking at, you might have some basic ideas, like clients are often getting declined for unknown reasons, and you come up with certain hypotheses, such as your fan-out could be a problem: your deployment just cannot support too many connections at the same time. But in many situations, a simple data store cannot support these complex diagnostic or attribution analyses. That's where we bring in time-series-optimized data stores, otherwise known as OLAP data stores. We can use Apache Druid as an example. Here at the bottom, we can simply assume data is flowing in a near-real-time manner into the segments. We have a query engine that's smart enough to take in the incoming queries and then fetch data from the corresponding segments. So this works great. Now we have better visibility and faster compute in order to answer questions such as the number of fetch requests. As your business grows, you'll probably run into this situation: your segments are getting filled up quickly. So how do segments even get filled up in the first place? I'd like to think of it as filling a cup of water. You have continuous water, the raw telemetry, coming in from a fire hose. Each water bottle has a capacity limit. Once it's filled up, we push it away, bring in a new water bottle, and go from there. So you will see segments getting created all the time.
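The water-bottle analogy can be sketched in a few lines of Python. This is only a toy illustration, not how Druid manages segments: the `Segment` and `SegmentWriter` names are hypothetical, and the count-based capacity is an assumption chosen for simplicity (real stores have their own rules for when a segment is closed).

```python
from dataclasses import dataclass, field


@dataclass
class Segment:
    """One 'water bottle': a bounded batch of raw telemetry events."""
    capacity: int
    events: list = field(default_factory=list)

    def is_full(self) -> bool:
        return len(self.events) >= self.capacity


class SegmentWriter:
    """Appends events to the open segment and seals it once it is full."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.open_segment = Segment(capacity)
        self.sealed = []  # segments that filled up and were pushed away

    def append(self, event: dict) -> None:
        self.open_segment.events.append(event)
        if self.open_segment.is_full():
            # The bottle is full: push it away and bring in a new one.
            self.sealed.append(self.open_segment)
            self.open_segment = Segment(self.capacity)


writer = SegmentWriter(capacity=3)
for i in range(7):
    writer.append({"metric": "fetch_requests", "value": i})

print(len(writer.sealed), len(writer.open_segment.events))  # prints "2 1"
```

With seven events and a capacity of three, two segments are sealed and one event sits in the open segment, which is why new segments keep appearing as long as telemetry keeps flowing.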
And just to put things in perspective, oftentimes for cloud providers like Confluent, data is flowing in at millions of events per second, and you can easily accumulate a petabyte-scale amount of data within an hour. On the consumption side, which is the upper part of this diagram, you have an increasing amount of interest in pulling similar data sets. Give me the number of fetch requests for Kafka cluster XYZ in the last hour. And that question is often asked by many different parties. You have your internal customers, such as billing and SREs, who are doing the continuous monitoring, and then on the other side of the house, you have your external customers. Obviously, for privacy reasons, you don't want external customers to be able to view the data of other customers. However, they are very interested in how their own Kafka cluster is doing. When we look at the state of things, serving highly concurrent ingestion and queries at the same time is extremely hard. Some of the concerns that arise are repetitive queries on hot entities. The compute and serving costs are getting extremely expensive. As you can imagine, segments are being created while queries are coming in concurrently. And more importantly, it's very hard to control what customer A thinks about this metric versus what billing thinks about this metric. The goal is to have everyone come to a consensus of, hey, this is how we define this particular metric, and go from there. To solve the problem of serving telemetry at scale, the solution that we landed on is to push asynchronous aggregations through Kafka. What that means is, yes, data is still flowing into these raw segments. That's all the way on the left. But instead of serving repetitive queries to many different customers and internal usages, the goal is to publish predefined aggregations that are commonly reused everywhere.
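As a rough illustration of that idea, the sketch below computes one shared definition of "fetch requests per cluster per hour" a single time and lets every consumer read the same published value. It is plain Python rather than the actual pipeline; the event layout, the `PUBLISHED` table, and the `serve` function are all assumptions made up for the example.

```python
from collections import defaultdict

# Hypothetical raw events: (cluster_id, epoch_seconds, fetch_request_count)
RAW_EVENTS = [
    ("cluster-a", 10, 5),
    ("cluster-a", 70, 7),
    ("cluster-b", 20, 3),
    ("cluster-a", 3700, 2),
]

WINDOW_SECONDS = 3600  # one-hour windows


def aggregate_fetch_requests(events):
    """Apply the shared metric definition once: fetch requests per cluster per hour."""
    totals = defaultdict(int)
    for cluster_id, ts, count in events:
        window_start = ts - ts % WINDOW_SECONDS
        totals[(cluster_id, window_start)] += count
    return dict(totals)


# Computed once, asynchronously, then published for everyone to reuse.
PUBLISHED = aggregate_fetch_requests(RAW_EVENTS)


def serve(cluster_id, window_start):
    """Consumers read the published value instead of re-scanning raw data."""
    return PUBLISHED.get((cluster_id, window_start), 0)


billing_view = serve("cluster-a", 0)
customer_view = serve("cluster-a", 0)
print(billing_view, customer_view)  # prints "12 12"
```

Because billing and the customer both read from the same published aggregate, they see the same number by construction, and the repeated query on a hot cluster never touches the raw segments.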
This is done through Kafka Streams, which I'll talk more about in a little bit. Once the telemetry gets pre-aggregated, we publish it back to our customers, as well as to Druid to store the aggregation segments. Here we have a nice set of Kafka Streams topologies. How it works is, this is all event driven. Once a segment gets filled up, there are certain signals being sent out. So our service essentially listens to signals coming out of the segments and then starts to compute from there. We have a central registry for all the metric definitions, that is, the predefined aggregations that we want to serve. In the first topology, there is a global task manager that tries to break down the work into the smallest units that we can work with. Oftentimes, that depends on how many clusters we need to aggregate on and what the metric granularity is in terms of space and time. Once the aggregation tasks are created, things are straightforward. Subtopology two, in the middle, handles all the processing requests with external compute. And finally, those predefined aggregations are published into Kafka again. Just to note here, there is additional processing that might be needed per your consumption contract. For example, here we are using OpenTelemetry for publishing telemetry metrics. That means certain semantics apply depending on whether it's a counter metric, a gauge metric, et cetera. You want to make sure that you process and maintain the state in order for your consumers to consume it correctly, so that the data isn't duplicated over time. Aside from the Kafka Streams solution that we landed on, there are obviously alternative architectures to support this. There are two general routes. One is offline custom rollup tasks. This is actually made available in some of these databases today. For example, in Apache Pinot, this is done through the star-tree index.
Essentially, if we look at the current data set, there are likely five different dimensions, and the goal is to reduce it to, say, three dimensions out of the five. This is done by setting the star-tree index, and Pinot will handle it through background tasks, called minion tasks. On the other side, Apache Druid also lets you inject custom rollup tasks and handles them for you. On the other side of things, we can always process raw telemetry through stream processing. What we did find out through our POC is that your partition strategy should be highly coherent with how you're aggregating your data. If you're not doing so, chances are that you're replicating the data set throughout the processing steps, which is often less agile and very expensive with the scale of data that we are talking about. When picking one of the architectures, it is understandable that you always need to look at the current state of your systems. For example, how is the data store cluster doing? Is it powerful enough to do some additional custom rollups? How about adding additional clusters? On the other hand, you need to understand your consumption semantics well to know whether stream processing will meet your needs. We're talking about two dimensions here: one is processing through space, the other is processing through time. The more compressed your data is, which is the lower end of this curve, the more likely it is that your compressed data is only going to be able to serve a narrower set of use cases. On the other hand, the more dimensions your data set has, the more likely you are to serve a broader set of use cases. So defining what set of predefined aggregations you want to perform in this kind of aggregation engine is key to success. So now we're getting into a much better state.
As a site reliability engineer, you look at this graph at any given time, and correlations and attribution are very clear. It's very easy to dive into a specific region.
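The rollup idea mentioned in the talk, reducing a data set from five dimensions to three, can be sketched as a simple group-by. This is a minimal illustration in plain Python, not the Pinot or Druid mechanism; the dimension names, the sample rows, and the `roll_up` helper are hypothetical.

```python
from collections import defaultdict

# Hypothetical raw rows with five dimensions and one measured value.
ROWS = [
    {"region": "us-west-2", "cluster": "c1", "broker": "b1", "topic": "t1", "client": "x", "bytes": 10},
    {"region": "us-west-2", "cluster": "c1", "broker": "b2", "topic": "t1", "client": "y", "bytes": 20},
    {"region": "us-west-2", "cluster": "c1", "broker": "b1", "topic": "t2", "client": "x", "bytes": 5},
    {"region": "us-east-1", "cluster": "c2", "broker": "b3", "topic": "t1", "client": "z", "bytes": 7},
]


def roll_up(rows, keep, value="bytes"):
    """Sum `value` over every dimension that is not listed in `keep`."""
    totals = defaultdict(int)
    for row in rows:
        key = tuple(row[dim] for dim in keep)
        totals[key] += row[value]
    return dict(totals)


# Keep three of the five dimensions; broker and client are aggregated away.
rolled = roll_up(ROWS, keep=("region", "cluster", "topic"))
for key, total in sorted(rolled.items()):
    print(key, total)
```

Four raw rows collapse into three aggregated rows. The rolled-up data is smaller and cheaper to serve, but it can no longer answer per-broker or per-client questions, which is the trade-off between compression and breadth of use cases described in the talk.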

Shay Lin

Staff Software Engineer @ Confluent

