Conf42 Python 2024 - Online

Building Apache NiFi 2.0 Python Processors


Let’s enhance real-time streaming pipelines with smart Python code, adding code for vector databases and LLMs.


  • Tim Spann: I'm going to be talking today about building Apache NiFi 2.0 Python processors. One of the main purposes of supporting Python in the streaming tool Apache NiFi is to interface with new machine learning, AI and GenAI libraries. He says Python is a real game changer for Cloudera.
  • You're just going to add some metadata around it. It's a great way to pass a file along without changing it too substantially. We really need you to have Python 3.10 and JDK 21 on your machine. You've got to be smart about how you use these models.
  • There are a ton of Python processors available. You can use them in multiple ways. We're still in the early world of Python processors, so now's the time to start putting yours out there. Love to see a lot of people write their own.
  • When we are parsing documents here, again, this is the Python one, and I'm picking PDF. Lots of different things you could do. If you're interested in writing your own Python code for Apache NiFi, definitely reach out.


This transcript was autogenerated. To make changes, submit a PR.
Hi, Tim Spann here. Today I'm going to be talking about building Apache NiFi 2.0 Python processors. Welcome to my talk here at Conf42 Python. I am a developer advocate covering all types of things around streaming, big data, LLMs, GenAI, all that fun stuff. You can get in touch with me through Medium, GitHub, whatever Twitter's called now, my blog, all over the place. Every week I have a newsletter that comes out. You can get it on GitHub and LinkedIn, and it covers streaming, ML, AI, open source tools, lots of fun stuff. I also do meetups around the New Jersey, New York and Philly area, plus virtual ones. All the content gets put on GitHub, YouTube and SlideShare, so you'll be able to get it if you can't make it to an event. And I've got events coming up soon in New York, Princeton and Boston.
One of the main purposes of us needing to support Python in the streaming tool Apache NiFi is to be able to interface with the new machine learning, AI and GenAI tools. There are a ton of awesome Python libraries for those, as you probably already know, and doing that with NiFi before was difficult with Java. So with the addition of Python, we can do a lot of cool stuff and integrate with a lot of different libraries. That's pretty awesome.
Generally we use Python to do some of our processing on unstructured file types, which Cloudera DataFlow can ingest. Structured sources we're handling with NiFi out of the box. Same with a bunch of enterprise data, APIs and application streams: we get all that data in, and then when we want to push it to an AI model or to different vector stores, the best way to do that is with Python processors, and we'll show you some of those features. I'm not going to go deep into generative AI, but generally we work with it around text to text, text to image and text to speech. Getting that data processed, chunked up, vectorized and ready to go is usually done with Python, some of it in standard NiFi.
But Python is a real game changer for us. Within Cloudera we're dealing with a lot of the major LLMs out there, whether they're closed like OpenAI or open models like Mistral or Meta AI, and a bunch more on Hugging Face, as well as some through Google. We work with all the major vector databases. I'll show you a little bit with Pinecone and Chroma, but Milvus and Solr and all the other ones are out there. We do a lot through Hugging Face because their open community and open models really gel well with our open source and open community model. We work with LangChain, and that's in some of our pre-built Python processors, and we're doing some things with LlamaIndex. Lots of cool frameworks out there; every couple of hours I think there's a new one. A new Google model just came out which looks pretty awesome.
This is pretty standard. Up top we have generative AI applications, and we make these prototypes available for you on GitHub so you can try them out. We work with different closed source foundation models, whether that's through OpenAI, Amazon Bedrock or IBM Watson; lots of different ways to do that. We have model hubs, we can fine-tune models, and we work with the major open source foundation models like Meta Llama 2. We can work with the managed vector stores, where Pinecone right now is the main one, and private vector stores like Solr or Milvus. Underneath that we provide a massive open data lakehouse that does the real-time data ingest and routing of this data, and the data wrangling, which as you can imagine involves a lot of Python, Spark, Flink, NiFi, all those fun tools. We store the data, help you visualize it, and handle the training and serving of the models, and we run this where you need it to be, whether that's public cloud, private cloud, hybrid or multi-cloud, on AWS, Azure, Google, Red Hat, wherever you need to run.
And we work with all the cool fast hardware out there, whether it's from NVIDIA, Google, AMD, IBM, Intel or Dell. All of those are partners and friends, and we're doing really cool stuff with lots of open source and Python everywhere. And obviously model hubs like Hugging Face.
Now NiFi is a great tool for working with GenAI, and having that support for Python just brings us to that next level. We could do a lot through REST and through Java, but being able to add Python as part of our pipelines makes it very easy for us to do a lot of stuff with lots of different data sources. I've got a ton of articles out there working with things like live questions and answers, travel advisories, weather, tons of different formats. Ingest that real fast, enrich it, do real-time alerts if necessary, push things to vector databases using Python, get things into prompts for LLMs wherever they have to run, push real-time alerts and data into Kafka, and also write into your data warehouse so you'll have petabytes' worth of data to train your models or to act as additions to them.
As we've seen, some of the models out there will not always give you back the perfect answer, and the best way to augment that is with RAG. We can support that by having data in our data warehouse and in our vector database. We can cache things and we can precompute things. There are a lot of advantages to having a lot of data, all of it connected together in real time. Build out real-time reports, visualizations, alerts and aggregations with lots of the latest open source tools that run in this very mixed environment, regardless of what you need there. And if we're working with DataFlow in the public cloud, we make it really easy. Go to our ReadyFlow gallery, press a couple of buttons, boom, deploy a flow and get ready to start taking data from a database and pushing it to S3. That might be the first step in your real-time GenAI pipeline.
Again, we mentioned before the many ways we work with LLMs: helping serve them, fine-tuning them, and getting data into and querying the vector data stores. But the main place I work is data preparation and data engineering. We make sure all this data gets processed in whatever size cluster it needs, all the way to the edge, from wherever it needs to be, whether it's on CPUs or GPUs. All that with a lot of open source. Keep that knowledge in a secure repository that can span multiple public and private clouds, and make sure you have everything you expect to have with that data storage.
NiFi 2.0 is the latest and greatest version of Apache NiFi. This has brought us the magic of really supreme Python integration. Before this we could work with Python, but it was a little kludgy. We'd execute it and it would kind of run on its own. Not the best way to do that. We have now added support for JDK 21, which is a lot faster, leaner, and makes better use of threads. And we've added some other enhancements along with that.
But let's look into some Python processors. It's pretty easy to write your own. Basically, if you have a library you want to wrap, you can, and as you can see in my examples, that is pretty much what I did. It's pretty much boilerplate. Here you have a Python class. You import the things you see at the top that you need for building this type of class. We point to the Java interface that connects Java and Python together. We give ourselves processor details. This tells me what version I'm working with and gives a description, which you'll see pop up when we go into the tool, plus some tags to say what this is related to. More importantly, we have our property descriptors. These are the properties that we want to have inside of our application, and these are important ones, as you can see here.
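
The pieces just listed (the class, the Java declaration, the processor details and the property descriptors) can be sketched roughly as follows. This is a minimal sketch rather than the processor shown on the slide: the class name `PromptModelSketch`, the property names, and the `call_model` placeholder are made up for illustration, and the `except ImportError` branch defines simple stand-ins for the `nifiapi` classes purely so the file can also be run outside NiFi.

```python
import json

try:  # these imports resolve when NiFi 2.0 loads the file as a processor
    from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
    from nifiapi.properties import PropertyDescriptor, StandardValidators
except ImportError:  # stand-ins (assumptions) so the sketch also runs outside NiFi
    class FlowFileTransform:
        pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents

    class PropertyDescriptor:
        def __init__(self, name, description, **options):
            self.name = name
            self.description = description
            self.options = options

    class StandardValidators:
        NON_EMPTY_VALIDATOR = "non-empty"


def call_model(prompt, api_key, project_id):
    """Hypothetical placeholder for a vendor SDK call; returns a canned response."""
    return {"project_id": project_id, "prompt": prompt, "generated_text": "(model output)"}


class PromptModelSketch(FlowFileTransform):
    class Java:
        # Tells NiFi which Java interface this Python class fulfils.
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Sends a prompt to a model and returns the response as JSON.'
        tags = ['llm', 'prompt', 'example']

    PROMPT = PropertyDescriptor(
        name='Prompt', description='Text to send to the model.',
        required=True, validators=[StandardValidators.NON_EMPTY_VALIDATOR])
    API_KEY = PropertyDescriptor(
        name='API Key', description='Credential for the model service.',
        required=True, sensitive=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR])
    PROJECT_ID = PropertyDescriptor(
        name='Project ID', description='Project the credential may run against.',
        required=True, validators=[StandardValidators.NON_EMPTY_VALIDATOR])
    property_descriptors = [PROMPT, API_KEY, PROJECT_ID]

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        return self.property_descriptors

    def transform(self, context, flowfile):
        # Read the configured property values from the processor context.
        prompt = context.getProperty(self.PROMPT).getValue()
        api_key = context.getProperty(self.API_KEY).getValue()
        project_id = context.getProperty(self.PROJECT_ID).getValue()
        try:
            response = call_model(prompt, api_key, project_id)
        except Exception:
            return FlowFileTransformResult(relationship='failure')
        # New contents plus a mime.type attribute, routed to success.
        return FlowFileTransformResult(
            relationship='success',
            contents=json.dumps(response),
            attributes={'mime.type': 'application/json'})
```

Leaving out `contents` in the result keeps the incoming flow file content as it was, which is the basis of the attribute-only style of processor.
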
I want the prompt that I'm going to send to, in this case, the watsonx.ai API, and I'm going to need the key and the project ID for this particular project that I have rights to execute.
And this is the main body of the Python code. Pretty straightforward. This doesn't change much. You start off with the definition: the name is transform, and it takes self, context and flowfile. Those are important. Context gives me access to lots of things in the environment, like the properties you see. The flow file is any file that's coming into the system. If I want to change it, remove it, or make my own, it's important to get that in. So I get those properties in and set my credentials. I have some libraries from IBM to use. I call the model, convert the response to JSON, and just return it here with the success relationship. If we had a failure, we can use a relationship of failure. There are a couple of different options on those. Our contents are what we're outputting, which in this case is the output of this call. And we can send any attributes we want. Here I just set the MIME type.
You could also leave the contents alone, and you'll see that in some other examples. I give you back whatever flow file you gave me, just pass that along, and give you some attributes. That way you don't have to change whatever data was coming in through your pipelines. You're just going to add some metadata around it. It's a great way to pass a file along without changing it too substantially, which is fast, and just put things you might need around it. If it's small data or something that augments it, you can decide later if you want to rewrite everything.
So an example of one that doesn't change what's coming in is a processor I wrote. For this one, and for everything with NiFi 2, Python 3.10 is the minimum, which you're probably at, but I know some of the older machines don't have it. We really need you to have Python 3.10 and, again, JDK 21 on your machine. This uses libraries from Hugging Face, NLP with spaCy, and PyTorch.
This is based on an example I found on Stack Overflow and it's pretty cool. What it basically does is you pass in some text and you get back one or more companies, and a couple of attributes get returned. One is a list of all the companies that might be listed in the data you passed in, and the first company I put in a separate attribute. Pretty straightforward. The source code is listed at the bottom and we'll run through some examples so you see it running, but it's a nice little processor to get you company names. I use this in some flows where I want the company name, and then I call a REST endpoint that gives me the stock symbol based on that company name. Then I can use that stock symbol to get stock values. Cool stuff to have. Also, sometimes when you're doing something in Slack, someone gives you a question. Maybe it doesn't need to go to a large language model every time. Maybe sometimes it's a lookup from a database, or it's a call to a REST endpoint to get the current stock. You've got to be smart about how you use these models. Don't use them for things they're not designed for.
Another cool one: if you're doing any kind of transit data, and if you've been watching my blog you'll see I've been doing a lot of transit data. They have a lot of data. It's always in motion. Lots of interesting stuff. Real data, real world, and all over the world. Well, some of these feeds return more than one type of data, which I found out, and that can be problematic. So I wrote this compound GTFS data reader. It takes a URL, plus a header and a key if you need them, and you tell me what kind of feed you want. There are three types in the GTFS spec, which is a transit spec. Trip updates say when the bus or subway or whatever it is has an update, vehicle positions track the vehicles, and alerts say when something happens, which does happen. Again, the links are down there, so you can try it out.
Someone gave me a file full of weird looking data I've never seen before.
It's this WebVTT, the Web Video Text Tracks format. This is for when you're doing training classes or you have text that goes along with a video. If I want to process this in a generative AI model or LLM, I can extract just the text. I don't need the number or the timestamp in there; I'm just grabbing the data that I may want to push to a vector data store or use in a prompt, whatever I'm using there. And I've got the processor for that.
And you can see the API there. I mentioned it earlier, and I showed you that code: the watsonx SDK to access their foundation LLM models. This one does the inference, it's secure, and it's the official SDK. There are other ways to do this, and I was doing it through REST, but having this as Python with the standard SDK is nice. It runs fast, it's pretty easy to do, we saw those parameters before, and it fits really nicely in a streaming pipeline.
Now another one that's just wrapping a library is my system process monitoring Python processor. Again, Python 3.10 or later. This is accessing psutil, which is a great Python library. I'm just grabbing and running as many of these calls as make sense in a context across multiple machines. If you want to tweak this, the source code is available at that QR code, and you see where my GitHub is, so you'll be able to find all this stuff. This outputs all of these different results as attribute values. Again, it's metadata, not messing up your data. Nothing changes for you, which is great.
Another one: sometimes you need to test things, or maybe you're doing a demo, or you just want synthetic data. Well, the Faker library is pretty awesome. I have it so you can choose as many of these as you want, true or false. There are a ton of them. I let you get one of each right now. It goes to attribute values. Maybe I'll create an output flow file. I'm still debating that. Let me know which one is better for you, if returning a flow file would help.
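
Going back to the WebVTT processor described above, the core idea (keep the spoken text, drop cue numbers and timestamps) can be sketched with the standard library alone. This is a simplified illustration under assumptions, not the source of the actual processor: the function name `webvtt_to_text` is made up, and the parsing only handles the common layout of a header, optional cue identifiers, a timing line and text lines.

```python
import re

# Matches cue timing lines such as "00:00:01.000 --> 00:00:04.000" (hours optional).
TIMING = re.compile(
    r"^(\d{2,}:)?\d{2}:\d{2}\.\d{3}\s+-->\s+(\d{2,}:)?\d{2}:\d{2}\.\d{3}")

# Removes inline cue tags such as <v Speaker> or <i>.
TAG = re.compile(r"<[^>]+>")


def webvtt_to_text(vtt: str) -> str:
    """Return only the spoken text from a WebVTT document as one string."""
    kept = []
    # Cues are separated by blank lines.
    for block in re.split(r"\n\s*\n", vtt.replace("\r\n", "\n").strip()):
        lines = [line.strip() for line in block.split("\n") if line.strip()]
        if not lines:
            continue
        if lines[0].startswith(("WEBVTT", "NOTE", "STYLE", "REGION")):
            continue  # header, comment and styling blocks carry no speech
        for index, line in enumerate(lines):
            if TIMING.match(line):
                # Everything after the timing line is cue text; anything
                # before it (the cue number or identifier) is dropped.
                kept.extend(TAG.sub("", text) for text in lines[index + 1:])
                break
    return " ".join(text for text in kept if text)


if __name__ == "__main__":
    sample = (
        "WEBVTT\n\n"
        "1\n00:00:01.000 --> 00:00:04.000\nHello and welcome.\n\n"
        "2\n00:00:04.500 --> 00:00:07.000\n<v Tim>This is NiFi.</v>\n"
    )
    print(webvtt_to_text(sample))
```

Inside a processor, a function like this would be called from `transform` on the decoded flow file contents, and the returned string would become the new contents.
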
Maybe I'll create a second processor that outputs a flow file, or maybe we'll add another parameter that picks. Let me know if that matters to you and how important that is.
Sometimes I need wiki pages. These are useful in a GenAI pipeline if I want to train a model or add some augmentation: look it up and add that to the prompt as part of RAG or prompt engineering. I use the Wikipedia API in Python, and it lets you choose whether you want the HTML back or the text. And you can pass in which wiki page you want dynamically. Again, just use that QR code if you want to see it. The example takes the company we parsed with the extract company processor and grabs its wiki page. Just showing you how you can use these in multiple steps. Again, I'm learning to write new Python processors here based on things we need.
Before I added that fancy compound one, I had a basic one. This is for the most standard transit URL. They return one of these three types, and usually it's in the name. It's like MTA, blah blah blah, alerts.pb or proto or gtfs or whatever it is, and they just have one. So this returns the JSON for you pretty easily. It grabs the GTFS data from the URL and turns it into an easy format. Couldn't be easier than that.
Now there's a bunch that come prepackaged in Apache NiFi 2.0. I've tweaked a couple of these based on some of my needs, but there are ones for Pinecone to do queries and to store your data. There's a great one for chunking up documents to make them small enough to fit in the vectors. There's a parse document one. This is great. This is for parsing your Markdown, PowerPoint, Google Doc or Excel: parse that data and get the text out. Again, great for pushing that to a vector store, using that as a prompt, just storing that in a database, using that as part of a Slack message, whatever. There's another one that converts CSVs into Excel if you need Excel, and one that does our favorite deep learning, some object detection.
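
The chunking step mentioned above, cutting a parsed document into pieces small enough to embed, can be illustrated with a simple word-window splitter. This is only a sketch of the general idea and not the implementation of the prepackaged processor; the function name `chunk_words` and its word-based `chunk_size` and `overlap` parameters are assumptions made for the example.

```python
def chunk_words(text: str, chunk_size: int = 200, overlap: int = 20) -> list[str]:
    """Split text into chunks of at most `chunk_size` words.

    Consecutive chunks share `overlap` words so that a sentence cut at a
    chunk boundary still appears with some context in the next chunk.
    """
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the window already reached the end of the text
    return chunks


if __name__ == "__main__":
    for piece in chunk_words("a b c d e f g h i j", chunk_size=4, overlap=1):
        print(piece)
```

Each returned chunk could then be emitted as its own record or flow file and pushed to a vector store. Real pipelines often size chunks in tokens or characters rather than words, depending on the embedding model.
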
And right now I'm working with YOLOv8, so I might write a new one on that. The best and most useful is to have one Python processor do all this for you: PromptChatGPT. You put in your text prompt and a couple of parameters, including your ID because you've got to have access to OpenAI, and it calls the model and gets your results back. Really nice. Thank you, Python. And then the same thing for the Chroma vector DB. There's a put and a query. Those are great ones as well.
We're still in the early world of Python processors, so now's the time to start putting yours out there. I'd love to see a ton of people write their own after this conference. Definitely contact me if you need a little help, and if you have some, I'll publish them in the weekly newsletter and I'll tell the NiFi engineering team about them. Maybe they'll go into the official line, and either way they will definitely be promoted and used.
Here's an example of a flow. We get Slack messages in, transform and clean them, query a vector store, build my prompt, call my model with Python, transform and clean the results, and push them to Slack and Kafka. Pretty easy. We will show you some demos here and then let you get on with the many more talks that you're seeing today, which is a pretty awesome list.
This is NiFi 2.0. This is my environment running on my laptop, and in this example I consume a feed from Medium, which happens to be my feed, to get all my articles, process them and clean them up. I do some of this with regular NiFi processors, nothing too fancy. And then when it comes out here, we start getting into these fancy Python ones. As you can see under Python extensions, this one takes plain text in and extracts a couple of metadata fields along with the data, so it parses the data out. I've got a couple sitting here. Let's just run one, have that come in, and then we will parse that. Now if we needed to look at all the Python processors, I can just type python. You can see there are a couple of versions of different ones in there.
You can see my extract company name, fake record, and a bunch of different ones in here. Pretty straightforward. Okay, that finished. This parse document finished, and so did the chunk. Chunk is also a Python one, again to make things fit into a vector. So this processed them and chunked them up, and now we can push these to, say, a Chroma data store with some parameters there. Pretty easy to do that. Just run one there. Same thing here if the Chroma happens to be remote. Same idea for query. Pretty straightforward, and very easy to work with without too much trouble.
Down here you can see we have some other stuff going on, so I'm going to let one through so you can see the other processors. After that gets pushed, this one is the extract company. Here I passed in the prompt. Now this is one way to get data into a processor. The other way is it could come in as a full flow file. It depends what makes sense for you. If you look there, you can see it ran just now, and I can see the output here and the results. What came in, what came out, pretty straightforward. This one didn't work because it probably didn't have a company name. Some of these don't have a company name, but you do always have the values coming out of our utils. So we'll get back any of the things that make sense for the different utils. They're pretty straightforward. If you're using any of the system processing, we get the psutil stuff out here.
We also got some output from the wiki, and if we look here, a couple of different attributes come back from the wiki. Yeah, you can see in here where we had the company name. If I have a company name, then I can get a wiki page and just have it output. But as you can see, we can put a ton of parameters here, very easy to do. There are the PIDs that are running, there's our text, there are our results here, and it was an HTML page we got back. Again, there are a couple of different formats you can pick when you're grabbing a wiki page.
And now I could push it to something else if I wanted to. It depends on what makes sense for your use case. Over here, I've got another use case, as long as this guy didn't time out. Let me make sure these are loaded so we get those ready. Okay, so in this one I'm using that GTFS processor. This is calling Halifax, Canada, for their vehicle positions. So I'll run this once. That already finished, and it gave me back the results here. 500 bytes in; you can't really see that till it gets split out. And you can see a lot of rows came out of there, and we can take a look at one and see that it got converted into JSON. That tells me about that vehicle and the trip and how fast it's going, with different data around those attributes. Pretty straightforward.
Now when we are parsing documents here, again, this is the Python one, and I'm picking PDF. Let it parse it the way it makes sense. There are a couple of options there. I like this PDF parsing model. This one works pretty well for me. Just English. I could add a couple of metadata fields, but I don't really need them because here I'm going to split that data out into pieces. We can take a look at a chunk: that's a chunk of text in a JSON format. Then I run that, get a chunk, and that chunk I'm going to push to a vector store, so it's not too big to go into the vector database. And then the results of that will go into Slack just to track it.
I've got one to translate WebVTT. It doesn't need any parameters. This just takes the whole file coming in and converts it to just the text. And then I could push it into a prompt if I wanted to. Pretty straightforward, whatever makes sense. I could parse PowerPoints. Lots of different things you could do there.
There's an example using PromptChatGPT. I'm using the turbo model and get my inputs passed in: temperature, API key, all that kind of fun stuff. So we can run one of those and get the results back. This is coming from Slack events. Pretty straightforward on that.
And then I push that to a Slack channel, and we can take a look and see the results of that. Here is where it got the chat, and here it posted the results, and we could add other stuff to that. But it's pretty easy to use that Python processor there. It's just an easy way to be able to get your data, move it really quickly, and do what you need to do.
Thanks for listening to my talk. If you're interested in writing your own Python code for Apache NiFi, definitely reach out. Thank you.

Tim Spann

Principal Developer Advocate @ Cloudera
