Conf42 Python 2024 - Online

Streaming DataFrames: A New Way to Process Streaming Data in Python

Video size:


Introducing an open-source library in Python: Quix Streams. It solves all the complexities of stream processing in a cloud-native package with a familiar Pandas DataFrame API interface and lets you work with streaming data similarly to static data in your Jupyter Notebook.


  • Today I want to talk about open source library quickstreams and how to use it to process streaming data in Python. Before Quix, I worked in McLaren and I was leading a team that was connecting f one cars to factory. To demonstrate the theory in practice, I will build a real stream processing pipeline in visual studio code.
  • You want to use streams processing if you're either getting lets of data or you want to react to that data fast, seconds, not minutes, hours or days. There are two options to choose from: using client libraries or full fledged stream processing framework like fling or spark.
  • Using Quickstream's library to process real time data in a broker. Here are the QR codes for GitHub. If you want to try to build a whole pipeline in the cloud, you can sign up for a free tier. I hope you enjoyed this talk.


This transcript was autogenerated. To make changes, submit a PR.
Hello and welcome. Today I want to talk about open source library quickstreams and how to use it to process streaming data in Python. I'm Tomas Neubauer, I'm CTO and co founder at Quicks and I want to give you a short insight into how I kind of get into stream processing and why we are doing this. So before Quix, I worked in McLaren and I was leading a team that was connecting f one cars to factory, basically connecting a track site to cloud so people can build real time decision insight in a factory without traveling around the world. And it was quite challenging use case because the amount of data from each car was quite huge, roughly 20 million different numerical values per minute. And as a result, we quickly find out there is no way how we're going to be able to persist this and query at the same time with a database in the middle. And we were sort of forced to look around for different technology and we end up using streaming and Kafka broker and we successfully connected the data and we thought we have solved the problem. But really the bigger problem was to teach all different cross functional streams that mostly use Python and Maslab to leverage this data that we had now in the Kafka, because streaming is traditionally Java based ecosystem and all the tools and frameworks and engines are built in Java, and as a result it's quite hostile to Python engineers. So before we start, let me talk you through what we're going to cover today. So we're going to talk about different streams processing architectures, pros and cons of each. We're going to talk about Quickstream architecture and then to kind of demonstrate the theory in practice, I will build a real stream processing pipeline in visual studio code using quickstreams to kind of show you how it works. Let's talk about streams processing and why you actually want to use it, because here you get one kind of a gist what use case we were solving in Maclam. So generally you want to use streams processing if you're either getting lets of data or you want to react to that data fast, seconds, not minutes, hours or days. And if you say yes to at least one of the questions, well, this is probably why you want to use stream processing. If you say yes to both, well, it's perfect. So in the first case, you might want to preprocess your data before you land it to a storage that could be normalization of your schema, joining of data filtering, down sampling and any sort of data preparation in general, because dumping the raw data into a storage is expensive, decreasing the performance of your database, and it's going to just spread on a disk lots of data and you can't afford that if you have lots of data and input. If you have five messages per day, you probably don't care. Now second, if you want to react to data fast, regardless of the velocity of your data, so imagine you want to react pretty quickly if the nuclear reactor core is getting overheated or if you getting your food delivered in two minutes, you really want to get that information to the destination pretty fast. And this is where you want to use streams processing. Now with the stream processing you have basically two different architectures to choose from at the beginning. The one is to use client libraries from the broker of your choice. If you use Kafka, there are many different libraries. For example the confluent maintain library, which is really up to date, and you will get all the latest features there. And when you do that, you're basically combining Kafka with some microservice orchestration like kubernetes by yourself. The second is to opt for full fledged stream processing framework like fling or spark. At that moment you're combining that plus fling or spark and you're kind of adding a dimension to your architecture. And let's talk about those two options now. So the first is quite elegant for simple use cases where you're just processing one message at a time, so you don't really have any context, any sort of state between the messages. And at that point you are not dragging any JVM dependency. You have different languages to choose from, so it's quite great. But the moment you need staple processing, the moment you need rely on the value of the previous message and trust me, you will end up there pretty much all the time. It's going to be very difficult. It's one of the hardest computer science problem to solve this reliably at scale and residently against any hardware failures. So this is where you might be tempted to go to something like Flink or Spark. The problem is that the moment you do that, you're just getting lots of problems. So first you're dragging a huge Java dependency to your architecture. And second, it's not really typical way of programming because you actually not using Python or Java, you're actually using Flink DSL to orchestrate the engine to manipulate data in a scalable, resilient fashion, which means that it's a different mindset really, and it also brings some problems to a day to day developer's life. Like for example debugging so let's discuss this in a real example. So here you have Flink and Quickstream's code. They will do the same thing. Get the topic with the data of chat application where people having a different, they're chatting about different topics in different rooms and you want to calculate how many words per hour, per room was done. And here with fling, you're going to connect to a Kafka topic. Now you can see here, if you look closely, I'm referring to these two jar files. Well, putting apart that it's quite difficult to found the right jar file, to put it to right place, and to know that you have to also rely on another jar file. So it's a dependency chain. But then you are not connecting your code to Kafka, you're connecting fling to Kafka. So that's why here you have the SQL statement which is basically doing exactly that. And this is the first problem. Obviously you're relying on not the official Kafka client, but you're relying on this Fling connector. And b then when you want to do some operation, let's say that you want to calculate this number of words, because we need to use a custom logic here to count the word. We are not really going to do that in Python. Here we're going to build a python function that we're going to register in fling, and then we're going to reference the function in our DSL, which means that fling would basically communicate between Python and Java with the sockets. And every time we receive new message, the data would float both directions. Now I guess you already start to feel that this is not ideal. It's not ideal for performance, for developer experience, and not for debugging, because you simply can't put a pain point here in this method. It's not running in your computer, it's running in some node in fling. So this is on the other hand an example of quickstream doing exactly the same. It fits into one page, which is first plus, but b, this function is doing exactly the same thing as the previous one. It's running in the same runtime, which means I can do this, I can just put a breakpoint in that method and debug it, see what's happening. So imagine here, it could be not one line but hundredth line, it could be a couple of classes, could be quite complex engineering, and I can debug it in my iDe. And I don't have to rely on jar files. There's no Java, which means I will not get exceptions like this. This is my own exception. I got when I was having the right flink connector, but incorrect. Kafka client jar file. And then obviously this is not the greatest architecture of how to run your code. So you have the Java environment, python environment, they're connecting to each other. There's a difference between local and cluster deployment. My goodness. So what is our approach to stream processing? What I want to show you today is different approach. The combination of Python, Kafka and kubernetes, a library that will work nicely with Docker Kubernetes, graceful shutdowns and Kafka concept to deliver highly abstracted stream processing for Python developers so they can use the whole ecosystem of Pip at Conda and really use machine learning, math, physics, all of that without the drudgery of Java engines and use their favorite ide in building their pipelines. And our goals really is to build scalable and reliable system both on the transportation of the data side and compute side, which is extensible and correct. Correct means you can rely on the output regardless of the hardware failures, network outage, et cetera. So this is the basic architecture and you basically see that Kafka, where you have replication of data and horizontal scale is then being consumed by your compute side, where you have the containers running in a kubernetes, in a stateful set underpinned by the PVC, where you have your state backup by changelock topic. And when you need more compute you add more instances of your Python microservice. When you need more transportation of your data, you add more nodes in your broker. All it's good. And then is how we're going to access this library, what is going to be the API? And because Python is a huge community of developers and huge portion of them are using pandas for offline analysis in Jupyter notebooks, we have decided to create a library which would have interface like pandas dataframes. So you using the muscle memory that you learn on batch in streaming and you don't have to relearn new API. So today I want to show you this in action. And actually I want to kind of demonstrate the theory in practice by programming the crash detection microservice. So we're going to look at the sensor data from this app and we're going to create a threshold detection. If we go over a certain number of gforces applied to this phone, we're going to create an event of a crash. So let's get to it. So now we are in quicks. When I have my pipeline that I want to show you today. Here we have a web gateway which exposing an API which this application on my phone would use to send the telemetry into the pipeline. So I'm going to start the pipeline here. And you should see that this is basically getting green. Amazing. And if I go to Messages, I'm getting messages through this endpoint to the topic. If I get just one random one. Now you see that it's a payload in a JSON, not really that friendly for processing, but we're going to solve that in the next service. In this service we're taking this raw data and we converting it to much more tabular type of data like this. Cool. And then we have third service called features which I have prepared for kind of build something new here. So I will clone this local and go to visual studio code to get coding. So we are now in visual studio code when I'm running my dev container. So I have exactly the same environment as I would have in quicks when I deployed this to the pipeline. And now we're going to develop the code that would read the data from a sensor in a phone and look at the actual meter readout. And if we met some threshold we can create alert. So let's get to it. So I'm just going to do a python free main API to see what is in a topic. Great. So we have some data. So let's stop it. We see here the accelermeter, the dimension. So let's just make a filter where we make sure we're only getting rows where this is present. And then I see we have a timestamp in epoch from 1970 nanoseconds. That's not really that readable for the development. So let's convert that to a string. It. So it requires seconds, this method. So I'm going to just divide it and then let's print it. Yeah, it would be probably better if we make this a string. Amazing. And a table like view is probably going to be a bit nicer. So you see all three dimensions and a timestamp which is current time. If I shake you see it's going that values up and down. So first thing we're going to do is calculate a new column which is going to be the accelerometer, absolute value from all dimensions. That will give us kind of a sense regardless of the rotation of the phone, how hard the forces are. And let's just print timestamp and this new column. All right, so if I take the phone into my hand. Oh great, we're getting high numbers. So that is almost where we want to be, but this will be susceptible to kind of bumps when you hit with your bike, a bump or you just get a random shaking of your phone, it would not really be a real crash. So let's look at let's say one or 2 seconds long window where we accumulate the forces applied. So if there is a continuous forces applied to the phone like you are falling from the rock or something, we're going to create an alert. So for that we're going to do a hopping window, so we're going to do extraction of this parameter first and then hopping window of at 2 seconds and we're going to emit the row every 250 milliseconds and we're going to get a sum and we only want a final row of each window. So let's run it. Cool. Now obviously we get the start and of the window and the value to make it a bit nicer to read. Let's do the same transformation here with the startup. And now you can see that this data being generated four times per second. So that's great. And if I start doing some simulation of the crash, you see that we accumulate this value. So now we are kind of in a good position to just output this result to output topic like that and we can just deploy this to our pipeline and connect it to our APIs or front end. And so this is how you use Quickstream's library to process real time data in a broker. Now I hope you saw how simple is that? So give it a try. Here are the QR codes for GitHub. So you can try this library or look at the code of the library, or if you want to try to build a whole pipeline in the cloud, you can sign up for a free tier of our cloud plus form. I hope you enjoyed this talk. And if you have any questions, we have a community slack when you can discuss anything with me or my colleagues, and if you have any problems with our library when you're kicking the tires, just let us know, we won't block you. Thank you very much for listening to this presentation and enjoy the whole conference.

Tomas Neubauer

Founder & CTO @ Quix

Tomas Neubauer's LinkedIn account Tomas Neubauer's twitter account

Awesome tech events for

Priority access to all content

Video hallway track

Community chat

Exclusive promotions and giveaways