Conf42 Large Language Models (LLMs) 2024 - Online

Adding Generative AI to Real-Time Streaming Pipelines

Abstract

Let’s build streaming pipelines that convert streaming events into prompts, call LLMs, and process the results.

Summary

  • Tim Spann: My talk is adding generative AI to real-time streaming pipelines. I'm going to discuss a couple of different open source technologies. We'll touch on Kafka, NiFi, Flink, Python, and Iceberg. All the slides and all the code are out there on GitHub.
  • LLM, if you didn't know, is rapidly evolving. There are a lot of different ways to interact with models. That enrichment, transformation, and processing really needs tools. The number of models, projects, and software packages available is massive.
  • NiFi supports hundreds of different inputs and can convert them on the fly. It's a great way to distribute your data quickly to whoever needs it without duplication, without tight coupling. It's fun to find new things to integrate with.
  • So what we can do is, well, I want to get a meetup chat going. I have a processor here that just listens for events as they come from Slack. And then I'm going to clean it up, add a couple of fields, and push that out to Slack. Every model needs a little bit of different tweaking.
  • NiFi acts as a whole website. And as you see here, it can be GET, POST, PUT, whatever you want. We send that response back to Flink and it shows up here. Thank you for attending this talk. I'm going to be speaking at some other events very shortly.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi, Tim Spann here. My talk is adding generative AI to real-time streaming pipelines, and we're here for the large language model conference at Conf42, which is always a nice one, a great place to be. I'm going to discuss a couple of different open source technologies that work together to enable you to build real-time pipelines using large language models. So we'll touch on Kafka, NiFi, Flink, Python, and Iceberg, and I'll show you a little bit of each one in the demos. I've been working with data, machine learning, streaming, IoT, and some other things for a number of years, and you can contact me at any of these places, whether Twitter or whatever it's called, some different blogs, or in person at my meetups and at different conferences around the world. I do a weekly newsletter that covers streaming, ML, a lot of LLM, open source, Python, Java, all kinds of fun stuff. As I mentioned, I do a bunch of different meetups. They are not just on the east coast of the US; they are available virtually live, and I also put them on YouTube, and if you need them somewhere else, let me know. We publish all the slides and all the code on GitHub. Everything you need is out there. Let's get into the talk. LLM, if you didn't know, is rapidly evolving. While you're typing down the things that you use, it's changed, right? So trying to keep up to date is a lot. There are a lot of different models, and I'll show you it doesn't really matter how we integrate, because we can do it all a couple of different ways with open source. Some of the models are closed ones, and we'll integrate with some open ones. Which one makes the most sense for you depends on your use case, but we'll integrate with GPT-3.5, and we'll integrate with Ollama running some models. We're running with Mistral and Mixtral, and working with Pinecone today and Chroma. We'll touch on those a little bit. There's a bunch of other ones out there, all of them pretty good. We do a very little bit with LangChain.
It is used by some of my processors that are written in Python, which are integrated through NiFi, and we're using a bunch of libraries like Transformers and also Hugging Face hosted models, as well as models hosted by watsonx and some hosted by Cloudera. There are a lot of different ways to interact with models. We'll also do Ollama, which is running on my laptop along with my whole open source stack, so I wish I had more RAM and maybe a GPU in there, but you do what you can. There are lots of different use cases. If you're at this conference, you probably know them. We're not going to touch on all of them; that would take forever. But the same ideas apply: get your prompt ready, send it to the model, get your results back, and do something with them. That enrichment, transformation, and processing, all that you need to do around prompt engineering, really needs tools, so you're not handwriting everything. That's why we like to use something like Apache NiFi, which I'll show you can help you with all of this, whether it's getting stuff into your vector store, helping with cleanup, or calling multiple models. You know, maybe a code assistant is step one, and then you need to summarize documentation and translate it into another language. Maybe you need to build some content and push it somewhere. NiFi really helps enable a lot of those use cases. You know, this is probably the more important decision. You could change. Maybe you start off with a closed model, because they are probably the best at this point. They do a lot of good stuff. But then you're worried: can I get the data there safely? Maybe I can, maybe I can't. Is there data I don't want to be used in their models? Then there's open source, because we love open source. We love these open models. That works really well with our ideas around open source tools for streaming, so it makes sense. But if that's not what makes sense for your use case, you know, make that decision.
What's nice with NiFi, and I'll show you an example, is that I have four or five different models in different locations, and I can decide when to run each one. Maybe when one is cheaper, maybe when one is better for certain use cases; you make that decision. If you're going to do this in the enterprise, Cloudera works with AWS and IBM and Pinecone and NVIDIA and Hugging Face and Ray to make sure you can do everything you need to do to work with the foundation models, have the proper search and vectorization, have as much GPU-powered performance as you need, have all the right tooling, and run all the different compute options that you need. And again, the number of models and projects and software, software as a service, and open source tools that become available is massive, and it's changing constantly. There are new versions of things. You're like, oh, this is the coolest new model. It's like, ooh, Google Gemma is the new one. Oh no, Mixtral. Now this. It's fun because there's always something new going on, and probably during this conference there'll be two or three new things. So I'm keeping my eyes open; I want to see what comes out. But the middle orange bar is where I'm mostly working: getting that real-time ingest, enrichment, wrangling, transformation, all of that. Get that data stored, get the data to the model, whether it's for training, whether it's for context for prompts, whether that's calling the model, get results back, get your classification. It's all important, whether that's in private cloud, public cloud, or hybrid cloud. So maybe you need to run on Amazon, Azure, Google, and your own Red Hat Kubernetes. You could do that. It doesn't matter if it's Google-specific hardware. AMD has really good chips; NVIDIA, IBM, Intel, Dell, everybody's got good stuff that helps you out there. You just have to know that you're using the right frameworks and libraries and versions of Python and Java that are tweaked for those things.
And we definitely have people that can help there. Get all your models out there and running. A common use case, and this is where NiFi really shines for this process, is interacting with people and data and, you know, automated tools; they may not know the difference. Now, you know the difference if you're doing live Q&A, and I'll show you that through Slack, because you're typing it. But that data could be produced by another bot. That data can be coming from a database, it can be coming from REST endpoints, documents, social media, transactions, public data feeds, S3 and other files somewhere, logs, ATMs, other live things, weather reports, wherever that is. We're getting that data in, sending data out when it needs to go out, cleaning it up, getting all the enrichments, doing any alerts that need to happen right away. Get things vectorized, chunked, parsed, and into whatever database or data stores they need to go to. Get it to the right models, wherever they may be, whether that's in Cloudera Machine Learning, Watson, or 100 other places. Get that into Kafka, so we can distribute any information that needs to be shared, and share it instantly with as many people in as many places as possible. We get it into Kafka if that needs to spread. If that can't be accessed, because maybe it's in a very secure internal Kafka, we could replicate that one to ones in the public cloud so it could be shared with other applications and systems. It's a great way to distribute your data quickly to whoever needs it without duplication, without tight coupling. It's a really nice feature. Get it stored, get it enriched, and we'll show you that in examples. And a common way to do that is CDC. We can pull that CDC instantly and get it working like we mentioned before, and get all those LLMs together. A common thing that we do as part of these flows is ingest. We're ingesting, and that could be constantly a stream that's getting pushed, or we're pulling.
It could be crons, it could be a one-time grab of documents, messages, events, files, whatever it is; it doesn't matter. NiFi supports hundreds of different inputs and can convert them on the fly. We can parse documents very easily thanks to Unstructured.io, and chunk them up into the right size pieces so that it'll be optimal for your vector store, or if you're pushing it somewhere else. I mean, the nice thing is that NiFi can send it to as many places as possible. Get that into the stream, whatever we need to do there in the data pipelines, getting that external context when you need it. So I type a question: what is Apache NiFi? Grab some external context, so I could pull all of my recent articles from Medium, get them cleaned up, enriched, transformed, parsed, chunked, vectorized, and available, so that when I ask a question, if that's already there, I pull that, add that as context with my prompt, clean that prompt up, and get it in the format that's needed by whoever's model I'm calling. You know, Ollama is slightly different from Hugging Face REST versus Watson versus Cloudera. Get it in that right format, probably JSON. And make sure it fits, you know, make sure it's not too big. Get that to them, get those results back, maybe start caching at different layers; maybe that goes into a database, maybe that goes into RAM. There are lots of options there. And NiFi does the round-tripping for you, so you don't have to write an application for everything. You know, someone types something in Discord, or it's REST, or it comes from Kafka; you can pull it from a database, pull it from Slack, and send the messages out. Whether it's going to email, Slack, a database, wherever you need it to go, we'll send it there pretty easily, and we can deal with all your types of data, even if it's zipped up or tarred up, if it's images or videos. Documents, obviously, are a big one for most of the use cases for large language models, but it doesn't matter.
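
The chunk-then-build-a-prompt steps described above can be sketched in plain Python. This is only an illustration, not the flow from the talk: the function names, the character-based size limits, and the `inputs` JSON field are assumptions, and a real pipeline would usually count tokens and pull the context chunks from a vector store.

```python
import json


def chunk_text(text, max_chars=500, overlap=50):
    """Split text into overlapping, fixed-size character chunks."""
    if overlap >= max_chars:
        raise ValueError("overlap must be smaller than max_chars")
    chunks = []
    step = max_chars - overlap
    for start in range(0, len(text), step):
        chunks.append(text[start:start + max_chars])
        if start + max_chars >= len(text):
            break  # the last chunk already reached the end of the text
    return chunks


def build_prompt(question, context_chunks, max_chars=2000):
    """Add context chunks in order until the size budget is used up,
    then serialize the prompt as JSON (the "inputs" key is an assumption)."""
    context = []
    used = len(question)
    for chunk in context_chunks:
        if used + len(chunk) > max_chars:
            break  # keep the prompt from getting too big
        context.append(chunk)
        used += len(chunk)
    prompt = "Context:\n" + "\n".join(context) + "\n\nQuestion: " + question
    return json.dumps({"inputs": prompt})
```

Stopping at the first chunk that does not fit keeps the most relevant context first, assuming the chunks arrive ranked by relevance.
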
Thanks to Unstructured.io and some other processors in NiFi, I can deal with HTML and Markdown and RSS and PDF, Word docs and Google Docs and RTF and regular text. And I added one to do VTT, if you know about that format, and any kind of feeds from social, and XML too. We can figure out what type it is, chunk it up, store it, parse it, and do all those things you need with that unstructured and semi-structured data. We can interface with whoever you need in the cloud or on premises or in your private Kubernetes, all the major ones. If it's not listed, it means we haven't tried it yet. Definitely reach out. I'm always looking to find new things to try and integrate. There's fun stuff out there. Now, NiFi just got into version two; this is the one you want to use. This will be an official production release very shortly, possibly by the time you're seeing this. It's got hardcore Python integration, so I can run really cool stuff in there. There's a really cool way to deal with parameters in it, so you can do a lot of interesting DevOps stuff, and it uses the latest JDK. If you're not a Java person, that one has a lot of enhancements that make Java incredibly fast. If you saw that 1 billion row challenge out there, there's really cool stuff there. And just to show you how fast it is, I also recently found that we can do real-time integration with models while we're running real-time SQL. Flink SQL is real-time SQL on top of fast data such as Kafka, such as Pulsar, such as real-time streams. And we can also grab data from things like Kudu and Iceberg, and JDBC stores like Postgres and Oracle, in real time or through CDC. What this means is that by writing a simple function, I can call a model as part of a query, and I'll show you that; it's really cool. And I can do that whether I'm calling directly to something like Cloudera Machine Learning, or whether it's something that takes a little more enrichment, like that process we were mentioning.
So I'm going to have NiFi host a REST endpoint for me so I can have this tool call that, and that'll do all the cleanup. It makes sure your prompt is nice, sends that over to, say, Hugging Face, gets your results, and sends them back to you so you can use them in your live system. I've got a link to the article here and an example of the SQL that we have there. It's pretty straightforward. Like I mentioned, I'm working with the Gemma model, so you can access that. Take a look at that example. Again, the difference between using one model versus another is not really that much for me. Now, I mentioned being able to use Python. We'll go through that pretty quickly. If you saw Conf42 Python, I explained in detail there how to build your own Python processor. This is that VTT I told you about; I can parse that so we can grab that web video text and use it to put in your vector store, enrich prompts, or other stuff. We can call Watson right from there, create our own fake data, and grab Wikipedia feeds and get them in a clean format. Very nice. That saves a couple of steps there. This one is important for me because I found sometimes I want to pre-process a prompt, and the reason I might do that is someone might ask a question that I don't need to send to a model. That saves me some money and saves me some time. If you don't need to use a large language model, maybe use regular ML or a regular small model, or in some cases just do a REST call. Like if someone asks, what is the current stock price for Amazon? Don't send that to ChatGPT. So I parse out the company names with some cool libraries out there. And then I can do another thing where I look up the stock symbol, get the current stock price, and send that back. That is faster and cheaper than calling a language model, even on premises. And I have that in some of the articles. It's very easy. Now, I've also added a new library using Salesforce BLIP image captioning to be able to generate captions for images.
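
The pre-processing idea above, answering simple questions without a model call, could be sketched like this. It is a simplified stand-in and not the processor from the talk: the keyword pattern and the tiny `SYMBOLS` table are hypothetical, and a real flow would extract company names with an NLP library and look up the symbol and price through a service.

```python
import re

# Hypothetical lookup table; a real flow would call a symbol-lookup service.
SYMBOLS = {"amazon": "AMZN", "microsoft": "MSFT"}

STOCK_QUESTION = re.compile(r"\bstock price\b", re.IGNORECASE)


def route(question, symbols=SYMBOLS):
    """Return ("stock_lookup", symbol) when a cheap REST lookup is enough,
    otherwise ("llm", question) so the prompt goes on to a model."""
    if STOCK_QUESTION.search(question):
        lowered = question.lower()
        for name, symbol in symbols.items():
            if re.search(r"\b" + re.escape(name) + r"\b", lowered):
                return ("stock_lookup", symbol)
    # Unknown company or unrelated question: fall back to the model.
    return ("llm", question)
```

For example, `route("What is the current stock price for Amazon?")` selects the lookup path, while a question like "What is Apache NiFi?" still goes to the model.
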
Because remember, I could have flows full of images, and I use this as part of parsing out things when I do meetups. I think it's a generally useful thing to have, and I could change the model if there's a better model for that. I also added one for ResNet-50 just to do classifications. It's pretty useful for flows. Again, as part of that multi-step process, I might want to caption an image. I might want to make sure it's not a problematic image, so I'm using this one to make sure we don't even deal with those; we just delete them. Maybe I need to see people's faces in the image. Again, getting more data could be helpful before you send things downstream to other models. And there are Python processors to do chunking, parsing, calling ChatGPT, and working with vector stores, just to give you an example there. I just have some links to some other articles here, but let's get into the demo. You don't need to see any more slides; you will get these slides, and they are fully available with everything you need there. So no problems. Very easy. So first thing: this is NiFi. It is running for us. And what we can do here is, you know, if we wanted to build new stuff, we can add any of these processors. If you want to see the cool new Python ones, just type in Python. There's a bunch of them, as you saw in the list there before, like extracting company names, just to give you an example. So what we can do is, well, I want to get a meetup chat going. So I have a processor here that just listens for events as they come from Slack. And there are a couple of tokens you have to set. I have an article that shows you how to do that. It's very easy to set those up. So here is my Slack group. If you're interested, you could join it, and you could use it to ask application questions. Let's see if it knows this one. I don't think it does. "When is the Conf42 large..." Should I do LLM? "...LLM conference?" So what happens next is it comes in here.
As you can see, now it's running because it just got messages. And just for fun, I store them in a database. Oh, unless someone stopped this. The nice thing with NiFi is you can start and stop things either manually, as you see here, or through a DevOps process. That could be REST or command line. You get the idea. But let's make sure that data went through. This is data provenance. This is really good, because I don't know what that black-box LLM is doing, but I can tell you exactly what I did to prepare that prompt, when I sent it, how big it was, and what I got back. So you don't have the provenance inside that model, but I know the provenance getting there and coming back. So that could be very helpful. So this is my message here. It's me, because I typed the message in. There's my input, there's an ID, timestamps, where I am in the world. Helpful stuff. So that message went in, and I saved it in a Postgres database. I also pushed it into Kafka for Slack messages, and you'll see why when we get to that next step. So we've got that data. Again, we're using two parts of the process already, and then I'm going to do the prompt cleaning and engineering. So I'm just going to pull out the fields from that Slack message that I want. If this is coming from something else, you might have a different extraction, but it's pretty easy. I make sure that it's not myself. You know, I don't want to reply to the reply we sent. Filter this out. I forgot to mention this model. There's a model from Hugging Face for not-safe-for-work text, so I want to parse those out. I don't want those. Oh, someone added another step here. Yeah, I was running a demo. I wanted to look at those. We were just going to delete that, which is the awesome thing that we started up. We have these configurable queues that run even on a big cluster and then make sure things don't go somewhere. Oh, one of the questions was not safe, so that didn't make it through the system, which is fine.
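
The field extraction and the "make sure it's not myself" check described above can be illustrated with a few lines of Python. This is a sketch under assumptions, not the NiFi flow itself: the payload shape is modeled on a Slack Events API message event (`user`, `text`, `channel`, `ts`, and `bot_id` for bot posts), and the function names and the `own_user_id` parameter are hypothetical.

```python
def extract_fields(event_callback):
    """Pull out only the fields the prompt-building steps need."""
    event = event_callback.get("event", {})
    return {
        "user": event.get("user"),
        "text": (event.get("text") or "").strip(),
        "channel": event.get("channel"),
        "ts": event.get("ts"),
        "is_bot": "bot_id" in event,  # bot-posted messages carry a bot_id
    }


def should_answer(fields, own_user_id):
    """Skip our own replies and empty messages so the bot never answers itself."""
    if fields["is_bot"] or fields["user"] == own_user_id:
        return False
    return bool(fields["text"])
```

Filtering this early avoids a reply loop, since the answer posted back to the channel arrives as a new event on the same listener.
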
And here we're going to query Pinecone with that input, see what we get back, extract the results there, build up our new prompt, and then figure out who we want to send it to. Do we want ChatGPT to get it? Do we want someone else to get it? I've been just doing Mixtral most recently, but I also have Mistral running on Ollama. And down here I've got Google Gemma running. Again, we're pushing the cleaned-up prompts to Kafka as well. So we come in here and we format the prompts exactly, with some instructions added. You kind of need that for Mixtral 8x7B Instruct. I mean, every model needs a little bit of different tweaking. So then we call it, and if we look at the results here, we can see what we got back from there. And then I'm going to clean it up, add a couple of fields, push that into a different topic, and also push that out to Slack. While I say this, we got our threaded result back; let's see if it's any good. Conf42, April 11. Oh, and it's got the link. Oh, look at that. Okay, I must have had that. Remember we looked into Pinecone. Well, that's where I put my Medium articles. And in my Medium I've linked to the conference because I'm speaking here. So I like that. Pretty good. So we got the data and we got it back. And like I said, we could have sent it there, or could have sent it to Watson or Google Gemma. There are lots of different options here. To get things back to Slack, I have just a little cleanup, and I add that generated text and push it to the right channel. This gives me the right timestamp, so I'm in that thread. You don't have to thread the results back. I put a lot of extra metadata in here just for myself. I don't need to put this here. As you can see, it tells me what model, how long the compute took, all those sorts of things. So if you see that, this is just metadata for me, or just to be interesting. Oh, it was Mixtral. It took 100 tokens, you know, that kind of stuff. We could filter that down if we don't need it.
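
Two of the steps above, wrapping the prompt in model-specific instructions and threading the answer back to Slack, might look roughly like this. The sketch assumes the `[INST] ... [/INST]` wrapper used by Mistral and Mixtral instruct models (the beginning-of-sequence token is left for the tokenizer or serving layer to add) and the `channel`, `thread_ts`, and `text` fields of Slack's `chat.postMessage`. The function names and the default instruction text are hypothetical.

```python
def format_instruct_prompt(question, context,
                           instructions="Answer using the context. Be concise."):
    """Wrap instructions, retrieved context, and the question in [INST] tags."""
    body = f"{instructions}\n\nContext:\n{context}\n\nQuestion: {question}"
    return f"[INST] {body} [/INST]"


def threaded_reply(channel, thread_ts, generated_text):
    """Build a chat.postMessage payload that lands in the original thread.

    thread_ts is the timestamp of the message being answered."""
    return {
        "channel": channel,
        "thread_ts": thread_ts,
        "text": generated_text.strip(),
    }
```

Keeping the formatting in one small function makes it easy to swap in a different wrapper when the flow routes the same question to another model.
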
So remember I said I pushed stuff to Kafka. Let's get there. These are the Kafka messages I pushed out there. Well, I can do real-time queries on them with Flink against those Kafka messages. And if you look here, as part of that, I'm passing in that message text and sending it to a function, which is this function here, which calls into NiFi to get the results back. And then there's that Kafka we saw. So it is running and getting the generated text here, which are the results for that. And you can see the Flink job is running. If I had a big cluster, it could be running there. There's not much going on here, but I have a materialized view. And what's nice about that is it produces all this data as a JSON REST endpoint so I can ingest it somewhere. Now, like I mentioned here, that UDF is calling out to another process here. And this is how NiFi acts as a whole website. So it receives a message. And as you see here, it can be GET, POST, PUT, whatever you want. I can limit that. And then I parse what they send in here. I'm going to route it, because I want to send it to an LLM, and I have Google Gemma reply to it. So for Google Gemma: clean it up, make my prompt, call Gemma, get the results back, and again push it into Kafka so I can, you know, have a copy for someone else. And then we return it to that REST endpoint, which is the response here. We send that response back to Flink and it shows up here. And as you see, that ran pretty quickly even with all that round trip. That's really it, what I wanted to show you. Thank you for attending this talk. I'm going to be speaking at some other events very shortly. If you're in the area, if you're in New York or Seattle or Boston, say hi. It's been really good showing you what we could do today. Thank you.
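
The round trip described above, a function that sends the message text to an HTTP endpoint hosted by NiFi and returns the generated text, can be sketched with the Python standard library. This is not the function shown in the demo: the URL, the `prompt` request field, and the `generated_text` response field are all assumptions chosen for illustration.

```python
import json
import urllib.request

# Hypothetical address of the HTTP endpoint hosted by the NiFi flow.
NIFI_URL = "http://localhost:9999/llm"


def build_request(message_text, url=NIFI_URL):
    """Create a JSON POST request carrying the message text."""
    payload = json.dumps({"prompt": message_text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def generate_text(message_text, url=NIFI_URL, timeout=30):
    """Send the prompt and return the generated text from the JSON reply.

    Assumes the flow answers with {"generated_text": "..."}."""
    request = build_request(message_text, url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        reply = json.loads(response.read().decode("utf-8"))
    return reply.get("generated_text", "")
```

A function like `generate_text` is the kind of logic that could sit behind a user-defined function in a streaming SQL query, with the timeout guarding the query against a slow model.
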

Tim Spann

Principal Developer Advocate @ Cloudera
