Conf42 Machine Learning 2024 - Online

Optimising Apache Spark and SQL for improved performance

Abstract

Unlock the secrets of Apache Spark optimisation. Dive into manual aspects and technical know-how for enhancing performance. Our talk provides practical solutions to optimise Spark’s functionality, empowering you with the skills to achieve peak performance in your data-driven projects.

Summary

  • Today we are going to discuss performance tuning of Apache Spark. Spark 3 is quite good, but problems still happen every now and then. If you know how to tune the performance, you save time and money.
  • Marcin Szymaniuk is CEO of TantusData. The company helps customers with everything data related. Today he will share some of the lessons learned from multiple projects. The techniques we will be applying are very much matching what you would do in production.
  • In the Spark UI, the job has been running for 23 minutes. The output size so far is quite small, but there is 150 gigs of memory spill and 80 gigs of disk spill. That already shows us that there is some inefficiency. Whether we should be optimizing that or not very much depends.
  • Shuffle is expensive in general, because it involves disk, network and so on. There are many things which can go wrong when exchanging data. In this code we are doing just a single shuffle. So let's try to dig into the problematic tasks, the tasks which keep restarting.
  • How do you fix a skew in your data? First of all, you really need to understand where the problem is coming from. Upgrade Spark if you are not on Spark version 3 yet. In many cases, Spark 3 allows you to avoid these kinds of manual fixes.
  • The problem here is that all the records are processed within just a single executor. A simple technique to avoid this kind of behavior is salting in the product table. Instead of processing all the nine records in the same task, they will be processed by two separate tasks.
  • Lazy evaluation is something which is taught at the very beginning of every single Spark course. In some cases it really leads to growing complexity of the execution plan. If your execution plan is growing out of hand, it's worth considering checkpointing or materializing results on the way.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi guys. Today we are going to discuss performance tuning of Apache Spark. And you might ask, why would I care? Spark 3 is quite good, it's good enough, I don't really need to pay attention to that. But the reality is that problems are still happening every now and then. So if you know how to debug them, if you know how to tune the performance, then you save time for yourself, you save time for your team, but that also means saving money you spend on the infrastructure, saving money you spend on your cloud bill. My name is Marcin, I'm CEO of TantusData. At TantusData we help our customers with everything data related, from infrastructure through data engineering up to machine learning in production. And we also help with trainings. Today I'll share our lessons learned from multiple projects we have been helping with. Let me get started with a short history of Spark, because depending on which Spark version you have been working with, your impressions and your experience will be very different. Spark version 1 was a very unstable construct, and you very often had to run to your admins, to your ops, for help restarting some services. Then Spark version 2 was much more stable, but still there was quite a big chance that you would run a SQL query which is perfectly correct, but it doesn't work. It doesn't work because of the data distribution, it doesn't work because of the size of the data. And if you didn't know how to solve this kind of problem, all you could do was go and ask for help, or randomly tweak some parameters with some hope that it would help, but usually it did not help. And then with Spark version 3, the situation improved significantly because many of the common problems have been resolved.
You have adaptive query execution, which basically does lots of optimizations for you, but still there is a chance it will fail or be too slow, and you have to do the manual tuning. Today I'll show you some of these examples, examples which are not fixed even in Spark 3. Is SQL good enough these days? In many cases, yes, in many cases you are good with SQL or with an understanding of the Spark API, but I'll be sharing the cases where SQL alone is just not enough. Let's get started with case number one, and a short disclaimer about what kind of cases I'm showing. The cases are based on production use cases, but they are very much simplified. We are dealing with a very, very simplistic schema, we are working with very simplistic SQL, but the techniques we will be applying are very much matching what you would do in production. So how you debug, how you find out what the problem is, how you fix it, it's pretty much production like, and so it's very practical. So let's get started with case number one. In case number one, we will be processing events, a table with events. That table will have just three columns: user id, event id, and a timestamp. The whole point is that we are collecting some events, and let's say we are collecting them because we would like to understand how our users are interacting with a mobile app, so we understand where the glitch is, what is confusing to the users, and so on. We would like to understand how much time they spend on a specific event. So we would like to calculate the current event timestamp and the next event timestamp, so we understand which action is taking lots of time, which is potentially a confusing one. So we are searching for potential to improve, and for that we are searching for the next event. We have some users and timestamps; here we are looking at just a single user, with the events pre-sorted by timestamp. We want to fill in an extra column, which is the next timestamp.
And that is super simple on data which is already pre-sorted, because the next timestamp would be exactly the one from the following row, and then for the second row the next timestamp would be the one after that, and so on. So it is quite simple. And if you would like to implement the code, then this is what it would look like. It doesn't look like rocket science. We are just using a lead function. We do it over a window, partitioned by user id and ordered by event timestamp. Super simple code. I don't really want to dig into that more, but the point is the code is perfectly fine, the code is correct. Let's see what will happen when you go to the Spark UI. If you go to the Spark UI, you will realize that the job is running. It has been running for 23 minutes. Nothing to be worried about yet; we don't know the size of the data yet, so it might be perfectly fine, it might be not. What's a bit worrisome is the number of tasks which are failing. We have already three tasks which have failed, but that doesn't necessarily mean something is wrong. We are talking about a distributed system. It could be a network glitch, and Spark is supposed to recover from some random failures. That's the whole point of having a system like Spark: it will hide some of the complexities, some of the failures from you, if it was just random. But let's go a bit deeper, into the job details. In the job details, we have a single active stage which has been running for 23 minutes, and we have 126 tasks completed. One task is still running. If we go to that stage's details, we can see extra information. First of all, the output size so far is quite small, it's just five gigs. But there is 150 gigs of memory spill and 80 gigs of disk spill. It is quite a lot, and that already shows us that there is some inefficiency. Whether we should be optimizing that or not very much depends, it very much depends on what exactly we are optimizing for.
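The lead-over-window logic described here can be sketched in plain Python. This is an illustrative sketch, not the slide code; the function and column names are made up, and in PySpark the equivalent expression would be roughly `F.lead("event_timestamp").over(Window.partitionBy("user_id").orderBy("event_timestamp"))`.

```python
from itertools import groupby
from operator import itemgetter

def add_next_timestamp(events):
    """For each (user_id, event_id, ts) record, attach the timestamp of
    the user's next event, ordered by timestamp -- a plain-Python model
    of lead() over a window partitioned by user and ordered by time."""
    events = sorted(events, key=itemgetter(0, 2))  # sort by user_id, then ts
    out = []
    for _, group in groupby(events, key=itemgetter(0)):
        rows = list(group)
        for i, (user, event, ts) in enumerate(rows):
            # the last event of a user has no "next", so it gets None
            next_ts = rows[i + 1][2] if i + 1 < len(rows) else None
            out.append((user, event, ts, next_ts))
    return out
```

The difference `next_ts - ts` then gives the time spent on each event, which is the metric the talk is after.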
And it doesn't necessarily mean that if you see some spill you have to jump into the optimization, because it could be quite hard. Let's see if we actually can see anything else which is worrying. If we have a look at the timeline diagram, we can see lots of those green, very tiny tasks. So most of the tasks are completing in no time. There is this one task which has been running for quite a lot of time. There is another one which has been running for some time, and it failed again. It's attempt number one; the previous one was attempt number zero, now it's attempt number one. So we keep restarting the same failing task, and it takes most of the job time. That's already something we should look closer into. What we have now is another attempt of the same task, and it's running for the fourth time. In the task overview, we can see all the tasks which have failed, and the single one which is running. The memory spill is actually produced only by those tasks; there is no more memory spill elsewhere. We can see that they are taking some time, as we could see on the graph. We also can see some errors, and they are not very descriptive, maybe except the disk space one. Overall, we can see errors which don't really give us a good clue why it's happening. Other than that, we can see tasks which are completing within 10 seconds, which are producing like 40 megabytes of data. Nothing really suspicious about that. So let's try to dig into the problematic task, the task which keeps restarting. If we look here, we can see the view from the SQL tab in Spark, and that gives us pretty much an execution plan with lots of details. The first information we can see is that we are reading 100 GB of data, nothing very special. It's 1 billion records. It is a lot, but it's not something we shouldn't be able to process, right? We are dealing with a distributed system, we are dealing with very small records. It shouldn't be too bad.
And if we scroll down, the first box we always should be paying attention to is called exchange. That pretty much means we are shuffling the data over the network, we are exchanging the data over the network. And this is something you need to pay attention to simply because, first of all, it's expensive in general. Shuffle is expensive because it involves disk, network and so on. But other than that, it's good to pay attention to these kinds of boxes because there are many things which can go wrong when exchanging data. In this code we are doing just a single shuffle. It's good to hover over the exchange box and understand what it's caused by. Here we are hash partitioning based on user id, simply because in the SQL code we explicitly tell Spark that we want to partition the data based on user id. But if your code is much bigger, it's good to understand the correlation between what's in the Spark UI and the parts of the code, because usually you have multiple joins, you join by multiple different columns, and this is a good hint to narrow down which part of the code is responsible for a specific box in the execution plan in the SQL tab. Okay, but if we check how many bytes were written by the shuffle, we are talking about 40 gigs, and the top partition is 45 megabytes. It is small, nothing to be worried about. But if we look at how many bytes are read after the shuffle, then it's a completely different story. The top partition is 67 GB, and that's huge. That partition will be processed by just a single task, and that partition will be causing the problems, because the rule is that a single partition is processed by a single task. So the way it works is that whenever you read the data, here the slide shows HDFS, but that applies to S3, that applies to Delta.
Whatever data source you have, Spark always has the concept of partitions in mind, and it reads the data in chunks. So you read a chunk of the data within a task, you do some processing on the fly, and eventually you prepare the data for the shuffle. Here we are organizing the data based on the user id, simply because we are partitioning by user id. But it's the same with joins: if you join by a specific column, the data will be organized by that column. And then you have many tasks like that. What will happen during the shuffle? All the chunks with a specific user will go to the same task, and that's perfectly fine. The second user will probably go to another task, and that's all perfectly fine as long as the data can fit into the executor, as long as the data can be processed by the executor. If your user base, your event distribution, is not balanced, if a single user is producing too many events, then you have a problem with maybe just a single task which cannot really process that data. These kinds of problems were very common in Spark version 2. Then with adaptive query execution it's been improved, because for instance if you have this problem with a left join, it's automatically balanced. But we are looking at a window function example where it is not balanced, and that's exactly the situation we are struggling with. So how do we solve that? How do you fix data which is imbalanced, how do you fix a skew in your data? First of all, you really need to understand where the problem is coming from, and that your Spark job is failing because of the skew. Once you understand that, you can think: okay, maybe it is a bug in my data, and then you simply have to fix that. Maybe you can think of filtering out the problematic users, because in that given processing you don't really need them. If that doesn't help, then you need to figure out a way of completing the processing.
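The skew described here is easy to reproduce: hash partitioning sends every record with the same key to the same partition, so one heavy user dominates a single task. A toy sketch with made-up numbers (the function name and the 8-partition count are assumptions for illustration):

```python
def partition_sizes(keys, num_partitions):
    """Simulate Spark's hash partitioning: every record with the same
    key lands in the same partition, no matter how many records that
    key has -- which is exactly how skew is born."""
    sizes = [0] * num_partitions
    for key in keys:
        sizes[hash(key) % num_partitions] += 1
    return sizes

# One heavy user producing 1000 events, plus 100 normal users with 1 event each.
records = ["heavy_user"] * 1000 + [f"user_{i}" for i in range(100)]
sizes = partition_sizes(records, 8)
```

The largest partition always holds at least the heavy user's 1000 records, while the rest stay tiny: that one partition becomes the single slow, spilling, restarting task in the Spark UI.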
And before I show you how to do that in this specific use case, I just want to make one more note, which is: upgrade Spark if you are not on Spark version 3 yet, because the problem I'm showing you is still something you have to fix manually, but in many cases Spark 3 allows you to avoid these kinds of manual fixes. So for instance, if you are doing a left join, Spark would automatically fix it for you if you are on the newest version. Keep that in mind, but let's see how we can potentially solve this problem. The scenario I showed you is that we are calculating the next timestamp, so we are pulling it from the next record, and so on, and we are calculating all of user one's data in the same partition. But if this user is producing so many records, why don't we process that user, or all the users, day by day, so we don't have to process all the records at once? That's quite easy to implement. The only problem will be that the next timestamp for the last record of each day will be null, unless we fix that. And coming up with a fix is really not rocket science. If we look into it: here we are calculating an extra column, which is a bit of math to tell which day it is, so we are not dealing with only a timestamp anymore, we are also dealing with the number of the day. We are defining a bunch of windows for the processing, because we will be processing the user not only based on user id, but on a window which is user id and a day. Then we calculate a bunch of helper data frames, which is the events with nulls. After that we fix the nulls, and then we do the union and merge them together. But the whole point is not really how exactly this code works, it's not rocket science. The whole point is that if we do this kind of processing, we explicitly tell Spark that we want to do day by day processing. And this is what we will see in the Spark UI.
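The day-by-day fix can be sketched in plain Python. This is a simplified model of the approach described above, not the slide code: the day number is just the timestamp divided by seconds per day, the first pass computes next-timestamp within each (user, day) window, and the second pass patches the nulls at day boundaries. Names are made up, and the sketch assumes unique timestamps per user.

```python
SECONDS_PER_DAY = 86_400

def next_ts_by_day(events):
    """events: list of (user, ts) with unique timestamps per user.
    Pass 1: next-timestamp within each (user, day) window -- windows
    are small, so no single task holds a whole heavy user.
    Pass 2: the last event of each day borrows the first timestamp of
    the user's next day that has events, replacing the null."""
    days = {}                                   # (user, day) -> timestamps
    for user, ts in events:
        days.setdefault((user, ts // SECONDS_PER_DAY), []).append(ts)
    out = {}
    for (user, day), ts_list in days.items():   # pass 1: per-day lead
        ts_list.sort()
        for i, ts in enumerate(ts_list):
            out[(user, ts)] = ts_list[i + 1] if i + 1 < len(ts_list) else None
    user_days = {}                              # pass 2: fix boundaries
    for user, day in days:
        user_days.setdefault(user, []).append(day)
    for user, dlist in user_days.items():
        dlist.sort()
        for prev, nxt in zip(dlist, dlist[1:]):
            out[(user, max(days[(user, prev)]))] = min(days[(user, nxt)])
    return out
```

The point mirrors the talk: the answer is identical to the single big window, but the expensive window is now keyed by (user, day), so the work spreads over many tasks.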
So we don't have this single task which is taking most of the time and eventually failing. We see the distribution being very even, and the top task is taking 210 seconds. You might argue that it's probably not optimal either. But the point is the job was running for 20 minutes and it would never really complete, I just killed it. And now it's running for less than 1 minute, it completes, and everything is fine. So if you do this kind of optimization, you really need to understand why you are optimizing. In our case, the job was just failing, so you had to fix that. But if you do further optimizations, you need to think about what is worth your time. Maybe you want to just move on to another problem. Maybe you want to optimize for the wall clock time because your customers are waiting for the data. Maybe you want to optimize for resource utilization because you want to optimize the cloud bill. And maybe you want to avoid optimization because you don't want to complicate your code, you don't want to make your code more complex simply because of the maintenance burden. All right, so let's have a look at case number two, which on the surface could look very similar to the previous one, but it will be slightly different. So let's have a look. We are not looking into the code; we just try to very quickly figure out what's going on based on just the Spark UI. We have a Spark job which is again running for 20 minutes. After an hour or so I killed it, because it could never really complete. And what we can see is a very similar situation to the previous job: tasks almost immediately completing, and this single problematic task which would probably be very hard to complete. And you might say, okay, it looks like exactly the same situation.
The difference shows when we look into active stages: if you look at the output and the shuffle read, it looks like we are processing almost no data. In the shuffle read phase we have just 20 megabytes. So it looks like we are processing almost no data, yet the job cannot complete. In the SQL tab we can confirm that the input is small. After doing a bit of self joining, we can see that the sort merge join phase is processing record number 4 billion something, and the number is growing. So even though we are reading not that many records, we are producing lots of records. I'll do a bit of a shortcut because of the time limitation, but the bottom line is that we have a products table with an order id, and we are joining the products table with itself based on the order id, because we would like to do an analysis of products bought together. And the problem with a Cartesian join is that if you have three records as an input, the output size is nine; for ten you get a hundred in the output; it grows really quickly. So for 10,000 records, you end up with a hundred million records. It's nothing new, it's a problem which has been known for a long time. You just need to be aware that if you are doing a self join, the number of records will be huge. But the problem here is that all the records are processed within just a single executor, simply because Spark is doing that based on the order id. It is our join key, and the same order id is being processed in the same task, unless we explicitly do something about it. A very simple technique to avoid this kind of behavior is salting. In the product table, if we join it with itself, on one hand we add a random value; here I'm using a random value from the range one up to two so it can fit into the slides. In the right hand side table, we are generating every single possible combination of salt, so we duplicate the records.
But the benefit of doing that is that instead of all the nine records being produced in one task, six are produced in one task and the remaining three in another. So instead of processing all the nine records in the same task, they will be processed by two separate tasks, and this way we distribute the load. So just so we are on the same page: by doing salting in a Cartesian join, we are not limiting the number of records. The number of records will still be the same; you still produce lots of records. But what you are doing is parallelizing the execution, you make it better distributed. So at the end of the day, we can see many tasks which are all doing something, which are all running for quite some time, and the top task is running for eleven minutes. You could argue we could split it even further; the tasks are still processing quite a lot of data, probably we could split that. But the point is we won't be waiting forever, and this might be good enough. You might want to tune the salt value; in this case, I believe we used 180 or something like that. All right, so let's have a look at case number three and case number four. They are grouped together because they are both related to lazy evaluation. Lazy evaluation is something which is taught at the very beginning of every single Spark course, yet it is something which is very easy to forget about. The whole concept is that if you read some data frame and then you do some complex transformation, that complex transformation does not happen immediately. Spark just remembers what it has to do, and it does it when it's absolutely necessary. It's an optimization concept, so it doesn't have to do every single action immediately. If you for some reason decide to do a while loop and then do this kind of complex transformation in a loop, you need to be aware that this will not be memorized anywhere. Spark doesn't have any cache for that which happens automatically.
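The salted self-join can be sketched like this. It is a pure-Python model with hypothetical names, not Spark code; in Spark the left side's salt column would come from something like `floor(rand() * N)`, and the right side would be exploded with all N salt values.

```python
import random

def salted_self_join_buckets(order_ids, num_salts):
    """Self-join on order_id, distributed by salting. The left side gets
    a random salt; the right side is duplicated once per salt value so
    every original pair still meets exactly once. Returns the joined
    pairs grouped per (key, salt) bucket -- each bucket models one task."""
    left = [(oid, random.randrange(num_salts)) for oid in order_ids]
    right = [(oid, s) for oid in order_ids for s in range(num_salts)]
    buckets = {}
    for loid, ls in left:
        for roid, rs in right:
            if loid == roid and ls == rs:   # join on (order_id, salt)
                buckets.setdefault((loid, ls), []).append((loid, roid))
    return buckets
```

With three records of the same order id and two salt values, the nine output pairs are split across up to two buckets instead of one, while the total stays nine: salting parallelizes the work, it never shrinks it.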
Spark will just be remembering more and more operations to be done, and the complexity of every single iteration will grow. In the first iteration it takes 23 seconds and a few jobs to be completed. Then after some more iterations, it takes 1 minute and several jobs to be completed. And then it takes nine minutes, and it keeps growing really fast. And the same with the complexity: the initial job is quite simple, then it's a bit more complex, and then we scroll down even more and we keep repeating the same operations, but we do them from scratch in every iteration, unless we do something about it. So what is the conclusion here? First of all, loops are suspicious, because it's somewhat like doing a batch within a batch system. Spark has its own operators, which are batch operators, but we are using loops simply because we can. Sometimes it's okay, but in some cases it's really suspicious, and in some cases it really leads to growing complexity of the execution plan. And you can grow the execution plan even without a loop. So if your execution plan is growing out of hand, if you can see Spark slowing down because of how complex it is, how many operations it has to do, then it's worth considering checkpointing or materializing the results on the way. On one hand, it's a good workaround for a very big execution plan, but on the other hand, it could also be a good idea to split very complex code, a very complex DAG, into modules, and that could even help with the code maintenance. So it could serve two purposes. Other than that, something which is very much related to lazy execution is whether our code is deterministic or not, because if it's not deterministic, for instance if we want to randomly generate values, it becomes very tricky.
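The growing-plan effect can be mimicked in plain Python: model the lazy "plan" as a chain of pending functions that is replayed on every materialization, and model checkpointing as replacing the chain with its computed result. This is a toy model, not the Spark API; the real analogue would be roughly `df = df.checkpoint()` (with a checkpoint directory configured) or writing the data frame out and reading it back.

```python
class LazyFrame:
    """Toy model of lazy evaluation: transformations are only recorded,
    and the whole chain re-runs on every materialization."""
    def __init__(self, data, plan=None):
        self.data = data
        self.plan = plan or []            # pending transformations

    def transform(self, fn):
        return LazyFrame(self.data, self.plan + [fn])

    def collect(self):
        result = self.data
        for fn in self.plan:              # replay the entire plan
            result = fn(result)
        return result

    def checkpoint(self):
        """Materialize and drop the plan, cutting the lineage."""
        return LazyFrame(self.collect(), [])

df = LazyFrame([1, 2, 3])
for i in range(100):
    df = df.transform(lambda xs: [x + 1 for x in xs])
    if (i + 1) % 10 == 0:                 # every 10 iterations, cut the lineage
        df = df.checkpoint()
```

Without the periodic checkpoint, the plan would hold 100 pending functions and each iteration's materialization would replay everything from scratch; with it, the plan never exceeds 10 steps.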
It becomes very tricky because, let's say, we want to split our data frame into two data frames, a split for machine learning. If we write code like this, we generate a column with a random value and we derive two different data frames from it, and we hope that the split based on that random value gives us two data frames which are not overlapping. Then you might get surprised, because when you write data frame one and then you write data frame two, the random generation will happen again from the very beginning, it will happen from scratch. And that's a real problem I actually have seen. It will fail: it will create overlaps, it will create lost records. This is a LinkedIn message I received from a friend. He was fuming because of it. He spent lots of time on it; it was a really significant bug, because it was introducing an overfitting effect in their machine learning pipeline. So whenever you deal with random values, first of all check if you don't already have this kind of mechanism, like splitting data frames, in the standard library, because that one you do have. But if you do something a bit more custom, then make sure you materialize the result, because that is the safest option. It might not be the most time efficient, but it's the safest. You don't risk that someone else will come and do some modification in your code without knowing what is happening. So what would be the conclusion from all the examples I showed you? First of all, you really need to know what you are optimizing for, whether it's your time, the compute time, the wall clock time. It really helps if you understand where the bottleneck is. If these kinds of cases are new to you, I would suggest that you invest a bit of time in experimenting and learning a bit more about how Spark works under the hood.
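The overlapping-split bug can be reproduced in miniature: if the random draw happens again on every action, the two "halves" come from different coin flips, so records end up in both halves or in neither. Materializing the assignment once fixes it. This is a pure-Python sketch of the failure mode; in PySpark the library mechanism the talk alludes to is `df.randomSplit([...])`, and the manual fix is persisting or writing out the frame after adding the `F.rand()` column.

```python
import random

records = list(range(1000))

def lazy_half(records, keep_left):
    """Buggy version: the random draw happens on every call, just like a
    non-persisted rand() column that is re-evaluated per action."""
    return [r for r in records if (random.random() < 0.5) == keep_left]

# Two separate "actions" re-roll the randomness: overlaps and lost records.
left, right = lazy_half(records, True), lazy_half(records, False)

# Fixed version: materialize the random assignment once, then reuse it.
assignment = {r: random.random() < 0.5 for r in records}
left_ok = [r for r in records if assignment[r]]
right_ok = [r for r in records if not assignment[r]]
```

The fixed split is a true partition: every record lands in exactly one half, so no train/test leakage and no silently dropped rows.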
But if you feel that you don't have these kinds of problems on a daily basis, you should at least know when to ask for help and who to ask for help. Because maybe you are a data scientist: you want to focus on the data science, not Spark debugging. You don't need a PhD in Spark, but then at least know when you are blocked and make sure you know where to ask for help. So for instance, maybe there is a data engineer who is good at these kinds of things. Feel free to reach out to me on LinkedIn and I will be very happy to talk to you.
...

Marcin Szymaniuk

CEO @ TantusData

Marcin Szymaniuk's LinkedIn account


