Conf42 Python 2024 - Online

How to use common Python frameworks to test Apache Airflow data pipelines

Abstract

You test all your apps; your data engineering and machine learning pipelines should be rigorously tested too! This talk will show you best practices and options to test Apache Airflow pipelines using common Python testing frameworks that can be integrated into a CI/CD pipeline.

Summary

  • How to test your Apache Airflow data pipelines using common Python frameworks. I will show everything that I show in the slides in a demo as well. The demo code will be available in an open source GitHub repository.
  • Apache Airflow is the open standard for workflow management. It's being downloaded millions of times per month. You can use decorators or operator classes to create tasks. In a recent survey, 81% of the roughly 800 Airflow users who responded said it is important or very important to their business.
  • Astronomer is the best place to run Apache Airflow in production. Astronomer also provides additional tooling on top of Airflow. Everything that I'm going to show you in this talk is possible with fully open source tools.
  • Airflow is really important in a lot of businesses. All software engineering and DevOps best practices apply, including testing and CI/CD. Even if something does go wrong, tested pipelines are easier to debug because you know which parts you can trust.
  • All of the code is in source control. In this case, I use GitHub. If you're using Airflow for machine learning purposes, you will also have your ML artifacts inside of this version control. On each of the steps you can see I put a little gear icon and CI/CD because we are doing testing all the way along.
  • If you want to develop with Airflow locally, you have a lot of different options. There's one that I highly, highly recommend. The Astro CLI allows you to very easily spin up a local Airflow environment within Docker. It also includes built-in testing features.
  • You can test your DAGs interactively in your IDE with your favorite debugging tool. No matter which IDE you use, you can run through your DAGs the same way you run through other Python scripts with your debugger.
  • You can incorporate astro dev pytest into your CI/CD. This means you have DAG validation tests, unit tests, and integration tests. You can also add custom DAG rules to enforce architectural decisions.
  • Unit testing matters in Airflow mainly if you have custom hooks and operators. The third type of test is integration testing. This is if you are talking to an API. You can use any Python test framework that you like.
  • CI/CD stands for continuous integration, continuous deployment. Since Airflow is all just Python code, you can use any CI/CD tool that you like. You can also consider having automated creation of your deployment. This is the next step in DevOps in a lot of cases.
  • All of the code is available to you if you go to GitHub and then astronomer/external-talks-demos, on the branch for this conference (spoken as "2024 conf42 Python Airflow testing prod"). The Dockerfile is where we pull Airflow into the Docker environment. It is a best practice to always pin your versions in the requirements file.
  • The dags folder holds the DAG files, and the include folder is where we can put all of our supporting code. Most importantly for this talk, we have the tests folder where all of the tests live. Sometimes you need to debug your tasks in more depth and you want to use the Python debugger.
  • You can take these tests and add them to your CI/CD. Tests include DAG validation tests, integration tests, and unit tests. You can run all of these with one command. This is how you can get real control over your DAGs.
  • You can also do upgrade testing. The take-home message: Airflow is written in Python, which means you should really test it. We recommend having a robust testing suite inside of your tests folder to run with astro dev pytest at every step along the way.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi. Welcome everyone, and thank you so much for joining this talk on how to test your Apache Airflow data pipelines using common Python frameworks. I'm very excited to talk to you about this topic. First, I'll give a quick introduction to Apache Airflow and Astronomer, and I will talk a little bit about why you want to test your data pipelines. After that, I will go over the technical details of how to implement testing inside of local development environments and how you can add your Airflow tests to your CI/CD pipeline. I will show everything that I show in the slides in a demo as well, and the demo code will be available to you in an open source GitHub repository. But first, what is Airflow, and who is Astronomer? Airflow is the open standard for workflow management. It's a very popular tool. It's being downloaded millions of times per month, which is just a lot. We have a big Slack community. If you ever have a question about Apache Airflow, this is the best place to ask. Just join our Slack and you will be able to talk to all sorts of Airflow users and also developers. It's a very actively developed project. There are over 2800 contributors now, and there are a lot of building blocks in the ecosystem. That's the high level: a workflow management system. If you are working on data, you will recognize some or a lot of these logos. Usually if you are working on data, you have your data in different places and you perform actions on the data using different tools, most likely one or more of these. Some of these tools talk to each other, but not all of them. So you really are missing something in the middle there, right? You're missing a centerpiece, and that is really what Airflow is. Airflow is the orchestrator of your data symphony. Airflow sits in the middle and is able to talk to all of these pieces to orchestrate actions in all of these data tools with the right dependencies, on the right cadence, at the exact right time.
We very recently did a survey of Airflow users, and among the about 800 people who responded, 81% said that Airflow is important or even very important to their business. So if there is something going on in their Airflow pipelines, it is a big deal in their companies and in their businesses. That's the high level. But what does it actually look like day to day if you are working with Airflow? In general for this talk, I will assume that you've used Airflow before and that you've written a couple of DAGs. But in case you have not, or in case you need a refresher on more modern syntax, this is the Airflow 101 slide. On the left-hand side you can see a screenshot from the Airflow UI. This shows one pipeline within Airflow, which we call a DAG; that stands for directed acyclic graph. On the right-hand side you can see all of the code that went into creating this DAG. So we have a DAG. In this case it is called the intro to Airflow DAG, and it is defined by using a decorator. I use the @dag decorator on line seven, I put it on a Python function, and if I put this file into the right folder, which I'm going to show in the demo, then Airflow knows this is a DAG and will automatically pick it up. A DAG itself contains tasks. In the graph on the left-hand side you can see we have a graph with two nodes. Those are the tasks, pick a number and add 23, and the edge in between is the dependency. So the first task, pick a number, needs to happen before the second task, add 23, happens. Now these tasks can be defined in two main ways. You can use decorators or operator classes, because it is all just Python code under the hood. The first task, pick a number, I've defined using the @task decorator. I just used a regular Python function and put @task on top of it, and then I can put any Python code inside and Airflow will automatically create a task out of this.
This is already how you can turn all of your Python scripts into Airflow tasks, if you want to do that, without any additional knowledge. Now this is one way that you can create a task. The other one is by using an operator class. There are thousands of these operator classes around, and a lot of them are bundled into provider packages. They abstract away a lot of the logic that you need to interact with certain data tools. For example, there's an Azure provider, there's a Google provider, there's an Amazon provider, and they contain a lot of pre-made operators that make it easier for you to talk to those services. In this case, I used an operator called MyBasicMathOperator, and this is an operator that I actually wrote myself for demonstration purposes, so I can also show you the tests for this specific operator later. This operator just adds the number 23 to the number that is returned from the first task. You can see on line 25 that I can use the output of an upstream task in my downstream task, so you can also exchange information between tasks. Lastly, on line 30, I say, well, let's chain those tasks. I need to pick a number before I add 23. That is already all you need to know to define your first few Airflow DAGs. Two notes on best practices. Usually, DAGs run periodically, incrementally, and automatically. So you usually have some sort of cadence that can be based on time. In this case, I have a daily schedule, but it can also be based on something that's data driven. You can, for example, wait on an action in a different data tool, or you can say that one DAG needs to run once another DAG has finished. A DAG usually works incrementally on data if you're using it for more traditional ETL/ELT pipelines. And ideally it runs automatically, with only a little input from you once you're done developing it.
Now, for the tasks themselves, we have three best practice words that you can keep in mind when you're writing a task. The first one is that they should be idempotent, which I already broke in this little example. You might have noticed I'm using random, and this will not be idempotent because I'm not seeding it in this case. But ideally you want your task to always have the same output for the same input. This is important when you're rerunning tasks after the fact. Airflow gives you a lot of options to do that, to rerun a specific run of a DAG maybe a month later or even a year later. The next one is to keep your tasks atomic, so you want each task to do as little as possible. You do not want to have a very long script that does a lot of things, that even talks to different outside tools, all in the same task, because then you lose observability and a lot of the additional features that Airflow can give you, or they are just not that useful for you if you pack everything into the same task. And lastly, because it is all just Python code, you can create this in a modular structure. You can put functions in supporting folders, import them, and use them in different DAGs. You can even build your own custom framework on top of Airflow for how you define and write DAGs. It is all just Python, and it is all fully open source. All right, I already said Airflow is open source. What is Astronomer? Astronomer is the best place to run Apache Airflow in production. I'm obviously biased, but I truly believe that. It is a managed service that gives you the infrastructure to run your Apache Airflow pipelines. Especially once you grow and you have hundreds of pipelines, this helps you run Airflow reliably in production. Of course, there are other ways you can run Airflow with open source tools. There is an official Helm chart, and a lot of people have custom Helm charts. There are a lot of different options that you have.
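Returning to the idempotency point above: one way to make a "pick a number" step repeatable is to seed the random generator from something that identifies the run. The following is a minimal sketch in plain Python, not the code from the slide. The function name pick_a_number and the choice of the run's date string as the seed are assumptions made for illustration.

```python
import random


def pick_a_number(run_date: str, minimum: int = 1, maximum: int = 100) -> int:
    """Return a pseudo-random number that depends only on the inputs.

    Seeding a local generator with the run date (an assumed choice) means
    that rerunning the same run later produces the same number.
    """
    rng = random.Random(run_date)  # local generator, global state untouched
    return rng.randint(minimum, maximum)


first_run = pick_a_number("2024-01-01")
rerun = pick_a_number("2024-01-01")  # same input, same output
print(first_run == rerun)
```

With an unseeded call to random, the rerun could return a different number, which is exactly the behavior the talk flags as not idempotent.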
Astronomer also provides additional tooling on top of Airflow. If you want to try this, there's a free trial. If you go to astronomer.io/try-astro, you can take it for a spin. But everything that I'm going to show you in this talk is possible with fully open source tools. You do not need to be an Astronomer customer to use any of the testing approaches that I'm going to show you in this talk. All right. Why should you test your data pipelines? Well, I already alluded to something. Airflow is really important in a lot of businesses. So if something goes wrong, it can look like this. Of course, there are a ton of reasons why there could be something up with your data pipelines. The worst way to find out is usually a Slack message from someone high up in the organization that sounds something like, where is my dashboard? Or, why is my dashboard showing something nonsensical? You cannot prevent all of it, but you can prevent some of these events by testing your data pipelines. And even if there is something going on, if your pipelines are tested, it will be easier for you to debug because you know which part of your pipelines you can trust. This is kind of the takeaway message of this talk. Airflow is written in Python, and Airflow pipelines are just Python code, so it's Python all the way down. This means that all software engineering and DevOps best practices apply, including testing and CI/CD. You would never ship application code that is not unit tested, and you should not do that with Apache Airflow either. You should test all of your DAGs, all of your pipelines. Okay, I told you why you should test. Now let's get into how you can actually do the testing. First we will talk about local development. So this is the overview. This is one of the potential structures that you can have when you're working with data pipelines. One important thing that is really just a DevOps best practice is that all of the code is in source control, in version control.
In this case, I use GitHub. But of course, you can use any version control system that you want. If you're using Airflow for machine learning purposes, you will also have your ML artifacts inside of this version control. And in a lot of cases you have configuration files for infrastructure as code as well. But we will focus on the DAG code for this presentation. Now we have our version control. We have three branches in this case: development, staging, and production. And then we have all of the steps that our code takes. On each of these steps you can see I put a little gear icon and CI/CD because we are doing testing all the way along. First of all, we are going to talk about the local part. So we have our code, we have our development branch. We are working locally on a machine, but we can already do some testing. Now, if you want to develop with Airflow locally, you have a lot of different options. You can run a standalone instance. You can run Airflow in Docker. There's one that I highly, highly recommend. It's called the Astro CLI. This is also a fully open source tool. It's developed by Astronomer, but it's not specific to Astronomer customers. Everyone can use it. It allows you to very easily spin up a local Airflow environment within Docker, and it also includes built-in testing features. If you go to the link that is on the screen, the Astro CLI page in the Astronomer docs, you can get install instructions. But what you really need to know is just three commands. First you install the package: brew install astro. Then you go to any empty directory on your machine and you run astro dev init. Once you run astro dev init, a local Astro project will be initialized. This is already fully functional with two example DAGs, so you can directly say astro dev start. After one or two minutes you have Airflow running locally within Docker, and you can access the Airflow UI at localhost:8080.
Of course, then you can start adding your own DAGs and start to develop. All right, let's say you've done that. You used the Astro CLI, you have your project, you added your DAGs. How do you go about testing those DAGs? The first option that you have is using CLI features. If you've used Airflow before, you are probably familiar with the blue ones on the screen. Those are part of the general Airflow CLI. You can also use those with the Astro CLI. You just have to prepend astro dev run, as I've shown in the parentheses, to the command. The first testing command that you can use is airflow dags test. This just executes one DAG run of your DAG. So it runs through your DAG, and if you have any issues along the way, it will tell you. The second option is airflow tasks test, so you can also test specific tasks inside of your DAG. This can be very useful if you're working on a long DAG with a lot of dependencies, but you have a task in the middle that doesn't really need anything from upstream and you just want to develop and iterate on that one. You can test a specific task directly from the CLI, and you get the full logs of the task as well. Now, if you're using the Astro CLI, you have more options: the green ones here. The first one is called astro dev parse, and this parses your DAGs and makes sure there are no import errors. Of course it parses for Python syntax, so if you made a basic syntax error, it will notice. It also checks for things like two tasks having the same ID in the same DAG, which is not allowed. And because a DAG is a directed acyclic graph, you cannot accidentally define a cycle. These are things that astro dev parse would catch, and you can do that very easily from the CLI. The next one, which I will talk about more in the upcoming slides, is astro dev pytest. This command runs all of the tests that you've defined in the tests directory. I will show you that in a demo.
And even though it's called pytest, you can use any Python framework that you like. You do not need to use pytest. It can be unittest or another Python framework. This command just runs all of the tests in that folder. Lastly, a little bit of a different kind of test: there is a command called astro dev upgrade-test that will test your current environment and all of your DAGs against a newer Airflow version. So if you are planning to upgrade and you're unsure whether there might be a breaking change, you can run this command and it will test your DAGs. It will also tell you about all of the version changes of packages. It's another thing that I will show you in the demo in more detail. Right, this was the CLI, but I promised there would be something about IDE debugging. It's one of my favorite little features that not that many people know about. You can actually test Airflow DAGs interactively in your IDE with your favorite debugging tool. So no matter which IDE you use, whether you use VS Code or PyCharm, you can run through your DAGs the same way you run through other Python scripts with your debugger. The way that you do that is at the end of your DAG file, you add if __name__ == "__main__". So if you run the file directly, you take the DAG object that is in that file and you call .test() on it. And that's already it. That makes the DAG interactive, and then you can run that file with a debugging tool. But you have additional options. If you want to, you can say, well, I want to run this DAG for a specific date. If you're worried about your DAG breaking on the 1st of January because you're doing a lot of logic around datetimes, which is definitely one of the worries on my mind with some of the DAGs that I write, then you can just test it for that date. You can also use different connections. You can store them in a YAML file and just inject them. So you could test your DAG against a new database, for example, or a new schema.
You can use different variables and you can also inject the DAG configuration. I will show you that in the demo as well. All right, so let's move on to CI/CD, because so far we've stayed local. We were just working locally on a machine with our local development environment created by the Astro CLI. But eventually we want our code to live in production. Right? That's the dream of all code. So let's move on to the staging branch. There is a CI/CD step here. Now, different teams will have different processes around this. The important thing is that you have a process that you follow. We recommend having a PR from a dev branch to a staging branch. Usually there are a lot of developer branches, especially in larger teams, but one dedicated staging branch, and the PR will be in your version control and ideally reviewed by your team members. As part of this PR, you run some tests automatically. This is where you can reuse astro dev pytest, because this is not just a command that you can run locally. You can incorporate this command into your CI/CD and it will run all of the tests in your tests folder. Usually this means you have DAG validation tests, unit tests, and integration tests. I will talk about those on the next slide. And I put this reminder on the slide because I've done this before: I didn't want to wait for all of the checks to complete. I was 100% sure that my code was fine. Just a small change, right? You should wait and only merge if the tests have passed. Now, what are those three kinds of tests? The first one is Airflow specific, so this is something that you do not have with your other Python files. You can have DAG validation tests. The first thing that all DAG validation tests should check is whether the DAGs are valid. So what I mentioned earlier: Are there any cycles defined? Are the task IDs valid? Is it a valid type of schedule? Did you miss a mandatory parameter? Those are the sorts of things that are also checked when you run astro dev parse.
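To illustrate what two of the validity checks mentioned above mean, here is a minimal sketch in plain Python. It is not how Airflow or the Astro CLI implement these checks. The function names and the dictionary that maps each task ID to its downstream task IDs are assumptions for illustration only.

```python
from collections import Counter


def find_duplicate_task_ids(task_ids):
    """Return the task IDs that appear more than once, sorted."""
    counts = Counter(task_ids)
    return sorted(task_id for task_id, n in counts.items() if n > 1)


def has_cycle(dependencies):
    """Detect a cycle with a depth-first search.

    `dependencies` maps each task ID to a list of downstream task IDs.
    """
    unvisited, in_progress, done = 0, 1, 2
    state = {}

    def visit(node):
        state[node] = in_progress
        for downstream in dependencies.get(node, []):
            status = state.get(downstream, unvisited)
            if status == in_progress:  # reached a task still on the stack
                return True
            if status == unvisited and visit(downstream):
                return True
        state[node] = done
        return False

    return any(
        state.get(node, unvisited) == unvisited and visit(node)
        for node in list(dependencies)
    )


valid_dag = {"pick_a_number": ["add_23"], "add_23": []}
broken_dag = {"a": ["b"], "b": ["c"], "c": ["a"]}
print(has_cycle(valid_dag), has_cycle(broken_dag))
print(find_duplicate_task_ids(["extract", "load", "extract"]))
```

In a real project you would not write these checks yourself. Parsing the DAG files with Airflow already reports such problems as errors, and a validation test only needs to assert that no errors were reported.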
The second part is that you can add custom DAG rules. For example, you could have a constraint on schedules. If you have a team member who might accidentally push a DAG with the wrong cron string, and suddenly your DAG runs every five minutes when it's supposed to run once per day, you can add a test that makes sure that only approved schedules come through. You could theoretically even define which schedules are allowed for a specific DAG, if that fits your use case. You can do the same with start dates or with tags. The screenshot in this example makes sure that catchup is set to False. All Airflow developers who are listening know this one or have done it before by accident: you set catchup to True, or forgot to set catchup to False, and then all of the backfill runs got scheduled automatically. So this is a very common rule that people use to prevent that. Or you can also say, yes, I want to have specific tags. If you have ways of organizing your DAGs, you can really enforce your architectural decisions on your DAG authors by using these custom DAG rules. Another one that is very common, because you can also have constraints on your tasks: you could, for example, only allow specific operators. This is something that I will show you in the demo. This example uses pytest, but you can use any Python test framework. The reason I've used pytest here is that if you use the Astro CLI, example tests will already be created automatically, and those use pytest. All right, the next one is unit testing. Now, all Python developers are of course familiar with unit testing. Where does that come in if you're using Airflow? It really is important if you have custom hooks and operators. Take the example that I showed on the intro slide: MyBasicMathOperator is an operator that I wrote, so it's really an operator that I should also write tests for.
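The custom DAG rules described above (approved schedules, catchup set to False, required tags, allowed operators) can be sketched as a single checking function. This is a hedged illustration, not the test from the screenshot. The stand-in objects built with SimpleNamespace, their attribute names, the approved values, and the function name dag_rule_violations are all assumptions. In a real project the DAG objects would typically be loaded with Airflow's DagBag.

```python
from types import SimpleNamespace

# Hypothetical team policy, chosen for illustration only.
APPROVED_SCHEDULES = {"@daily", "@weekly", None}
ALLOWED_OPERATORS = {"PythonOperator", "MyBasicMathOperator"}


def dag_rule_violations(dag):
    """Return a list of human-readable rule violations for one DAG."""
    problems = []
    if dag.schedule not in APPROVED_SCHEDULES:
        problems.append(f"{dag.dag_id}: schedule {dag.schedule!r} is not approved")
    if dag.catchup:
        problems.append(f"{dag.dag_id}: catchup must be False")
    if not dag.tags:
        problems.append(f"{dag.dag_id}: at least one tag is required")
    for operator in dag.operator_names:
        if operator not in ALLOWED_OPERATORS:
            problems.append(f"{dag.dag_id}: operator {operator} is not allowed")
    return problems


# Stand-ins for DAG objects; the attribute names are assumptions.
good_dag = SimpleNamespace(
    dag_id="math_example", schedule="@daily", catchup=False,
    tags=["math"], operator_names=["PythonOperator"],
)
bad_dag = SimpleNamespace(
    dag_id="every_five_minutes", schedule="*/5 * * * *", catchup=True,
    tags=["math"], operator_names=["PythonOperator"],
)

print(dag_rule_violations(good_dag))
print(dag_rule_violations(bad_dag))
```

With pytest, one would typically parametrize a test over all loaded DAGs and assert that the returned list is empty, so the failure message names the offending DAG and rule.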
The second use case is if you have functions used in @task-decorated tasks. You can write unit tests for those as well. So really, if you write your own custom Python code, you should also add your tests. Again, you can use any Python test framework that you like. This code snippet here uses unittest, but you could use pytest. You could use any package that you prefer. A little side note here: I mentioned that there are a lot of building blocks in Apache Airflow. If you're using one of those open source building blocks, then the tests are already done for you. So you only have to write unit tests for your own custom operators. If you want to figure out whether there is already an operator for your use case, the best place to do so is the Astronomer Registry. If you go to registry.astronomer.io, you can find all sorts of information on all sorts of operators. All right. The third type of test is integration testing. This is for when you are talking to an API. With unit tests you usually try to test only the code that you yourself wrote. So if a custom operator calls out to an API, for example if you're writing a custom operator to interact with Snowflake, you would mock that API call to make sure that it is your code that is working or failing and not something with the API. With integration tests, you actually want to test that API call, which again you do for custom code that you write, and which you can do with any Python test framework. In this code snippet, I simply use the built-in assert. One little note here. Be careful, of course: if you call out to an API, that can incur cost and sometimes take a lot of time, especially nowadays with LLMs. So be careful about what kind of payload you are sending and what kind of call you are making inside of your tests. All right, so we merged our PR. Our code is now on the staging branch. It is very happy on that branch, but it wants to move to the deployment.
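The difference between a unit test that mocks the API call and an integration test that makes it can be sketched with the standard library alone. This is a minimal illustration, not the snippet from the slides. The URL, the JSON payload shape, the operation names, and the function names get_random_number and apply_operation are hypothetical.

```python
import json
import unittest
import urllib.request
from unittest import mock

API_URL = "https://api.example.com/random"  # hypothetical endpoint


def get_random_number(minimum, maximum):
    """Fetch a number from the (hypothetical) API."""
    url = f"{API_URL}?min={minimum}&max={maximum}"
    with urllib.request.urlopen(url) as response:
        return int(json.loads(response.read())["number"])


def apply_operation(first, second, operation):
    """Custom logic of the kind a math operator might wrap."""
    if operation == "multiply":
        return first * second
    if operation == "minus":
        return first - second
    raise ValueError(f"unsupported operation: {operation}")


class MathLogicTests(unittest.TestCase):
    def test_multiply(self):
        self.assertEqual(apply_operation(11, 23, "multiply"), 253)

    def test_minus(self):
        self.assertEqual(apply_operation(14, 23, "minus"), -9)

    def test_unknown_operation_raises(self):
        with self.assertRaises(ValueError):
            apply_operation(1, 2, "divide")

    def test_get_random_number_with_mocked_api(self):
        # Unit test: the network call is replaced, so only our code runs.
        fake_response = mock.MagicMock()
        fake_response.__enter__.return_value.read.return_value = b'{"number": 11}'
        with mock.patch("urllib.request.urlopen", return_value=fake_response) as fake_urlopen:
            self.assertEqual(get_random_number(1, 100), 11)
        fake_urlopen.assert_called_once()


# Run the suite without exiting the interpreter.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(MathLogicTests)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

An integration test would call get_random_number without the patch and assert on the real response, which is where the cost and runtime caveat from the talk applies.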
CI/CD stands for continuous integration, continuous deployment, so we have a step that does this automatically. Ideally, once the code has moved onto the staging branch, the tests run again, and if those tests pass, the code is automatically deployed to the staging deployment. Some people also add more integration tests at this stage that test your code in the context of the whole environment. You have a lot of options for what you can put into your CI/CD script. Since Airflow is all just Python code, you can use any CI/CD tool that you like. In the demo I'm going to show GitHub Actions, but there are example scripts for different tools available in the Astronomer documentation. You can also consider having automated creation of your deployment. For example, maybe you want the deployment to automatically update from a configuration, and have that configuration file in your source control system as well. Then you can use infrastructure management as part of your CI/CD. This is kind of the next step in DevOps in a lot of cases, and you can also do that for your Airflow infrastructure. This will depend on how you are running Airflow in production. But if you're an Astronomer customer, you can use the Astro API to manage your infrastructure in a code-based way. Okay, so our code made it to the staging deployment. It is running there. It is fine. Let's go to production. This is just a recommendation, and it's not always possible, but we recommend that you run your code for a few days in your staging deployment as an end-to-end test, because you can never think of everything. There will always be edge cases that you did not think of, that maybe come up only with data on certain days, or something specific that you only see once your code is running in the full context.
So that is one of the main purposes of the staging deployment: making sure your new code is working and your new pipelines are working end to end. Then usually a couple of PRs get bundled, and there is a staging-to-production PR that gets reviewed again. Once that is merged, all of our tests run again and the deployment is automatic again. All right, that was the overview of the different ways and contexts of testing. Now I will move to the demo. As I said, all of the code in this demo is available to you if you go to GitHub and then astronomer/external-talks-demos. It is on the branch for this conference, so "2024 conf42 Python Airflow testing prod". Don't worry, you don't have to write that down. The readme on the main branch will have a link to this branch. But let's look at this. All right, so this is the Airflow environment. You can see here that I'm running on localhost:8080, so this is running on my local machine with the Astro CLI. The code repository that is available to you contains just two DAGs. The first one is the intro to Airflow DAG that I've shown in the slides, so you have the code. The second one is the one where I'm going to show you the testing options. So let's click on this math DAG and scroll down. Okay, this is our DAG. You can already see that I've tested a specific task here. But if you're looking at a full run, you can see the structure of this DAG. It is a very, very simple DAG. We have two tasks at first. One picks a random number, and it actually uses an API to do that. The second one retrieves an operation from an Airflow variable. Then we perform that operation with that number and the number 23 using this basic math operator, which is a custom operator. Lastly, we make sure that a table exists in a local Postgres database, and then we write the result to that table. So it is a very simple DAG structure, but it has all of the pieces that we need to look at testing. With that said, let's hop over into the code.
I'm using VS Code here. You can use any IDE that you prefer. What I've done here is, in an empty folder, I ran the command astro dev init, and this creates the Astro project with most of the files and folders that you see here. The important one here is the Dockerfile. What is happening here is that it pulls the Astro Runtime, which is functionally equivalent to open source Airflow. It just adds a few features that help with running Airflow with the Astro CLI and with Astronomer. But this is also ready for you to use, and you can expect feature parity with Apache Airflow. In this case, version 10.2 corresponds to a 2.8 version of open source Airflow. So here is where we pull Airflow into the Docker environment. You do not need to know anything about Docker in depth to use this tool, but you can add your customization here if there's something you want to customize. Since Airflow is running in an isolated Docker environment, we need to install our packages, and this is where you can do that. You can add all of the packages that you would pip install to the requirements file, and once you run astro dev start, they automatically get installed in your Airflow environment. One little note here. It is a best practice to always pin your versions here. This is also something that has caused me some headaches before, when I forgot to do that for a simple package. Always pin your versions. All right, so the next thing is the dags folder. I said earlier that Airflow will know that a file is an Airflow DAG if you use this @dag decorator and you put the file into the right folder, and this is the right folder. So if you put a file in here, like our intro to Airflow file, and it has the word dag in it, it will automatically be picked up by the scheduler. We have an include folder. This is where we can put all of our supporting code. So, for example, the custom operator lives here.
And most importantly for this talk, we have the tests folder where all of our tests live. I will go through those in more detail in a second. Okay, so this is our Astro project that is created by astro dev init. We can start it with astro dev start. It is already running here. Let me show you the Docker containers that are running. And now we want to do some development. So we wrote this DAG. This is the DAG that I've shown you with this very simple structure, picking a random number and performing an operation on it. Let's say we've written this DAG and we want to test it. Now, if you remember, the first option that we have is using CLI commands. One of the commands tests the whole DAG. If you're using the Astro CLI, airflow dags test translates to astro dev run dags test, and then we use the DAG ID. So I run astro dev run dags test with the ID of the math DAG. Let's run this. This will just run the whole DAG, and it's pretty fast. You can see here that it executed all of the tasks. We ended up with eleven times 23, so the random number was eleven. This result eventually gets inserted into our Postgres database. All right, so let's say I do some more development on a specific task, maybe this pick random number task. I don't want to run everything all of the time, so let's just run this specific task. This is also something you can easily do with the CLI. We can run astro dev run tasks test, followed by the DAG ID and then the task ID of the pick a random number task. This will just run this task. I get 14. Let's do it again. It should be random, not idempotent. I get 71. So this is working. All right. This is already very helpful. But sometimes you need to debug your tasks in more depth and you want to use the Python debugger. What you can do then is add a breakpoint. Let's add the breakpoint here. This just means that I tell my project, I think there's something going on here.
I want to stop here if I'm using my debugger tool. And let's use the VS Code debugger. What this does is run and debug the DAG, and it stops here. And then I have all of my debug options. I can see the maximum and minimum that I am running with, and I can see the global variables. Something that is very useful if you're an Airflow developer: I can see everything that I have within the context that I injected into this task, and I can perform actions here. I can ask, well, is there already a num at this point? There is not; num has not been defined yet. But minimum is defined. And from the context, I think I have context ts; I have everything that is within the context. And I can, of course, use all of the debugging controls. I can step into, step further, and now I'm within the function that is being called, so I can see what is happening inside of the function. I can step through the request. At this point r is not defined yet, but I can step further to where I return the JSON and ask, well, what is r here? r is the response, so it's 200. What is r.json()? That should be the payload, and I can get the payload. So it's 43 at that point in time. This is really how you can avoid all of those print statements. As a junior developer, I had a lot of print statements in my code because I needed to know what was going on while debugging. But if you're using a debugging tool like this, you can just step through your code and do this in a much cleaner and very interactive way. Okay, let's exit the debugger and go back to our code and our terminal. All right, so how did I do that? Because this does not work by default, right? You may have tried this before with your Airflow DAGs. I added the dag.test() function. What I've done here on line 100 is the same thing that you've seen in the slides. I added if __name__ == "__main__".
So if this file is run directly, I want you to run this instead. I assigned the call of my DAG function to an object, I take this DAG object, and I use the .test() method. All of these parameters are optional, but this means you can now run this DAG in your debugging tool, and I can run it for a specific date. For example, I can say, well, I'm worried: in a year, is this still going to work? If I'm using some datetime logic, I can run it for a specific time of the year. I'm also injecting connections and variables, and I can show you how I do that. I have this DAG test folder up here, and in it I have my connections in YAML format. This is just my local Postgres, and this is the database that I want to connect to when I'm testing the DAG, but I could point this at any database. I also inject a variable; in this case, I inject the operation. So if I change that, for example to minus, then it subtracts instead of multiplying, so I can test different situations. Of course, you can also pass configurations. In this case I have an upper limit and a lower limit for the random number, and I can just inject a different limit. So I can say, well, I want the upper limit to be 20. Okay, let's run this again with the different configuration. And now it runs through. I removed the breakpoint so it didn't stop, but what you can see now if we scroll up is that we got the equation 14 - 23. So we didn't get the multiplication that we had earlier; we ran it with the new variable, which was the minus operation. All right, so this is super useful if you're working locally and debugging while developing. But let's look at the tests that are in the tests folder. If we look through that, we have DAG validation tests first. Some of these are actually generated automatically if you run astro dev init. So if you scroll up here, this code up here is generated for you.
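The dag.test() setup described above can be sketched roughly as follows. The keyword names execution_date, conn_file_path, variable_file_path and run_conf are the ones DAG.test() accepts in Airflow 2.x as far as I know; the file paths, the config keys, the helper debug_run and the FakeDag stand-in are assumptions made for this example, so that it runs without an Airflow installation.

```python
from datetime import datetime, timezone

# Default keyword arguments for DAG.test(). The parameter names follow
# Airflow 2.x; the paths and config keys below are hypothetical.
DAG_TEST_DEFAULTS = {
    "execution_date": datetime(2025, 1, 27, tzinfo=timezone.utc),
    "conn_file_path": "dag_test/connections.yaml",
    "variable_file_path": "dag_test/variables.yaml",
    "run_conf": {"upper_limit": 20, "lower_limit": 1},
}


def debug_run(dag_obj, **overrides):
    """Call dag_obj.test() with the defaults above; overrides replace defaults."""
    kwargs = {**DAG_TEST_DEFAULTS, **overrides}
    return dag_obj.test(**kwargs)


class FakeDag:
    """Stand-in with the same .test() entry point as an Airflow DAG object."""

    def test(self, **kwargs):
        # A real DAG would execute its tasks here; the stand-in echoes its inputs.
        return kwargs


received = debug_run(FakeDag(), run_conf={"upper_limit": 50, "lower_limit": 1})
print(received["run_conf"], received["conn_file_path"])

# In a real DAG file, after dag_obj = my_dag_function(), the call would sit
# behind the main guard so it only runs when the file is executed directly:
#
# if __name__ == "__main__":
#     debug_run(dag_obj)
```

Keeping the defaults in one dictionary makes it easy to rerun the same DAG for another date or another configuration from the debugger.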
And this just gives you the function that gets the import errors, so the function that makes sure that your DAGs are parsing correctly, and the function that gets your DAGs. As for what I added here, the first one is the one that I've shown in the slides. This is a test that makes sure that the catchup parameter is set to False, which is very, very useful. And then I also added a test that makes sure that I only use allowed operators. So I say I only allow the @task decorator, whose official name is the Python decorated operator, my own custom one, and the SQLExecuteQueryOperator. And this is the test that makes sure that, for all of the tasks in all of the DAGs that I have in my Airflow environment, the type of the task is in the allowed operators. All right, for integration tests I just have a very simple one. I take the function that makes the API call that you've seen earlier. I can show you that it's in the utils: a very simple function that just calls this random number API. The test makes sure that it returns a result that makes sense for our constraints. And then the unit tests. Of course, if I write a custom operator, I should really unit test it. This tests the operator for all operations. It makes sure that if I divide by zero I get an error, and it makes sure that an invalid operation also throws an error. And you can see those were three different Python frameworks: unittest, the built-in assert, and pytest. We can run all of these with one command. If I run astro dev pytest, this creates a little container and then runs all of the tests, and they all passed. Very happy. But let's make one test fail. We say we want catchup to always be set to False. So what happens if catchup is suddenly True? Let's rerun the tests. This DAG should not pass the test, and it did not. One test failed: catchup equals False. This is not allowed.
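The two DAG validation checks described above (catchup set to False, only allowed operators) can be sketched like this. It is a simplified version under stated assumptions, not the code from the demo repository: the DAGs are SimpleNamespace stand-ins so that the example runs without Airflow, and MyMathOperator is a hypothetical name for the custom operator. The attributes used (dag_id, catchup, tasks, task_id, task_type) do exist on Airflow DAG and operator objects.

```python
from types import SimpleNamespace

# Operator class names that tasks are allowed to use.
ALLOWED_OPERATORS = {
    "_PythonDecoratedOperator",  # what the @task decorator creates
    "SQLExecuteQueryOperator",
    "MyMathOperator",  # hypothetical name for the custom operator
}


def validate_dag(dag):
    """Return a list of human-readable problems found in one DAG-like object."""
    problems = []
    if dag.catchup:
        problems.append(f"{dag.dag_id}: catchup must be False")
    for task in dag.tasks:
        # task_type holds the operator's class name
        if task.task_type not in ALLOWED_OPERATORS:
            problems.append(
                f"{dag.dag_id}.{task.task_id}: {task.task_type} is not allowed"
            )
    return problems


# Stand-in DAGs for illustration only.
good_dag = SimpleNamespace(
    dag_id="good_dag",
    catchup=False,
    tasks=[SimpleNamespace(task_id="pick", task_type="_PythonDecoratedOperator")],
)
bad_dag = SimpleNamespace(
    dag_id="bad_dag",
    catchup=True,
    tasks=[SimpleNamespace(task_id="run", task_type="BashOperator")],
)

for dag in (good_dag, bad_dag):
    print(dag.dag_id, validate_dag(dag))
```

In a real project, a pytest test would loop over the DAGs loaded from an Airflow DagBag and assert that validate_dag returns an empty list for each of them.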
So this is how you can get real control over your DAGs, and not just when you run them locally: you can take these tests and add them to your CI/CD. In this case I'm using GitHub Actions, but you can do this with any CI/CD tool. I'm saying I always want to run these tests when I push to these branches or when a pull request is closed to these branches, and then I have some customization for which tests I want to run. And I say, well, check out the repository, install the Astro CLI so we can run astro dev pytest, and then run the test harness. So this runs the exact same command inside of the CI/CD that I can run locally. We have the same thing on merge to production. The last step here is just because I'm using Astronomer: it also automatically sends my code to my Astronomer deployment using my deployment ID. But if you're using a different way of running Airflow in production or in the cloud, you can just replace this last step. Now, to show you that this is actually working, let's hop over. This is the repository that I've talked about. If we scroll up, it's the Astronomer external talk demos repository. And you can see here this was happening on a push to a branch, and it runs the exact same tests. If we scroll down, you can see we also have those twelve tests that passed. We had the DAG validation tests, the integration tests and the unit tests. All right, so the last thing that I wanted to show you is that you can also do upgrade testing. One thing that you might have noticed in the demo earlier: we had a banner up here saying a new version is available. This happens a lot because new versions come out all the time. So we are on 10.2 and there's actually a runtime version 10.3 available. Now I want to upgrade, but I want to be sure that my DAGs are fine, and I want to know which packages have upgraded versions in this change. What I can do is run astro dev upgrade-test. So let's do this.
I already ran it before, but it's actually fairly quick. It takes a little longer the more DAGs you have, but it ran and it tells us no errors were detected in our DAGs. This is great. This also tells you that it is not running astro dev pytest; it's just testing your DAGs against the new version, because astro dev pytest would still fail our intro DAG for having catchup set to True. But this tests against the new version. And what you get is this little folder here, which contains several files. The first one is an HTML file that you can open in Firefox or in Chrome. You can see that it tells you all of the details of your environment, which can be super useful, especially if you're debugging a platform issue. We can see what I'm running on here, which Python version, and the packages that are relevant, and we can see both of our DAGs and that both of them passed all of the tests. They will work in 10.3 as they do in 10.2 right now. The other thing that I personally find super useful is the dependency comparison. There are pinned versions of different packages for each Astro Runtime and each Airflow version. We actually get the full pip freeze for both versions here, and this file tells you what actually changed. You can see that we are using the same base Airflow version, but the Astro Runtime extra has changed. We did some upgrades of the built-in providers and, for example, pytest had a major update in this version. This can be very important if you're using one of these packages inside of your Airflow tasks and you have a specific dependency or you need to update something in your Airflow tasks. This will tell you, and this will make sure that your DAGs do not break. Okay, so that was the whole journey of our code from local development to production and how you can test each step of the way. As a little recap, if you're doing local testing, you have two main options.
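To make the idea of the dependency comparison concrete, here is a minimal sketch that compares two pip freeze outputs and reports what was added, removed or changed, flagging major version jumps. This is not how the Astro CLI produces its report; the function names and the sample package versions are invented for illustration.

```python
def parse_freeze(text):
    """Map package name -> version for 'name==version' lines of pip freeze output."""
    versions = {}
    for line in text.splitlines():
        line = line.strip()
        if "==" in line and not line.startswith("#"):
            name, version = line.split("==", 1)
            versions[name.lower()] = version
    return versions


def compare_freezes(old_text, new_text):
    """Report added, removed and changed packages between two freezes."""
    old, new = parse_freeze(old_text), parse_freeze(new_text)
    report = {"added": {}, "removed": {}, "changed": {}, "major": []}
    for name in sorted(set(old) | set(new)):
        if name not in old:
            report["added"][name] = new[name]
        elif name not in new:
            report["removed"][name] = old[name]
        elif old[name] != new[name]:
            report["changed"][name] = (old[name], new[name])
            # Treat a different first version component as a major update.
            if old[name].split(".")[0] != new[name].split(".")[0]:
                report["major"].append(name)
    return report


# Invented sample data, not taken from the actual upgrade report.
old_freeze = "apache-airflow==2.8.1\npytest==7.0.0\nold-helper==1.0.0\n"
new_freeze = "apache-airflow==2.8.1\npytest==8.0.0\nnew-helper==0.1.0\n"

report = compare_freezes(old_freeze, new_freeze)
print(report)
```

The major list is the part to read first, since a major version jump in a package that your tasks import is the most likely source of breakage.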
You can use the Astro CLI and Airflow CLI commands, and you can use dag.test() to debug interactively. For all of the CI/CD steps, we really recommend having a robust testing suite inside of your tests folder to run with astro dev pytest at every step of the way, with DAG validation tests, unit tests for all of your custom code, and selected integration tests for APIs that you specifically want to test. The take-home message: Airflow is written in Python. Airflow pipelines are just Python code. This means you should really test them. Software engineering best practices and DevOps best practices do apply. I hope this was helpful for you. Thank you so much again for joining, and I hope you have a great rest of the conference.
...

Tamara Janina Fingerlin

Developer Advocate @ Astronomer



