Conf42 Machine Learning 2023 - Online

Build ML Enhanced Event Streaming Apps with Java Microservices

Video size:


The easy way to build and scale machine learning apps.


  • David Kjerrumgaard will talk on how to build machine learning enhanced event streaming applications with Java. We'll start with why Apache Pulsar is a great fit for event driven microservices. What enhances event drivenmicroservices to include a machine learning element to it?
  • Event driven architecture is a completely decoupled system in which microservices independent systems communicate with one another asynchronously by exchanging events between them. You can couple the microservices philosophy with decomposing applications in very small pieces.
  • Apache Pulsar was founded originally in Yahoo in 2012. It was designed to be a cloud native messaging and event streaming platform. Why is it a good fit for event driven microservices?
  • Pulsar is designed to run in cloud environments, more containerized environments. It can take advantage of features like stateful sets to automatically restore state if one of the instances goes down or to scale up automatically dynamically. It's a pluggable architecture. You have a lot of choices on what you want to use.
  • Pulsar functions are a serverless computing framework that runs inside Apache Pulsar itself. They're designed for lightweight stream processing. They excel at basic use cases that don't require the complexity of full stream processing engines. They can also be deployed as Kubernetes pods.
  • Pulsar functions SDK software development kit that includes an API. Supported for all three languages, and it provides a richer set of API for doing interesting things. Also supports retention of state within pulsar functions. Third party libraries for machine learning specifically for this use case.
  • There are two maven modules within this repo. The first is a Java based microservice that will be using the machine learning model to perform a sentiment analysis on raw tweet data. The second is a tweet simulator that will publish data out for us. We will use these modules for deployment in a minute.
  • So now let's pivot and start generating the source data. Let's create a pulsar client to consume. Now let's start creating the pulsar sentiment analysis function. And shortly we'll start seeing the tweets coming in one by one.


