Conf42 Golang 2024 - Online

Go Concurrency powering a Gigabyte scale real-world data pipeline

Abstract

This talk is about how we built a real-world data pipeline in Go to migrate Gigabytes of data from MongoDB to PostgreSQL in a streaming fashion. You’ll learn how to use Go’s concurrency features to build and scale such a system. If you want to see Go’s concurrency in action, attend this talk.

Summary

  • Today I am going to talk about Go concurrency powering a gigabyte-scale real-world data pipeline. I'm a founder at One2N, where we help companies with backend and reliability engineering. I also write stories on pragmatic software engineering, based on the work we do, on Twitter and LinkedIn.
  • We wanted to move data from MongoDB to Postgres. How do we map MongoDB documents to tables and rows? There are two types of transfer: a bulk one-time transfer and a streaming data transfer.
  • The idea is to migrate data from MongoDB on one side to Postgres on the other. How can we improve the performance of the system? Can we add worker pools? Where can we use concurrency?
  • Essentially, we segregate work per database. If we have n databases, we typically end up with twice that many goroutines. What if we create goroutines per database-collection combination? That would be a truly concurrent, massively parallelized solution.
  • Always consider Amdahl's law, and also its broader variant, the universal scalability law. Keeping things simple may be more valuable for the maintenance and runtime of the program, and beware premature optimization. Finally, we ended up with a performant concurrent implementation.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Today I am going to talk about Go concurrency powering a gigabyte-scale real-world data pipeline. So let's get into it. Before we get started, a quick thing about myself. I'm Chinmay Naik. I go by chinmay185 on Twitter, GitHub, LinkedIn, etc., so you can catch me there. I'm a founder at One2N, where we help companies with backend and reliability engineering. I also write stories on pragmatic software engineering, based on the work that we do, on Twitter and LinkedIn, so follow me there. In general, I like engineering, I love psychology, I play percussion, and I'm a huge fan of a rather old computer game called Age of Empires II. So hit me up about any of that.

Today I'm going to talk about MongoDB to RDBMS data migration, and how we achieved it using Go's concurrency features. Fundamentally, we wanted to move data from MongoDB to Postgres. I'm not advocating for either of these two technologies as such; we just had a use case where we had to move data from MongoDB to Postgres. There were two types of transfer. One is the ETL (extract, transform, load) kind of transfer, where we were doing a one-time bulk transfer — think of it like a snapshot transfer of data. We also had to worry about streaming data transfer: if there are ongoing updates to MongoDB, how do those get reflected in Postgres?

Before we go ahead, let's think about how we map MongoDB documents to tables and rows. Because these are two different technologies, it's not a one-to-one direct data transfer. Let's look at a sample student collection in MongoDB. Here is a sample student document: there's _id, which is the primary key; name, which is a string field; roll number, which is a numeric field; a boolean field for is_graduated; and date of birth, another string field. And in Mongo you can have nested documents: address is a nested array of objects, and phone is a nested object.

How does that data get translated into Postgres? The student record itself is pretty simple: you can migrate the keys one to one, and keys in the MongoDB document become columns in Postgres. For example, _id becomes the primary key, and name, roll number, is_graduated, and date of birth become columns. What about the nested fields, address and phone? What we can do is create a relationship between student, student address, and student phone. We create a student_address table with its own primary key and a foreign key of sorts — a logical foreign key to the parent student table — plus the other fields like line1 and zip. Similarly, for phone we create a primary key, a logical foreign key based on the student id, and the other fields like personal and work from the Mongo document.

So essentially we are migrating data in this fashion: for a single Mongo document, we get one record in the student table in Postgres; for the address, which is a nested array of sub-documents, we create two records in the student_address table (with a relationship back to the student table); and there is a third table, student_phone, which gets one record from the same Mongo document.
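To make the mapping concrete, here's a rough sketch of the SQL that a single student document could expand into, written out as Go string literals. The schema, table, and column names (test.student, student_address, student_phone, student_id) and the sample values are illustrative assumptions, not the exact schema from the talk:

```go
package main

import "fmt"

// One student document fans out into one row in student, two rows in
// student_address, and one row in student_phone. Names and values below are
// placeholders chosen to mirror the example above.
var studentSQL = []string{
	`INSERT INTO test.student (_id, name, roll_no, is_graduated, date_of_birth)
	 VALUES ('s1', 'John Doe', 51, false, '2000-01-30');`,

	// nested address array -> two rows in a child table, linked by a logical FK
	`INSERT INTO test.student_address (_id, student_id, line1, zip)
	 VALUES ('a1', 's1', '42 Main St', '89799');`,
	`INSERT INTO test.student_address (_id, student_id, line1, zip)
	 VALUES ('a2', 's1', '7 Park Ave', '80872');`,

	// nested phone object -> one row in another child table
	`INSERT INTO test.student_phone (_id, student_id, personal, work)
	 VALUES ('p1', 's1', '111-222-3333', '444-555-6666');`,
}

func main() {
	for _, stmt := range studentSQL {
		fmt.Println(stmt)
	}
}
```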
So that's how we are going to migrate data from Mongo to Postgres. If you think about how it translates from JSON to SQL, this is how it looks: for the _id field in Mongo, we create an insert record in the student table. The same id then gets replicated as a foreign key of sorts — a student id column — in the student_address table in Postgres, and the same goes for the phone record. So how does MongoDB JSON data map to SQL? We just saw that one Mongo document can map to N SQL statements. A nested array of JSON objects, or further nested objects, in Mongo gets translated into its own table in Postgres, with primary key and foreign key relationships maintained as logical constraints.

Inserts are cool, but how do we handle updates and deletes in Mongo? For updates there can be two types. One is an update SQL statement, where some columns of a particular record get updated in Postgres — an "UPDATE table SET column = value" kind of command. And because Mongo doesn't have a fixed schema while Postgres (or any relational database) does, a new key added to a JSON document translates into an ALTER TABLE statement in Postgres — essentially a schema change. For delete, it's a simple DELETE SQL statement on the table.

Then we have two choices for migrating data. One is a bulk one-time migration, where we just move all the data from Mongo to Postgres: copy the data from Mongo, create the proper SQL schema, create the insert statements, and copy everything across. But we can't stop our production Mongo, migrate everything, and only then switch to Postgres. We also need a streaming approach: migrate the initial data once, and after that make sure any updates, new inserts, or deletes in Mongo get translated to Postgres as well, in a streaming fashion.

What if we want both options? Then we need a reliable way to track all the changes to the Mongo database so we can apply them to Postgres. Any thoughts on which Mongo feature we can use? You guessed it: the Mongo oplog. The Mongo operation log (oplog) is a capped collection in MongoDB, stored in the local database. Think of it like a write-ahead log for MongoDB: it tracks all the edits to the database — inserts, updates, and deletes — so you can see everything that is happening. So we can use the oplog to transfer data from Mongo to Postgres.

Let's look at what that looks like in practice. What does an oplog record look like? Here is a sample insert oplog. You've got "op" as the operation — here "i" stands for insert. The namespace is a database and collection combination, so you've got "test" as the database name and "student" as the collection name. Then you've got the insert object itself, which contains the actual document being inserted into MongoDB. Similarly for update, you've got the operation as update, you've got the namespace, and you get a set of field-value pairs — what is being updated — along with a way to identify the actual object being updated.
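As a rough sketch, here's how these oplog entries might be modeled in Go. The field names mirror the op, ns, and object fields just described, but the struct and the sample values are illustrative assumptions, not the full oplog schema:

```go
package main

import "fmt"

// Oplog is a simplified view of a MongoDB oplog entry, keeping only the
// fields this pipeline cares about. (An illustrative assumption, not the
// complete oplog document.)
type Oplog struct {
	Op string                 `bson:"op"` // "i" = insert, "u" = update, "d" = delete
	NS string                 `bson:"ns"` // "<database>.<collection>", e.g. "test.student"
	O  map[string]interface{} `bson:"o"`  // inserted document, or a description of the update
}

func main() {
	insert := Oplog{
		Op: "i",
		NS: "test.student",
		O:  map[string]interface{}{"name": "John Doe", "roll_no": 51, "is_graduated": false},
	}
	update := Oplog{
		Op: "u",
		NS: "test.student",
		O:  map[string]interface{}{"is_graduated": true}, // fields being set (simplified)
	}
	fmt.Println(insert, update)
}
```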
Think of it this way: you want to update a document matched by some key and set some new values — that's what the update oplog gives you. And similarly, there's a delete oplog as well.

You might be wondering when we're really getting to the concurrency and the Go part of it. Well, the wait is over. Here is what the sequential pipeline looks like. Fundamentally, our idea is to migrate data from MongoDB on one side to Postgres on the other. The way we do this is by writing a utility — let's call it "oplog to SQL". That program reads the oplogs from MongoDB sequentially as they come in: there's a goroutine that reads these oplogs and puts them into a channel. Another goroutine reads from that channel and processes the oplogs into SQL — that's what we just saw, converting a single oplog into N SQL statements. Once that's done, it puts the statements into another channel, from which a third goroutine picks up the SQL statements and does the actual inserts into Postgres. If we write a program with this kind of sequential data pipeline, we'll be able to migrate data from Mongo to Postgres both in streaming and in bulk fashion, assuming we have access to the oplogs from that point in time. So you can think of these stages as goroutines and these pipes between them as channels.

This is how our Mongo oplog looks, and here is how the corresponding Postgres table would look. The namespace translates into the database schema and table name in Postgres, and the data gets translated into rows of that table. To move this data from Mongo to Postgres, we have to create the schema, create the table, and then insert data into that table — so we end up with the schema, the table, and the insert statements. Now imagine we have two insert oplogs on the same Mongo collection. We can't — and shouldn't — create a new schema and a new table every time, so we create the schema and table only once and then perform n inserts. That means we also have to maintain some state so we don't create the schema and table multiple times. For now, given this is a talk about concurrency, we'll skip the details and edge cases related to updates and deletes — you can trust me that I've got those handled.

So that's what our pipeline looks like. Let's say we run this pipeline for about 3.6 million records. I ran it on my MacBook with basic specs, and it ran for about 9 hours 20 minutes. So let's think about how we can improve the performance of the system. Typically we could add a worker pool to speed things up. But where? Which parts of the program can be parallelized? Where can we add concurrency? Typically it's the "process and convert oplog to SQL" goroutine — that one seems to be doing too much work, and it could use some help.
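Before adding workers, here's the sequential baseline as a minimal Go sketch. The type and the stage functions are placeholders standing in for the real MongoDB and Postgres code; the shape of the pipeline — three stages connected by channels — is the point:

```go
package main

import "fmt"

// Oplog and the stage functions below are placeholders for the real code.
type Oplog struct {
	Op, NS string
	O      map[string]interface{}
}

func readOplogs(out chan<- Oplog) {
	defer close(out)
	// in the real program: tail the Mongo oplog and push each entry into out
}

func oplogToSQL(op Oplog) []string {
	// in the real program: convert one oplog entry into N SQL statements
	return nil
}

func main() {
	oplogs := make(chan Oplog)
	sqls := make(chan string)

	go readOplogs(oplogs) // stage 1: read oplogs from MongoDB

	go func() { // stage 2: process and convert oplogs to SQL
		defer close(sqls)
		for op := range oplogs {
			for _, stmt := range oplogToSQL(op) {
				sqls <- stmt
			}
		}
	}()

	for stmt := range sqls { // stage 3: execute SQL on Postgres
		fmt.Println("would execute:", stmt)
	}
}
```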
So let's modify the program to add worker pools. What we've done in this case is: we read the oplogs in a single goroutine and push them into the oplogs channel. From there, we create n worker goroutines to process these oplogs concurrently — and, if we have multiple CPU cores, hopefully also in parallel. As these worker goroutines run their oplog processing and conversion, they turn the oplogs into SQL statements, which go into the SQL statements channel. The "execute SQL" goroutine then picks those up and runs them on Postgres.

We've got one design, but can we do better? Can we add more worker pools? Well, the SQL execution can also be parallelized — it could use some help too. So let's modify the program to have even more goroutines. It's the same design, except that each goroutine handling incoming oplogs creates its own set of SQL statements and pushes them into its own channel, which another goroutine reads from. This way we have maximum concurrency. And by the way, no goroutines or gophers were harmed in this exercise, so we're good.

We ran this program, and for the same 3.6 million oplogs it runs in about 2 hours 18 minutes. That's a 4x performance improvement over the sequential version. That's amazing. But before we do anything more, we realize something is wrong. What could be wrong? Remember, we have this oplogs channel where the oplogs arrive one by one, and there are multiple goroutines processing them concurrently. If you've used goroutines, you'll know that you can't guarantee the ordering of independent goroutines — there is no implicit synchronization across them. What does that mean here? It means an insert followed by an update in the input can come out as an update followed by an insert. It could even become a delete followed by an insert. That's a problem.

How exactly does it cause a problem? Think about our pipeline. We read the oplogs, we put them in a channel, and a bunch of worker goroutines process them. Imagine a single document that is inserted and immediately updated — two sequential oplog entries, one insert and one update for the same document. With this setup, the SQL statement for the insert and the SQL statement for the update both get generated, but we can't guarantee the order in which they execute. It could happen that the update SQL runs first and then the insert SQL — and that would result in a SQL error. So we can't just throw worker pools at the problem: data integrity gets compromised and we have a correctness issue. We always need to write a correct program first, and only then make it performant. This program is performant, but it's not correct.

So do we go back to the drawing board? What do we do? Imagine we have n databases in Mongo, and each database has m collections. What we could do is fan out per database and fan in for SQL. That's an option.
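Before we get to that design, here's a minimal sketch of the worker-pool version just described, to make the ordering problem concrete. The names and the worker count are placeholders; the point is that several workers consume from the same oplogs channel, so nothing guarantees that a document's insert SQL is executed before its update SQL:

```go
package main

import (
	"fmt"
	"sync"
)

// Placeholders for the real types and conversion logic.
type Oplog struct {
	Op, NS string
	O      map[string]interface{}
}

func oplogToSQL(op Oplog) []string { return nil }

func main() {
	oplogs := make(chan Oplog)
	sqls := make(chan string)
	close(oplogs) // the reader goroutine would feed this; left empty so the sketch runs

	const numWorkers = 4
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() { // each worker pulls oplogs off the same shared channel
			defer wg.Done()
			for op := range oplogs {
				for _, stmt := range oplogToSQL(op) {
					// Ordering across workers is NOT guaranteed: the SQL for a
					// document's update can land here before the SQL for its insert.
					sqls <- stmt
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(sqls)
	}()

	for stmt := range sqls {
		fmt.Println("would execute:", stmt) // may run statements out of order
	}
}
```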
With the fan-out-per-database approach, we again read the oplogs from MongoDB in a single goroutine and push them into the same oplogs channel. But on the other side we have a single goroutine that fans out these oplogs per database. Essentially, we're segregating work per database: if we have n databases, we create n channels on the other side of the oplogs channel, and for each database we have a goroutine that consumes its oplogs. Each goroutine consumes oplogs from only one database — this goroutine only processes DB1's oplogs, that one only processes DB2's, and so on. So we have n goroutines, one per database. Ultimately, all these goroutines fan in and send their SQL statements to a single channel, from which we execute the SQL on Postgres.

You could argue that the single goroutine executing SQL on the database could become the bottleneck, and that it could use some help too. So we could modify this design to fan out for each database without fanning all the SQL statements back into a single channel. That looks like this: we still have those n databases and n goroutines, but instead of fanning them all into a single channel, we create a channel for each goroutine. And to insert records into Postgres, we create n goroutines on the database side as well: one goroutine per database, and one goroutine per SQL writer. The total number of goroutines we end up with is 2n — plus, obviously, the two goroutines we already had, one for reading the oplog and one for orchestrating the fan-out. But basically, with n databases, we typically have twice that many goroutines.

If we take this idea even further: why don't we also create goroutines for each database and collection combination? Right now we're only creating goroutines per database, but each database has m collections. What if we create goroutines per database-collection combination? That would be a truly concurrent, massively parallelized solution.

So this is what we end up with — a concurrent data pipeline. The diagram was too big to fit horizontally, so I had to lay it out vertically. We have MongoDB at the top. Then we have the oplog-to-SQL program, where we read the oplogs in a single goroutine and push them into a channel. On the receiving end, an orchestrator goroutine fans out per database: every time it encounters a new database, it creates a new channel and pushes all the oplogs for that database into it. Then we fan out the oplogs per collection: each of those n goroutines, one per database, creates m channels as and when it encounters an oplog for a new collection in its database. Each downstream goroutine then processes oplogs only for the specific database and collection combination its channel is for.
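Here's a minimal sketch of that fan-out step: a router goroutine creates a channel (and a consumer goroutine) per database on demand, keyed by the database part of the oplog namespace. The names and the per-database body are placeholders for the real per-collection fan-out and SQL conversion:

```go
package main

import (
	"strings"
	"sync"
)

type Oplog struct {
	Op, NS string
	O      map[string]interface{}
}

// fanOutPerDB routes every oplog to a channel dedicated to its database,
// creating channels and their consumer goroutines on demand.
func fanOutPerDB(oplogs <-chan Oplog, wg *sync.WaitGroup) {
	perDB := make(map[string]chan Oplog)
	for op := range oplogs {
		db := strings.SplitN(op.NS, ".", 2)[0] // namespace is "<db>.<collection>"
		ch, ok := perDB[db]
		if !ok {
			ch = make(chan Oplog)
			perDB[db] = ch
			wg.Add(1)
			go func(dbOplogs <-chan Oplog) { // one goroutine per database
				defer wg.Done()
				for range dbOplogs {
					// fan out further per collection, or convert to SQL, here
				}
			}(ch)
		}
		ch <- op
	}
	for _, ch := range perDB { // input exhausted: let the per-DB goroutines finish
		close(ch)
	}
}

func main() {
	oplogs := make(chan Oplog, 2)
	oplogs <- Oplog{Op: "i", NS: "db1.student"} // illustrative sample entries
	oplogs <- Oplog{Op: "i", NS: "db2.student"}
	close(oplogs)

	var wg sync.WaitGroup
	fanOutPerDB(oplogs, &wg)
	wg.Wait()
}
```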
So one goroutine works only on, say, the DB1-collection1 combination, the next one works on DB1-collection2, then DB1-collection3, and likewise a new set of goroutines works on DB2-collection1, DB2-collection2, etcetera. We've essentially got m × n goroutines. Each of them creates SQL statements and pushes them into another channel, and a SQL-execution goroutine executes those statements on Postgres. When we run this program, it finishes in about 1 hour 36 minutes and 30 seconds for the same number of oplogs. We had about 3.6 million oplogs: the sequential run took about 9 hours 20 minutes, and this concurrent one ran in 1 hour 36 minutes.

In terms of resource utilization, though, imagine we have 16 databases and 128 collections per database. That's on the higher side, but let's go with it. The total number of goroutines we'll have: n goroutines fanning out oplogs per database; then, for each database, m goroutines, so n × m of those; and lastly, for SQL execution, one goroutine per database-collection combination — n × m again. With 16 databases and 128 collections, we end up with 2048 database connections, because each of those goroutines opens its own connection to Postgres. That's quite a number of connections — it's going to hog the CPU and probably cause problems for Postgres with that many connections open. So while this program is performant, it over-utilizes resources and overburdens Postgres.

What can we do? Can we do better? What we do is keep most of the pipeline the same, but instead of fanning out for the Postgres writes, we fan them in. Remember, we don't need n × m connections to the database if we can limit it to just n connections — one connection per database is good enough, and that's what we do here. All the goroutines that process a single database's records funnel their output back into a single channel, and that channel has one goroutine which executes all those SQL inserts, updates, and so on against Postgres. So we create n goroutines for handling database connections to Postgres, and that's where we limit the goroutine sprawl.

If you consider 16 databases and 128 collections per database: before, we had a massive fan-out that would end up with 2048 database connections; after the modification, we fan in per database again, which means only 16 connections. We don't create a massive number of database connections that make the database hang and perform slowly — we create only 16. Comparing the performance of this improved version, in my case it ran a bit faster, probably because it utilizes the database and the machine properly. It was definitely better than the earlier concurrent implementation, though not by much — but in terms of resource utilization it was massively better.
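Here's a minimal sketch of that fan-in step: the SQL streams from all of a database's per-collection goroutines merge into one channel, drained by a single writer, so there is one Postgres connection per database. The function and variable names are assumptions for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// fanInSQL merges the SQL statements produced by all per-collection goroutines
// of one database into a single channel, so a single writer goroutine (one
// Postgres connection per database) can drain it.
func fanInSQL(perCollection []<-chan string) <-chan string {
	merged := make(chan string)
	var wg sync.WaitGroup
	for _, ch := range perCollection {
		wg.Add(1)
		go func(c <-chan string) {
			defer wg.Done()
			for stmt := range c {
				merged <- stmt
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

func main() {
	// two fake per-collection SQL streams, just to exercise the sketch
	c1 := make(chan string, 1)
	c2 := make(chan string, 1)
	c1 <- "INSERT INTO db1.student ..."
	c2 <- "INSERT INTO db1.student_address ..."
	close(c1)
	close(c2)

	for stmt := range fanInSQL([]<-chan string{c1, c2}) {
		// in the real pipeline: execute on Postgres over this database's single connection
		fmt.Println("would execute:", stmt)
	}
}
```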
This is what our final concurrent data pipeline looks like. We read the oplogs from MongoDB. We fan out into n goroutines, one for each database. For each of those, we fan out again into m goroutines, one per database-collection combination. Ultimately, we fan in again so we don't have goroutine sprawl: all the SQL — inserts, updates, deletes — for one database is collected into a single channel and a single goroutine, and that goroutine performs the inserts, updates, and deletes on the Postgres database. That's our final concurrent pipeline.

Think about what we just did as a solution. We understood the problem domain — the solution only makes sense in the context of the problem we're solving, which was MongoDB-to-Postgres data migration. We built a working solution first, the sequential data pipeline. Agreed, it ran slowly, but you should always have a working solution before you try to optimize anything. We identified the parallelizable portions of the program — not every program can be parallelized, so you have to identify which parts can actually be improved. We have to avoid blindly applying concurrency patterns: we tried to apply worker pools without thinking too much, and it didn't work out. The program was concurrent and performed well, but we didn't have correctness, and without correctness no optimization helps.

We also have to consider Amdahl's law: which parts of the program can be parallelized, and which part is sequential-only and cannot be. No matter how much hardware you throw at the parallelizable portion, if the sequential portion dominates, it's not going to help much. So always consider Amdahl's law, and also its broader variant, the universal scalability law. Fundamentally, simplicity — keeping things simple — may be more valuable for the maintenance and runtime of the program, and beware of premature optimization and the diminishing performance gains beyond a certain point. Finally, we ended up with a performant concurrent implementation with fan-in per database, to limit the goroutine sprawl and the number of database connections.

So that's it. I hope you learned something. Connect with me at chinmay185 and check out our One2N website. Here is a link to our playbook, and specifically to this particular problem, where I've broken it down into a set of small stories that you can implement yourself, incrementally, and try out the various concurrency features of Go. Thanks. Hope you have a nice conference.
...

Chinmay Naik

Founder @ One2N



