Conf42 Machine Learning 2023 - Online

Orchestrating data and ML workflows with Apache Airflow

Video size:

Abstract

Automate MLOps and data pipelines in one data-driven, tool agnostic workflow with Apache Airflow and the Astro Python SDK. See how Airflow orchestrates the ingestion of brain tumour MRI data into DuckDB, as well as fine-tuning a HuggingFace classifier, all in one dynamic workflow!

Summary

  • The talk will talk about orchestrating data and ML workflows with Apache Airflow. The main part of the talk will be a walkthrough this data driven ML pipeline in airflow. And I will close with a few ML and airflow resources in case you want to get started on your own.
  • What is ML orchestration? It's a fairly new term. It is part or a subset of ML ops. You have overlap between machine learning, DevOps, and data engineering. And then over all of that, you have orchestration.
  • And this is what we call ML orchestration for this talk. If you are working in the ML sphere, you have your typical data science machine learning process. Along the way here you have a lot of steps that could be automated. This is where airflow comes in. We try to automate everything that is somehow possible to automate.
  • Apache Airflow is a tool to schedule and monitor your data pipelines or any other of your Python code. It's the most popular open source choice for this use case with 12 million downloads per month. There is a large and vibrant open source software community.
  • Airflow stands for directed acyclic graph. A dag is comprised of the different tasks. The other great strength of airflow are predefined operators. You can orchestrates data pipelines and ML pipelines in the same place.
  • Gliomas and meningiomas are two different types of brain tumors. Sometimes the differentiation between the two types is fairly easy. Of course, never conclusively. You would never make a conclusive diagnosis from one picture. But it is something that I could imagine an ML model could be learning.
  • Using Airflow and the astropython SDK, I created eight separate workflows. Each of those DAX has one task that updates what I call a data set. And then the testing part, the part that tests my fine tuned model runs every time that flag has been raised. This is a super new feature of Airflow 2.6.
  • The task called create DuckdB. Pool creates an airflow pool, which is an airflow specific feature that is a collection of worker slots. This is used to better handle resources to group tasks that you never want to use too many worker slots at the same time.
  • The Astro is an open source tool for next generation DAC offering. It tries to create tasks or utility functions that are tool agnostic in itself. With one change of a variable I could change any task that is using the astropython SDK from one to another blob storage.
  • So I wrapped the whole model fine tuning in a custom operator. That's why it's so great to have things in a modular way. Whenever this task completes successfully, the flag will go up to the airflow environment. I want to be notified if the task was successful. This is fully customizable.
  • Using Astro convenience tasks or Astro SDK operators. Can turn any SQL query into a task that runs on any table that you put in. Makes airflow much more modular and much more versatile and flexible if you ever change an underlying tool.
  • Astro CLi is another open source tool that helps you use airflow. Once you have the Astro CLI installed, you can run Astro start next and it will start up an airflow environment with two example dags for you ready to use. Here is how easy it is to create a custom operator.
  • This is just a very simple ML and data orchestrating pipeline. There is much more that you can do here. For example, you could use dynamic task mapping to fine tune your models. At scale, I would try to use a relational database with the option of parallel writing.
  • There are a lot of resources that are specific to ML use cases. The main take home message here is airflow is the one central place that can orchestrates both your data and your ML. Pipelines can have complex interdependencies and it is fully tool agnostic.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi and welcome to everyone who decided to watch this talk. I am very excited to talk to you about orchestrating data and ML workflows with Apache Airflow because there are quite a few topics that I want to talk about. I thought I'd give you a quick overview over what you can expect in this talk. First, I'm going to talk about the term ML orchestration. Next, just in case you've never used Airflow before, I'm going to give you a little crash course with everything you need to get started. Then I will talk about the data that I was analyzing that I built my pipeline around, and the main part of the talk will be a walkthrough this data driven ML pipeline in airflow, both with screenshots of the Airflow UI and with little code snippets highlighting certain airflow features that I think will be useful for you if you build your own ML pipelines. The features I'm going to highlight will be data sets, asynchronous tasks, and deferrable operators. I will talk about dynamic tasks, the Astro SDK, my own custom huggingface operators, and lastly, about how I enabled slack alerting for certain events in my pipeline. After all that, I will hop over in the actual airflow environment. So you have a short demo of what this looks like when it's running, and I will talk to you about the results that I got for the data that I was analyzing. Lastly, we will talk or think about what could be next for this pipeline and for ML and Airflow in particular. And I will close with a few ML and airflow resources in case you want to get started on your own. All right, what is ML orchestration? It's a fairly new term. It is part or a subset of ML ops. Now, if you work in any capacity in the data sphere, you know that there are many different job titles and responsibilities. There are machine learning engineers, data engineers, DevOps engineers, data scientists, and a lot of times they do overlapping things. And this is the same situation here with the different terms. You do have some overlap between machine learning and DevOps. You have overlap between machine learning, DevOps, and data engineering, which is commonly what we call ML Ops. And then over all of that, you have orchestration. And an orchestrator is a tool that sits on top of your stack that you're using for a certain action, your ML stack or your data engineering stack, and then goes to each tool and tells it what to do at what time. So an orchestrator is kind of like the conductor in an orchestra that makes sure that all of the instruments are playing in tune on the right schedules, and that everything happens at the right time. And airflow is one of those orchestration tools. And airflow is very multipurpose in that you can see that little orchestrator sphere overlaps all of the other spheres, and there is an overlap between mlops and the orchestration or airflow. And this is what we call ML orchestration for this talk, this will be very familiar for you. If you are working in the ML sphere, you have your typical data science machine learning process. Usually you get some data, and the first rule of data is it's never in the format that you need and it always needs cleaning. So the first steps are usually data extraction, validation, what is classically part of the data engineering sphere. Then usually the data scientist comes in, does some manual exploration, engineers the features, builds the model, and lastly, the machine learning engineer, most of the time puts all of this in production. And along the way here you have a lot of steps that could be automated. Not all of them, of course. The exploratory analysis of the data scientist is really hard to automate. But a lot of things, once you know what the data is, what it looks like, and you know your model can really be automated. This is where airflow comes in, because you know that from any workflow in your life, the automated steps are very quick. But as soon as someone comes in, that's the cause of your latency. If someone has to copy paste something or if someone has to manually kick off the next script, that will really pull your process and it will make it much longer. And that's what we are trying to combat with airflow. We try to automate everything that is somehow possible to automate. All right, that's ML orchestration. Now, if you've never used Airflow before, I'm going to give you a very short crash course. What is Apache Airflow? It is a tool to schedule and monitor your data pipelines or any other of your Python code. Airflow is very versatile, so you can schedule anything that you can define in Python code, and that's why it's a multipurpose orchestrator. It's the most popular open source choice for this use case with 12 million downloads per month. And everything in Airflow is written as Python code. You're going to see what that code looks like. If you're using Python for machine learning, it will be very familiar to you. And that's also some of the big strengths of airflow, because it's defined in Python code, you can have CI CD on your pipelines process and you can infinitely scale and extend what you're doing with airflow. And of course, if after this talk you want to get started, there is a large and vibrant open source software community. We have our own slack. There are a lot of alters on stack overflow. There's the whole open source GitHub. Please get in touch if you have any issues or want to learn more. Now, apart from being defined in Python code, Airflow comes with a very functional and very pretty UI. What you see here is the overview. Over the Airflow UI you can see all of the dags. This is what we call one pipeline in Airflow stands for directed acyclic graph. And here you can see we have nine dags. I have some of them paused, some of them are activated, and I will not go through everything that you see here, but there's a lot of observability. You know exactly how your workflows are scheduled, when it ran the last time, when the next one will be, and what happened to the tasks? You can see green and red here on the screen. Green means something was successful, red means something failed. And there are other colors. So there's a lot of things you can see at one glance. And of course you can drill down here and look at individual dags, individual tasks, down to the very logs of the python script that you are executing. Okay, I already said what a dag is. A dag is comprised of the different tasks. So on the right hand side here we have a very small dag, just with two tasks. And you can see it's also a graph. It has two nodes. The nodes are the tasks and the edge is the dependency between them. It's a little arrow here. This just means the extract task needs to happen before the write to minio task, and by default the upstream task needs to be successful. But there's a lot of customizability here. On the left hand side you can see all of the code that went into creating this tiny dag. Of course, in production and in real life use cases, your dags are often much longer. But here you can see really everything that is needed to create one of these dags. On line eight, you can see where we define the DAC. So to tell airflow, hey, I want to make a new dag, you just add a decorator. The add Dac decorator on top of a python function and that will tell airflow. Everything that's in this function is a DAC. I only have to give it free parameters. Here I say this should happen every day, and I want it to start at the 1 January this year. Then within that python function on line 13 we have two tasks that are defined, and I on purpose am showing the two different ways that you can define airflow tasks. So on line 14 you can see I have an add task decorator and I put this on a Python function. And you might be thinking that just looks like a normal, like any regular python function. It's very true. You could take your current ML script that you have, put the add task decorator on top of it, and then it's a task in airflow. And if that's all you need, if you have several scripts and you want to run them in a certain order or with certain dependencies, then you know now everything you need to know to make that happen in airflow. Of course, sometimes there's a lot more customizability here. And the other great strength of airflow are predefined operators. An operator is simply a class that we use to create a task. So on line 19 you can see I have an operator that is called local file system to minio operator. This is actually an operator that I import locally on line free. You can see I import it from my own include folder because I made this for a use case and it didn't exist. And you can see I have a class and I only give it a few parameters. I give it a task id, which is what airflow will call the task in the UI and internally in the database. And then I give it some information, a bucket and an object name. And everything else happens under the hood. So the whole call to the minio API happens in this class that has been defined anywhere. And this is a strength because you have these predefined operators for many, many data tools. And if someone has created an operator for what you need, then you can simply use that, and you don't have to redefine the wheel every time. All right, that was a very, very simple dag. But of course dags can be as complex as you want them to be. Here you can see a DAC that is a little more complex. Of course these can get very big, but you can see here we have more complex dependencies. We can group tasks. So there's a lot of flexibility here as long as it stays directed and asyclic. Why should you use airflow? You can orchestrates data pipelines and ML pipelines in the same place, because airflow is tool agnostic. That means you can talk to any tool that has an API. Anything that you can do in python you can do in airflow so you don't have to use different tools anymore, and you can start creating complex dependencies. A lot of times you have an ML pipeline that you want to start once certain data is available, but maybe you want data quality checking to happen on that data. And the ML part of the pipelines should only start if the data quality check has a certain result and all sorts of complex dependencies like that. There's also fully functional API for airflow, so you can even start kicking off an ML pipeline by a button press on a website. I already said it's tool agnostic, and this is very important. And I realize every day more and more how important that is in the current data field. I feel like I wake up every day and there's a new ML tool and a new model and a new website springing up that I might want to use. And if you're using airflow for your orchestration, that will never be a problem because as long as it has some API, some way to call it, you can connect it to your airflow instance. And you don't have to redefine your whole pipeline just because a new tool doesn't natively integrate with another tool that you're already using. So this plug and play style of data orchestration is really important in the current data sphere now. And the third point, it is event driven. You can schedule dags to happen or tasks to happen based on events that are happening in your other data tools. I will show you what I mean by that in the pipeline walkthrough I said it's all just Python code. You can customize it, you can plug and play operators, create your own, you can extend upon airflow, it's open source. If you do something useful, please share it back to the community. And for ML, really important. There are already pre built operators that help you to run dedicated tasks in Kubernetes pods. So if you have a Kubernetes cluster like you saw, I put the at task decorator on top of a python function. I could also put at task Kubernetes. I have to give it a few more parameters in that case to define which Kubernetes cluster I'm sending the task to. But that way you can spin up your task in a dedicated pod. For example, if you need heavy gpu just for one task in your pipeline. Okay, that was the crash course for Apache Airflow. Now I'm going to jump back a little in my life and talk a little bit about medicine. So the data I was using, I was thinking, what is something I'm familiar with? And I've worked in neurosurgery before, so I decided to go with brain tumors and there was a small little data set for that on Kaggle. I've looked at gliomas versus meningiomas. You don't have to know anything about that, really. A glioma is a brain tumor coming from glial cells, so from the structural cells in your brain. And a meningioma is a brain tumor coming from arachnoidal cap cells. The arachnoid is a membrane that is surrounding your brain. So those are two different tumors that are coming from different cells. And of course they have all subclasses, but I was just looking at those two large buckets of kinds of tumour. I had a little more over 300 t two weighted images of both. T two weighted is just one type of MRI image. You can tell it's probably t two weighted if fluids are bright in the image. I did a train test, split of about a quarter being test, which left me with a test set with around 80 pictures of each and a train set with 260 gliomas and 247 meningiomas. Now, just so you can imagine what my data looked like, I only had slices, so only pictures, not a full MRI. And here you can see. Sometimes the differentiation between the two types of brain tumors is fairly easy. This was one of the reason why I picked these two types, because I was thinking this is something that you might be able to tell from one picture. Of course, never conclusively. You would never make a conclusive diagnosis from one picture or just from an MRI. In general, you would always wait for the biopsy. But it is something that I could imagine an ML model could be learning. Here on the left hand side, you can see the meningioma. If you remember, I said, it comes from cells that are part of the membrane surrounding your brain, so it has to touch that membrane at one point. And this is a very typical picture here. And on the right hand side, you can see a glioma, you can see this one started in the middle of the brain, you can see all the edema around it. It looks a little angry, what we would have said in radiology, because this looks quite malign, which unfortunately, gliomas are often malign. Meningiomas are more often benign. But of course, both entities have benign and malign types. Now, this was an example of one picture where I, on the first glass said, yeah, I'm pretty sure what I would classify it as, but it's not always that simple. There were also examples where the comparison was much harder. Here again, on the left hand side that later turned out to be a meningioma, and on the right hand side that turned out to be a glioma. Okay, this was the data I had, and with that I got started with the pipeline. Now I will walk through the pipeline, but of course, if you want to see and play with the whole code, it's on GitHub. I will post that link in the discord as well, and you can clone that and use it as a blueprint for your own pipelines if you want to. First, I want to cover the tools that I used. So obviously I used Airflow and I used the astropython SDK, which is an open source package on top of airflow. I stored all my files and images in s free, but you could use any blob storage, of course that you want to, or even store them just locally. I used a duckdB instance to store references to my files and store the results of my different models. And then in the end pick the model that performed best for a specific test set. I decided to go with hugging phase for the ML heavy parts, and specifically I decided to pick the Microsoft Resnet 50 model and fine tune that because I was thinking that's a very general model. I'm curious how well it performs before it's fine tuned and after on a very specific data set. Of course, since I was working with airflow, I was doing everything in Python, and in the end I wanted to have some way to be alerted of what's happening and I picked slack for that. Okay, again, you can see a DAG view, but this time of the actual airflow environment that I was using for this pipelines. And what you can see here, in total I have eight dags, so eight separate workflows, and all of them have a very varying different number of tasks. And what you can see here is what it looks like while it's running. And I have this overview. I know exactly when each DAC ran the last time and when it's scheduled to run next. Talking about the schedule, you might have heard about airflow being an advancement of Cron, and it's still very much possible to schedule your DAX on a cron string. But since then, Airflow has come a long way and I've actually scheduled most of the DAX in this repository on data. So what I did is here you can see what we call the data sets view in green or slightly bluish. Depending on your monitor. You can see the DAX and you can see in orange the data set. And what a data set is in a nutshell is if you imagine you have an airflow task and you know that airflow tasks, for example, updates a table in a database, then you can tell that task, hey, I know you are updating that table. Please, whenever you complete successfully, raise a little flag that tells my whole airflow environment that this table just has been updated. And then on the other side, I can tell other DAX, hey, whenever you see that flag or whenever you see that combination of flags, I want you to run. I want that whole other part of the pipelines to start. This is exactly what I did. So you can see here I have two dacs that are not scheduled on data sets. The in new test data and in new brain data DAC. And each of those DAX has one task that updates what I call a data set. So each of those DAX will eventually put new data in a folder and that folder is my data set. So I decided, hey, when I have new test data, I want that task to say, I have new test data. Raise that little flag. And the preprocessed test data DAC, which is downstream, is being told to run every time there is new data in that s free bucket. And that's how I started to chain my whole DAX. I will not go through the whole list here, but for example, on the right side, on the lower part of the screen, you can see that if I trained my model, I did the exact same thing. I told that task that trains the model. Hey, whenever you're successful and when you're done, raise a little flag that says new model has been trained. And then the testing part, the part that tests my fine tuned model runs every time that flag has been raised. I said, two dacs are not scheduled on a data set. I actually decided for two dacs that I want them to always run. And I'm very excited about this. You might be able to tell because this is a super new feature. This came out, I think, two weeks ago with airflow 2.6. It's called the continuous schedule. And what this means is if you set the schedule to add continuous, the DAC will always run. So you have a DAC that's always there, always running, and whenever it completes successfully, it will automatically start its next run. So now I can have my DAX that are always listening for something to happen and always running. These are the two Dax that are always running. They look very similar. They are mirrors of each other. One for the training data, one for the testing data. Because I was thinking in my case I would assume that if a new MRI file comes in that it would be filed either to go into the training or the testing data, depending on certain other properties. So I'm not doing a random split here. I assume that someone with domain knowledge would make that decision. Where this file goes and what's happening here is on the left hand side you can see the wait for training data, wait for new testing data task. And these are waiting for the new task for the new data to come in. Now I said I'm using s free. So what is happening under the hood is probably something all of you have done before. A call to the Bodocore API. But I did not have to think about bodocore a single time while creating this pipeline, because someone has done that for me. Very grateful. At this point there is a sensor, an operator called s free key sensor async, which is a deferrable operator. This means that this task, once it starts, it is waiting for something. In this case I just tell it exactly what key to wait for, in what bucket and in what AWS account. You can see AWS con id. That's where I store my credentials. So the credentials are hidden here and make the connection to my AWS environment. And as soon as this task starts waiting, it is actually not being run by an airflow worker anymore. It is being put into a completely different process called the triggerer. And this process runs in an async fashion and frees up your worker slot, which, as you can imagine, if you have a lot of these tasks running, can save you a ton of resources and make your pipelines much more efficient. So that was the first thing I wanted to show you. If you have sensors running, always check if there's an asynchronous version. Now let's imagine I have a new file. It comes in. What I want to happen next is I want to have the list of all the new files. And then I copy the files over. And you see here I have square brackets with a number in it. And this is because I decided I want to know for each of the individual files if my copy statement was successful, because I want to backtrack in case maybe someone had put in a file with a different file ending, or maybe something else went wrong with a corrupted file. I want to be able to backtrack that for every single file. And I did that by using dynamic task mapping. I did not write out 400 tasks, especially because I never know how many new data will come in. This time I had 400 pictures. Maybe tomorrow it's just one. So I need a varying number of tasks every time this diagrams and I definitely don't want to change the code every time. This is where dynamic tasks come in and dynamic tasks are a way to make your a flow pipelines more dynamic. I will not have time to talk in depth about how to use dynamic task mapping. There are a lot of resources online, there's a full length webinar as well. But what you can see here is we have an operator. So on line 73 we have the s free copy object operator. Again, someone already did all the work for me. I just had to plug that operator from the provider package and I say, well, for each of the instances of this operator I always use the same connection, I always connect to the same AWS account. But for each instance I have a different source bucket and a different destination bucket key that's on line 70 and I pass in a list of dictionaries and for each of those dictionaries a copied task will be made. And this happens at runtime. So the decision how many tasks will happen is always done once this task actually has to run, and that's how it can have one task instance maybe one day if there was only one new file, or it can have 100 or even 1000 the next day without me having to change any code. If you're interested in that, I encourage you to look more into dynamic tasks. This is what it looks like in the UI you can see here. All my mapped tasks, all of them were success in copying my file. All right, we have our data in the right folder in the US free bucket. Now we do preprocessing. First little side note, I'm creating the task called create DuckdB. Pool actually doesn't create a pool in the database sense that you might be familiar with. This creates an airflow pool, which is an airflow specific feature that is a collection of worker slots. And I can say I have a certain number of tasks and those tasks can be anywhere. In my whole airflow instance they can be in separate DAX and I can tell all of those tasks, you only get to use these worker slots. This is used to better handle resources to, for example group tasks that you never want to use too many worker slots at the same time. But it's also super handy if you have tasks that you never want to have run at the same time because in this case I'm writing to duckdb a few times, and I cannot do that synchronously. I cannot have two processes handling writing to the same duckdb instances at the same time. So what I'm doing is I'm creating a duckdb pool with one slot, and throughout the whole airflow instance, all the tasks that are writing some information to DuckdB, I tell them you can only use that one pool, only that one slot, and my parallelism issues are solved. Next, I list all the files in my training bucket and then I create a table in duckdb and I load both the references to the files. So the key to the file name and the labels that in this case I'm fetching it from the file name. But of course in a use case you probably have a different way that you're getting your labels. Maybe it's stored in a different database, maybe it's something that someone does manually. But there's another task that fetches the labels, and I store both the labels and the references to the file together in the same table in duckdb. Here I want to quickly zoom into the list train files task because again, that's something that you would need to call the AWS API for. But here there's actually an even handier thing, and this is part of the Astro. This is the first part of the astro that I want to show you in this talk. In general, the Astro is an open source tool for next generation DAC offering, and it tries to create tasks or utility functions that are tool agnostic in itself. So if you remember earlier, I was using an s free copy object operator. So an operator that is specific to s free, if I would switch to a different kind of blob storage, I would have to switch out the operator. But in this case here I'm using the get file list function from the astropython SDK and I just point it towards the connection to my blob storage. But if tomorrow someone decides that we are moving from AWS to Azure, the only thing that I would need to do is give it a new path and give it a new connection id to Azure. I wouldn't have to change any other thing. And that is very plug and play because as you see, I use a variable for my connection Id. So with one change of a variable I could change any task that is using the astropython SDK from one to another blob storage, and also from one to another relational storage. If those storages are supported by the SDK. If that is something that sounds interesting to you now, mainly speaking to all the data engineers listening because this solves so much pain and it has a ton more features and functions, like merging in tables together, for example. Then I encourage you to check out the Astro Python SDK docs. All right, we arrived finally at training the model. So far, everything was kind of data engineering and not really mlops yet, but now we are training a model. I'm getting my image keys from my duckdb storage. So this is also something that could be modified if you had any other process that decides which image to use in a given training session. But here, I'm just getting all of them. I'm loading all the training images into a temporary folder. I get my labels. I train my classifier using a lot of huggingface technology, and then I delete the files again from my temporary folder. And this is where the custom operator came in. So what I first did was just put the add task decorator on top of a script and had it running and it worked fine. But I was thinking there's actually more I can do here because maybe I want to reuse, maybe I have another project where I want to have a binary image classification and use houging face. And I want this to be more modular. So what I did here is I wrapped the whole model fine tuning in a custom operator. This operator doesn't exist publicly yet, but you can just copy paste the code from my repo, of course. But what I did here is I wrapped all of the code that you're familiar with and created this class that I just have to instantiate in my DaC code. So I have the model name here, I give it the file paths, I give it the labels. Learning rate, and all of the other hyperparameters tell it where to save the fine tuned model, give it the transformation functions, and this is good to go. And here, one thing that you can actually see is the very last line that says outlets to an airflow data set. This is what creates that little flag that I was talking about of the data set. Whenever this task completes successfully, the flag will go up to the airflow environment. Hey, we have a new trained model, and everything that I decide needs to happen in that case can start happening. Okay, I have another dag because I thought, I'm really curious how the model performs if it's not fine tuned. And I want to have a robust baseline, very similar structure loading in the images from s three, then running a test operator that I call test classifier. And in the end, I write the results to my duckdb table. So I created another table in my duckdb database and in this table. I want to store all the models with this result so I can very easily figure out which was the best model for a given test run. Here I want to zoom in into the testing operator. I did the same thing again. I took all my test code and wrapped it into a custom operator. So I simply have to provide some parameters. There are more parameters that I just set a default to here. But now my Dax file stays clean and I have all my testing code in a separate file doing the same thing to the fine tuned model. Of course, that's the core thing that we're all always waiting for and watching the logs for. I get the latest fine tuned model so I'm storing all my fine tuned model in a local directory, but I want to get the latest for each test run again, loading the test images, testing my model and then write the results to duct Db and deleting the images from the temporary folder. The code here you can see. That's why it's so great to have things in a modular way. I used the same operator, same import, and this time I just gave it a few different parameters. Namely I gave it the result of the get latest fine tuned model task that was upstream. So I'm using a at task decorated function and return the name of the model that I want to use here and I can just plug that into model name and this will also automatically set the dependency correctly. Now what I wanted to highlight here is I talked about slack notifications and I was thinking, well, the thing I'm interested in is every time a model has been fine tuned and that model has been tested, I want to know how it's doing because maybe if it's not doing well, I have to go back and I have to change some habit parameters or change something on my code. So I decided to in this test classifier task I want to have a callback. It's an on success callback. So this runs whenever this task is successful. I could also have on failure callbacks. Of course, it's actually very common for people. We also have that in our internal data pipelines to have on failure callbacks that send you an email if something goes wrong with something you are responsible for. But in this case I wanted to be notified if the task was successful. I want to get a slack message. This is what that slack message looked like. This is fully customizable. So this is just what I decided I want to have in this message. And I'm pulling from upstream tasks. I'm pulling results here. I'm pulling the result of my fine tuned model test and started reading average test loss zero. Okay, that's too good. And yeah, I need to go back to the drawing board. That was one of the notifications while I was building this pipeline that sent me back to. Okay, hyperparameters completely out of whack. The accuracy is much worse than baseline and f one score. AUC is also much worse and average Tesla's of zero. I think this might be overfitting. So let's go back to the drawing board. And lastly, I have the last DAC in my whole pipeline. I didn't put the whole graph view because it's a very simple one, but it's a DaG that picks the best model and then deploys it. Doesn't actually deploy it yet because I know that code will look different for everyone. But I put in a task already that is called deploy model, where you could put in all your code that actually deploys to your solution. And that's the second place where I wanted to talk about the astropython SDK because here you can see I'm using at AQL something tasks. And what this signifies is that I'm using Astro convenience tasks or Astro SDK operators. The first one I'm using is at AQL transform, which is a way that you can turn any SQL query into a task that runs on any table that you put in. Here I have the intable, which is my results table, and I simply point that at a table in my relational storage. So on line 52 you can see that I'm pointing it at a table object and I again give it a connection id. And if I ever would decide I don't want to use duckdb anymore, maybe I'm using a snowflake database. I could simply point that at my snowflake instance and I wouldn't have to change any other code. So again, this is a way to make airflow much more modular and much more versatile and flexible if you ever change an underlying tool. And the second task takes in the returned table. So I'm running the SQL statement on my results table. This will return a temporary table in this case because I didn't give it an output table. And this temporary table is simply passed into the AQL data frame task, which automatically turns that relational table into a pandas data frame. So I can just print out the model name of the best model that I selected. All right, that was all of the pipelines. I know that was a lot, but I wanted to also show you what that looks like. So let's hop over to my airflow instance. And you can see here it's actually running. You can see there was one successful run of the whole pipeline because we have one past success run. And you also see I have all the Dax turned on and two of them are currently running. So these are the two dax that are using the add continuous schedule and they will always be running. And the first task, which was the s free key async operator, is in a deferred state here. So this task is using the triggerer component to wait for a file to drop in my s free bucket. So let's give it what it's waiting for. Hopping over to s free. And let's take a file here. Let's say I have a new meningioma that I want to train. I want to upgrade the train set, and then I want to retrain my model with this. Okay, uploaded. It takes a second and you can see it's not deferred anymore. Now this task is running. That was the center, and this will run all the downstream tasks. And the other thing that you saw is something happened here in this different dag. This is because this DAG is scheduled to happen as soon as this DAG has moved that file from the folder I dropped the new picture in into a different folder that is called train data ng. And now this part of the pipeline is starting. Can see here that actually has a lot of tasks scheduled, 450. So I'm not going to wait for all of this to happen. Just wanted you to see how the pipeline is kicked off. But once this DAG would be completed, the model training would start over here because again, I have scheduled it on a data set that is updated by a task in this environment. All right, we're not going to watch all of this happening, but we're going to jump into the code. So here you can see my visual code studio and you can see all the code that went into making this airflow environment. And this is also all the code that is going to be in the repository for you to download. On the left hand side, you can see what the airflow environment looks like. An Astro project here, because I created this airflow project with the Astro CLi. The Astro CLI is another open source tool that helps you use airflow. There will be instructions on how to use the Astro CLI on your computer in the readme of the repository. But in a nutshell, once you have the Astro CLI installed and you run astrodefinit in any empty folder. It will create most of these files for you. And what you can see here is the main thing is the docker file that I have open here. It pulls an image. It's called Astro runtime, but this is almost completely equivalent to open source airflow and it just has a few additional features. I'm also setting an environment variable here that is necessary for running the Astro SDK. But that is all that's needed to start airflow. And once you have run Astro in it, you can run Astro start next and it will start up an airflow environment with two example dags for you ready to use now. Next, I wanted to show you where the dags live. So I have this folder here called Dax, and this contains all of the dags that are in my environment. The one I want to show you is the training DAC. You can see here, I have some documentation import everything can see. I tried to modularize it as much as possible. So I'm importing a lot of my variables from a local config file. I'm importing my transform function that is running on my images. And of course I'm importing my fine tuned hugging face binary image classifier operator. I'm bad at naming things. And here the dag is defined. We scroll down, we can see all of the tasks. Each of these pieces of code is a task that is generated in my DAC. You can see some use the astro decay here. This one is using the add task decorator because I wanted to be able to configure this a little more. And here we are using the fine tune hugging face binary image classifier operator with all of the parameters. Now here I told you I'm importing this from a local file. So if we scroll back up, can see I'm importing this from include custom operator hugging face. So let's go to that file. And this is where all of the magic happens that you are probably more familiar with, a lot more familiar with than I am. We are using torch here and different utilities from huggingface. But if we scroll down here, we have all of the functions, we have the custom data set that I'm creating, and we have the huggingface binary image classifier here. And what I wanted to show you here is how easy it is to create a custom operator. The only things you need to do are you have to inherit from the base operator that you import from airflow, and then you have to define all of your parameters. You can give it a color, you don't have to, but I always do. And initialize it. And you just have to initialize the parent class as well and then you're good to go. You can have an execute method, and everything that is in this execute method will run as soon as this airflow task is running. And here is all that you are probably very familiar with. So here I'm pulling the actual model and I'm doing all my training in this execute method. And of course same thing happens with the test hugging face binary image classifier operator. All right, this is still running, but let's hop back into the presentation because I wanted to talk about the results. Now, I haven't done this before. This was my first time fine tuning an image classification model, so I went in blind. But it was very interesting before I fine tuned it, just running the all purpose Resnet 50 on my classification glioma versus meningioma. It has not seen many brain tumors before, I'm assuming. But yeah, the model did not perform very well. And if you remember, I had a pretty balanced test set. So the split is worse than guessing or about as good as guessing. And after I fine tuned the model, I got something that I was pretty happy with. I tried to aim for highest AUC and I got 0844, which is something especially looking at the data set. The images were often cut at very different slices in the brain. There were other inconsistencies. I'm impressed. And that was only for the app pox. I only trained it for 1 hour on my local computer. So yeah, this is actually working pretty well. All right, what's next? Of course, this is just a very simple ML and data orchestrating pipeline. There is much more that you can do here. For example, you could use dynamic task mapping to fine tune your models, to tune the hyperparameters. If you remember, I showed you that you can have one task, one operator, and then for different parameters you can simply provide it with sets and it will create a mapped task instance for each of these sets. You could do that with your hyperparameter parameters. So this is ideal for hyperparameter finetuning with full observability. If you go back, you could also, if you're running this in production, use the Kubernetes executor or the Kubernetes pod operator to run the heavy tasks in a dedicated pod and specify the resources you need, like GPU. And of course, one thing that I was thinking while I was doing this, I think at scale I would try to use a relational database with the option of parallel writing, especially for writing the file references, for writing the model results. Duckdb was really great, but writing the file references, I found myself wishing I could have parallel writing. And of course, maybe you're thinking, hey, I'm using huggingface, I am doing NLP and is there an operator for me? Not yet, but there could be, and it's pretty easy to write it. If that's something you're interested in, feel free to create your own and share it with the whole airflow community. One of the best places to get started with that, and also get help if you run into any issues is the airflow slack. I'm also going to put that link into the discord chat later. All right, I hope you got interested in learning more about airflow in seeing how Airflow and ML can work together. There are a lot of resources that are specific to ML use cases. For ML Flow, there's a whole provider, so there are existing operators that interact with ML Flow. I link the GitHub repository here. There's a whole demo on how to use airflow with weights and biases. There is a very great webinar on how to orchestrate machine learning workflows with airflow, and a blog post about what you might want to know before you get started. Both of these resources, the webinar and the blog post, were created by Jeff Fletcher. And I say that because he helped me a lot while creating this pipeline. So big shout out to him. He's the director of field engineering machine learning at Astronomer. And yeah, big thanks and I can really recommend his resources. And of course, if you're using Sagemaker, we already have a tutorial on how to connect airflow and Sagemaker, and there are existing operators for running things in Sagemaker. And yeah, the last thing I wanted to say here on the slide for resources, if you're very new to airflow, probably very experienced with ML, but want to get started with airflow, we have a quick start that you simply can clone and run that is more data engineering focused, but highlights a lot of interesting airflow features that also were part of this talk like dynamic task mapping and data sets. And of course, if there is an ML tool that you're using and that you want to see an integration with, or there's content that you would like to see around ML and Airflow, please tell me, please tell us. At astronomer, we are always looking for how we can help the airflow and greater data community. That was it. For content. Take home messages. I have to say one thing. As someone who formerly worked clinically. Was this clinically useful? Absolutely not. In reality, you would always have a radiologist look at these pictures, would look at the whole MRI. You never just look at one slice of an MRI. So this isn't really clinically useful in any way. It was just something that was fun and interesting to do. But it shows that even with these very constricted resources, I only had a handful of pictures, I only ran my whole training locally. I didn't do that much fine parameter tuning. You can get interesting improvements on an existing model, and I think in part this will be the feature future. Maybe not so much for brain mris, because they are always looked at very closely anyway, but maybe for other images that are, of which there are a lot taken. And maybe you want to prioritize which to look at first. Or one use case that I've seen actually already happening, or being trialed a few years ago in a radiology department, was that they had a model that flagged potentially cancerous small lesions in lung cts. So the radiologists, after they read the whole image normally, they would always go back and look at those flagged images and just look through these little lesions to make sure none of them might have been missed. So that's something that how ML could augment the work of physicians in many ways. And yeah, the next thing, can the pipeline be easily adjusted for other use cases? Yes, definitely. That was my goal here, that you can adjust it for different things. You might have to change a little bit about maybe the transform function for your images, but the whole idea of airflow is to make it as modular and adjustable and flexible as possible. So the main take home message here is airflow is the one central place that can orchestrates both your data and your ML. Pipelines can have complex interdependencies and it is fully tool agnostic. And with that, I thank you so much for listening to this talk. I hope you got interested in checking out airflow and if you have any questions, please ask me in the Discord channel. And with that, I hope you have an awesome day.
...

Tamara Janina Fingerlin

Developer Advocate @ Astronomer

Tamara Janina Fingerlin's LinkedIn account Tamara Janina Fingerlin's twitter account



Awesome tech events for

Priority access to all content

Video hallway track

Community chat

Exclusive promotions and giveaways