This transcript was autogenerated. To make changes, submit a PR.
Hello and welcome to my talk on how to build machine learning enhanced event streaming applications with Java. My name is David Kjerrumgaard and I'm a developer advocate at Stream Native. I have over two decades of experience in software development, big data and event streaming. I'm also a committer on the Apache Pulsar project and a published author. As you can see, I've worked across a diverse set of companies, everywhere from Amazon and FedEx to Zappos. As I mentioned in the previous slide, I'm also a published author. I'm the author of Pulsar in Action by Manning Press, which is available for free download on the link shown here at the bottom of the slide, and also a co author of Practical Hive. So let's talk about the agenda for today. We'll start with why Apache Pulsar is a great fit for event driven microservices, and how to build an event driven microservices applications with pulsar functions. Once we've shown you how to develop a basic pulsar function, what enhances event driven microservices to include a machine learning element to it? So let's begin with event driven microservices and event driven architecture. Event driven architecture is a completely decoupled system in which microservices independent systems communicate with one another asynchronously by exchanging events between them. Rather than making a point to point call, use a message broker of some sort as an intermediary to store these events and then deliver them to all the registered consumers who are interested in these events. So an event represents something like a change in state to the system, such as an item being placed into a shopping cart, ecommerce order being checked out, or an item being added to your shipment and being delivered on its way for delivery to your house. Now, event driven architectures are loosely coupled, and again, they communicate asynchronously, typically via a pub sub mechanism. Event driven microservices then, are microservices that are designed to communicate with one another over these message buses, right? So we talked about event driven architecture, but not necessarily microservices. So you can couple the microservices philosophy with decomposing applications in very small pieces based on a business line of unit, and have them implement that using that same pattern. Again, communicating over an event bus of some sort. In short, event driven microservices need an event bus to interact with one another. That's the critical component that all event driven microservices share in common, regardless of what programming languages they're written in such as Java or Python that allows them to be written in any language of your choice and communicate with one another seamlessly. So let's pivot a little bit and talk about Apache Pulsar and why it is a good fit for event driven microservices. Apache Pulsar was founded originally in Yahoo in 2012, and it was designed to be a cloud native messaging and event streaming platform. So it includes all the capabilities of your traditional pub submessaging mechanisms like you would see in a traditional messaging system like RabbitMQ or IBM MQ, things like that, and also includes a more modern event streaming or event consumption messaging model that comes with Apache kafka or under the covers, Amazon kinesis and things like that. So it is really designed to support both messaging semantics natively. Out of the is by default it has a pub sub model so the producers and consumers don't interact with one another. A producer connects to a pulsar broker and publishes messages. These messages are stored on a topic which is just a name channel between producers and consumers. Consumers can then, at their leisure when they want to come in attached to the broker and consume messages from the topic in an asynchronous manner. So again, they are completely decoupled from one another, they communicate asynchronously, and they have a single endpoint being the pulsar broker to communicate between them logically. At the bottom of all pulsar communications is what's called topics like any other messaging system. Just wanted to note here that unlike other messaging systems, it actually has a three tier URI or uniquely referenceable interface id for each topic. So a topic exists within a tenant, and pulsar is a multi tenant system natively out of the box, which is a different shading feature from other messaging systems. So you can have different independent departments within your organization, sharing the same logical cluster and being logically isolated from one another. Next, under tenants exists the concept of namespaces, which is a logical grouping of topics who have similar policy requirements around security or data retention, tiered storage, offloading, things like that. And then last but not least, there are topics sitting at the bottom of those, and these are where the messages ultimately end up. And so when producers produce the topics, they address it that way. And likewise from a consumer perspective, they give the tenant namespace a topic to uniquely identify those particular topics they want to consume messages from I'll touch briefly on the physical architecture of pulsar cluster that it is. Again, as I mentioned earlier, it's cloud native, and what this means is it's architected in such a way that it's designed to run in cloud environments, more containerized environments. It can take advantage of features like stateful sets to automatically restore state if one of the instances goes down or to scale up automatically dynamically. And how we architected for that way, a pulsar to be that way was that we had a decoupled architecture in that the serving of the messages, the pulsar brokers shown there at the top are completely stateless. No data is stored on the pulsar brokers that's kept completely separate in what's in a storage layer based on Apache bookkeeper, which is another storage open source project based on for storage. So when you publish data to a pulsar topic it is persisted to bookkeeper. This gives a lot of advantages, again for resiliency because if any broker dies another one could take over serving that topic because the data doesn't go with it which is completely different from all other messaging systems out there. This really sets it apart which allows it to scale dynamically. Now tying the two together is the metadata storage, right. We're putting this data on a different storage layer. So how do the brokers know where to go and bookkeeper to get it? Well that's where the metadata storage comes in. That's where we keep all the records, policies and what we call the managed ledger for where this topic components are spread out across Apache bookkeeper in that component. It's a pluggable architecture. You have a lot of choices on what you want to use. Historically it's been based on Apache Zookeeper which is known to have issues and scalability issues at some point. So if you're running in a containerized environment you might want to consider Etsyd. We also included a new release by streamnative called Oxia which is a highly scalable system that solves a lot of the problems that zookeeper had. So if you're interested in going with pulsar, I highly recommend looking at Oxy which is indicated by that little pinwheel logo there on the left inside the metadata storage box. Let's pivot a little bit and talk about pulsar functions. So now you know what Pulsar is. It's your event messaging bus. It provides pub sub messaging. That's great David. We need that for event driven architecture, event driven microservices. So what are these pulsar functions? Well within patchy Pulsar we decided to create a serverless computing framework that runs inside pulsar itself. You can think of them like AWS lambdas, and they enable very low functional level programming. A very simplistic API that wires in directly to Apache Pulsar itself and handles all of behind the scenes the details of the coding, of setting up a producer to write data to Pulsar and a consumer to consume data from Pulsar. And all you have to do is function on the business logic itself. So you say, I want this to be a pulsar function. This is my input topic, this is my output topic. And all of that happens for you automatically. The framework takes care of that for you, and they allow you to run individual units of code that react to that particular publication of messages. So they have a programming model, something like this. Pulsar function is just a deployable piece of code that you control, and every time a new message comes in, that event gets triggered automatically. We'll see what the API is here in a second, but every time a message comes in off of any of the input topics you're subscribed to, boom, your code gets triggered. Now, as you process that data, you may want to have an output, generate some output, do some processing to it, do some manipulation so you can optionally publish it to an output topic. And also for monitoring and tracking progress, things like that, debugging, there's an optional logging topic as well, which you can publish messages to. And this is great. So again, it supports multiple programming languages, and we'll focus on Java today. But if you want to write your microservices in Python or go, that is also fully supported. And the key to this, as we'll see here at the end, is that you can package third party libraries into these code itself. So if you want to include a machine learning library, as we'll see here in the example, you can do that and take full advantage of that inside your pulsar functions. You don't need to set up a complex framework to do these sorts of things, you can just easily embed it and take advantage of that. So why do we think pulsar functions are good for microservices in general, or just why pulsar functions versus alternatives, right? When we decided to design pulsar messaging system, we realized that a lot of stream processing functionality is really simplistic, that you need to do a few simple operations on it. And a lot of people had difficulty in setting up yet an additional data pipeline tool, something like a stream processing engine like storm or flink, that's its own distributed system, just to do some processing of the data. So we wanted to make it more simplistic. We also wanted to embrace the serverless computing model so that they can run in containerized environments. They can be deployed as Kubernetes pods. This allows them to be resilient to failures and starts and stops, things like that. And it's also to maximize developer productivity, right? You don't need to learn a complex framework like flink and something like Scala to handle spark or something like that. You can just write a very simple language native interface in Java and implement a simple method and it's easy to get going. Again, they're designed for lightweight stream processing. They excel at basic use cases that don't require the complexity of full stream processing engine, such as event driven microservices. Again, so an event comes in, I'm notified of it, I do my processing logic and I optionally publish an event out, or I manipulate and store some data. That's entirely up to me and it's a great fit for that. You can also use them for simple things like messaging, transformations, ETL sort of processing. I want to do data enrichment. Something comes in, I want to do lookup in a database, add that up, clean up some data, maybe offload some bad records coming in, things like that, and you can chain them together. That's the beauty of them as well, because you have an input topic in one and an output topic. That output topic could then be the input to another one. So it really allows you to start developing some complex chains using some very simple tools. So let's talk about developing pulsar functions. So I laid the groundwork here. Now you have a processing engine, you have a compute engine to do it on what is required to do it. So there are native pulsar functions, but I won't get into that today. That just allows you to do it without the SDK, which is an option. But what I'm going to demonstrate straight today includes a pulsar functions SDK software development kit that includes an API. Again, it's supported for all three languages, and it provides a richer set of API for doing interesting things, right. So you can have a single message come in and publish it out to multiple output topics if you want. We also support the retention of state within pulsar functions themselves. So you can, for example, compute an aggregate, store that to a state and then come back later on, or pull that data back out and use that stored value as part of your manipulation computation. Again, which is great for something like event driven microservices. Maybe you want to store your database state what you've seen of these messages, so that you can use that as your external data store and you can produce to many different topics, consume data, content based routing. All these different features are available with the functions SDK. Last but not least, before I get into the demonstration, obviously we're going to package a lot of third party libraries together, machine learning libraries specifically for this use case. And obviously when you deploy them up to this fronttime environment, it's critical that you bundle all these deployments with them. They're not going to be available inside pulsar themselves, so they have to be all within a single deployable artifact. And for the Java based functions you can use either a Uber jar or a NAR file. I prefer NAR file for multitude of reasons, and that's what I'm going to demonstrate today. For Python you have different options as well, and for go based functions you can deploy them as a go file. So let's explore the code from the repository. There are two maven modules within this repo, the first being one called sentiment analysis function. This represents a Java based microservice that will be using the machine learning model to perform a sentiment analysis on raw tweet data. As you can see here, it contains a Java class called sentiment analysis function that implements the function interface defined in the Apache Pulsar functions API. It is a typed interface which specifies the input type value coming in from the source topic as string, and we will be returning a type of analyzed tweet which is a user defined class that will contain the tweet text itself along with the calculated sentiment for that tweet. The only method defined in this interface is the one called process found here, and we mark it with the override annotation. To note is that we're going to return a type of analyzed tweet, which is what is specified as the output type in the function, and these must match. Similarly, the input type was specified as string and the input parameter. The first one is going to be of that same type, allowing you to have strongly typed data coming in so you can manipulate it in the manner you see fit. We have added a single guard and initializing method to make sure that we initialize this natural language processing library just once in order to perform it, rather than do this multiple times. We can see that this library is an open source library, third party library available here, and we include it in the module as a definition here. So we include it as a normal dependency like you would any other Java class, and it's freely available inside the pulsar functions themselves, which is really powerful. Not a lot is going on inside this particular method, but we just want to demonstrate the capabilities, right? So a tweet comes in, the text comes in, and then we leverage this third party library, this machine learning model, to perform sentiment analysis on it. It calculates a sentiment for us, and as I mentioned before, we return a value that contains the original tweet text along with a calculated sentiment. And this is really it. This is the core of it, but it gives you an idea of what actually can be done. Now, in addition to including the third party libraries in the base palm, you can see that we've defined a plugin, a build plugin here, which is the NAR plugin. This bundles everything together in a unified single deployable modules. I discussed in the previous section, makes it easy for deployment. The second module is called a tweet simulator, and this is going to publish data out for us. It is also based on the Pulsar I O and functions framework, but it implements a different interface here called source, which is part of the Apache Pulsario core framework. It allows you to integrate with systems, publish data in and out. In this case, we're using it to simulate tweet data, primarily because the tweet API is now locked down. And after a thousand tweets, you're sort of out of luck for an entire month for testing. We want many more tweets than that. So I wrote this little class instead to generate data into that particular topic to feed our sentiment analysis function. This tweet source here reads data from a local file, just created some random text, some sentences that come in, some random data that will be passed through, and then we'll perform analysis on that. So that's really it. That's where the code is. And this can easily be built. If you go to your terminal and you run maven clean package, it will generate NAR files for both the tweet source and the sentiment analysis function, which we will use for deployment in a minute. Also want to note that as part of this particular project, we have a pulsar function, or pulsar cluster rather, that you want to run locally and can be started with the start brokers command, which is what we will use down here. We'll notice that we're already in that particular bin folder, so I'll go ahead and start the brokers now. So we have a pulsar cluster to test against locally. Makes it very useful. It's self contained, and I want to note here that I have specifically used the version 293 of pulsar. And I've used a little bit of a trick to map the volumes, these internal volumes, to where these nars are being built to. You can see to the target directory here. I'm mapping it to a file system on the pulsar cluster, which is going to make it easier to deploy these using the command line tool in a second. So I did it once for the source and again for the analysis function as well. Also note that you want to make sure that you enable the function worker. By default, it is disabled in the pulsar standalone. So you can do that by adding this particular setting and making sure that you do not have the NFW switch. A lot of times that's there, and that means no function worker that disables it. So make sure that that is turned off. Once you've done that, you should have a pulsar cluster up and running. Let's verify. Let's look at this docker logs pulsar f and we can see it started, it's up and running. Everything is great. We're going to use this to monitor the progress of our system. So now let's pivot and start generating the source data. So as you can see here, I've already copied and pasted in the command to do it. There is a command line tool in the pulsar admin called source, and it has lots of different methods, the first of which is create. And then you can specify the name of what you want to name that particular source. And then you specify the archive file or the artifact that contains the source data. In this case, it's the NAR file that we generated and specify a destination topic. Where do I want this data to publish to? So we'll go ahead and execute that and look at the log here. And we should see it coming in so we can see that it got received. The tweet simulator came in. It's being unpacked, the metadata is coming out. It has some information on how it was deployed. It was a source, the NAR file was deployed. So if you want to get down at the details, you can check these things. If there's any issues, check your log on that source. We can verify that it was deployed as we wanted. We can see the configuration values here. Everything looks correct. We can also check the status. Right? So let's check the status of our source just to make sure that we're generating some data. And it's been executed 17 times already. So we can come over here and we can consume from the tweets in. So this particular source is publishing again, as we saw, to the tweets in topic. So let's consume. This is another command we can run in a separate window, create a pulsar client to consume. N zero means consume indefinitely. Give it a unique subscription name and the topic name, which is going to match to where we're publishing the data. And let's verify that the data is coming in. We're going to see some startup information here about the kind of connection, but then shortly we'll start seeing the tweets coming in one by one, data coming from that raw text file one at a time. So the source is working. Now let's pivot and start creating the pulsar sentiment analysis function. So I have left in the code itself a method that encapsulates all this. We don't want to type it from memory like I don't, so we'll go ahead and use this to recreate it. I want to break this down a little bit though. So again, we're executing a command inside pulsar itself, and inside pulsar there's an admin tool and a function subcommand, and we tell it to create a new pulsar function. We specify with a dash jar that where the entire artifact is to deploy, which is the sentiment analysis nar file. We just created the class name of the sentiment analysis function itself, so again, it's the full package name. This is the class we want it to run. The input parameters are going to be this tweets input topic here that data is coming into, and for our output we're going to publish it to a different topic. Default tweets out. And last but not least, if you want to, you can specify a log topic as well for informational data there, but that's really all we need. This should work correctly. If everything goes correctly, you see a command created successfully. Again, you'll see things in the log file coming in, letting you know that it was successfully received, how it was configured, the tenant namespace and name class. Everything else should match what you want. You can quickly verify this as well. Everything's up and running. If you have any sort of errors, and it would be first noted here. If you want to also check on the status and get the configuration details at any point in the future, we can copy this real quick. Run the command here and it should spit back out the configuration details that we sent in, again, verifying it was deployed as we see fit. If you want to update these, if you want to change the output topic, change the parallelism, things like that, you can change the behavior later, but it looks like everything is up and running. So now let's go and see what the output looks like. So again, as we saw earlier, it's writing as an output to tweets out, public default tweets out. So we are going to consume from that same topic over here in a separate consumer and see this again, there's still the tweets coming in. So let's see how our machine learning model is working. Is it getting the information? Is it processing it correctly? We'll start up here shortly. Again, you see some connection details coming in and sure enough, you're starting to see here was the original tweet and it was assigned a sentiment of neutral. The next one comes in, everyone was curious, positive, et cetera. So it starts coming in, it matches. We can get the most recent data and we're doing some analysis on it. So it's event driven in that regard. As soon as something comes in, an analysis is done on it and it's published out. And so you can see that this is moving and it's going to work in tandem for a while and that's not a problem. If you want to check the performance and how long the processing of each one of these tweets takes, you can use the stats command, which is going to give you some average processing latency. This is the average, obviously, processing time of each tweet. It's going to tell you how many tweets have been processed, how many over the lifetime of the tweet itself, and things like that. So lots of information is available. As you can see here, everything is working just fine as expected. So in summary, event driven microservices use a message bus to communicate between themselves and Pulsar is a cloud native distributed messaging and event streaming platform that provides the pub subsmantics that an event driven architecture needs. Furthermore, Pulsar includes a native lightweight compute capability called Pulsar functions that allows you to build microservices with just a few lines of code. And as I've shown you, you can easily add third party machine learning libraries into your pulsar functions to enhance your microservices with machine learning capabilities. Thank you for attending my talk. Let's keep in touch. Scan the query code for my personal page, follow me on Twitter, reach out to me on LinkedIn and go get the source code from my GitHub repo. Again, thank you very much.

David Kjerrumgaard

Developer Advocate @ StreamNative

David Kjerrumgaard's LinkedIn account David Kjerrumgaard's twitter account

Awesome tech events for

Priority access to all content

Video hallway track

Community chat

Exclusive promotions and giveaways