How I Reduced the Processing Time from 3hrs to 12min using Concurrency in Go
Introduction
A Worker Pool (also known as a Thread Pool) is a pattern for executing tasks concurrently. The idea is to keep a fixed number of workers running in the background, waiting for tasks to be assigned to them, while our main thread continues executing. Whenever we encounter some heavy work (e.g. file processing, a network call, etc.), we submit it to the worker pool for concurrent execution in the background.
Problem
Before we deep dive into the worker pool pattern, let's understand the problem we are trying to solve. At my workplace I was asked to read CSV files from a Google Cloud Storage bucket; these files contained user_ids which I needed to send over Kafka. Reading was the easy part: the Google client library handled most of the hard work of fetching records from GCS and loading them into a matrix, and it took hardly a few seconds. The problem came when I sent the user_ids to Kafka one by one; that operation took hours. It got worse with a CSV holding the maximum possible number of user_ids (around 80 lakh, i.e. 8 million), where processing time hit ~1-2 hours. And that was not all: I could receive multiple requests, and each request could carry multiple CSV files. In the worst case, I could receive a set of CSV files with around 5M users.
After some analysis, I found that reading the user_ids and sending them over to Kafka needed to be done concurrently, and since the project was in Go, concurrent processing was not a big deal: I could simply launch the whole looping operation in a goroutine. But there was a problem. This was a CPU-bound operation, and I could not just launch an unbounded number of goroutines and expect things to get faster; that would result in too much context switching and make things even worse than synchronous processing.
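To make that concrete, here is a minimal sketch of the naive approach (sendToKafka is a hypothetical stand-in for the real producer call, and the snippet assumes the standard context and sync packages). One goroutine per user_id means millions of goroutines for a large file, all competing for a handful of CPU threads:

// Naive approach: spawn one goroutine per user_id.
// With millions of IDs, the scheduler spends more time
// context switching than doing useful work.
func sendAllNaive(userIDs []string) {
	var wg sync.WaitGroup
	for _, userID := range userIDs {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			_ = sendToKafka(context.Background(), id) // hypothetical producer call
		}(userID)
	}
	wg.Wait()
}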
Let’s talk more about tasks. There are two types of tasks I can think of when dealing with concurrency:
- CPU bound task
- IO Bound task
CPU Bound task
This kind of task relies mostly on CPU cycles. We can recognize it by looking at the nature of the data and the operation we are processing: for example, looping over a huge array already in memory, or a small set where each entry needs a fair amount of computation. There is a limit to how much of this work we can usefully run concurrently, because we have a limited number of CPU threads and CPU cycles.
IO Bound task
This kind of task relies on an external entity: reading from disk, reading from the network, or waiting for a signal. It doesn't consume the CPU, because the CPU isn't doing the work; instead the calling thread is blocked waiting on it, which is why these are commonly known as blocking tasks.
Either way, both types of tasks end up blocking our main thread until the work we are waiting on is finished.
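As a rough illustration, both kinds of task can be written against the same func(ctx context.Context) error signature that the pool below accepts (makeCPUTask and makeIOTask are illustrative names; the snippet assumes the crypto/sha256 and net/http packages):

// CPU-bound: pure computation over data already in memory.
func makeCPUTask(batch []string) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		for _, id := range batch {
			_ = sha256.Sum256([]byte(id)) // burns CPU cycles, never waits
		}
		return nil
	}
}

// IO-bound: the goroutine blocks until the network responds.
func makeIOTask(url string) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return err
		}
		resp, err := http.DefaultClient.Do(req) // blocks on the network
		if err != nil {
			return err
		}
		return resp.Body.Close()
	}
}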
Solution
The solution to this problem is to have a set of workers whose only job is to handle these blocking calls. By keeping the number of workers limited, we process our heavy tasks while keeping the amount of context switching bounded.
To summarize, we need three things: a number of workers, a way to submit tasks to these workers, and a way to gracefully shut the workers down when the server shuts down, to prevent goroutine leaks.
Let’s start by defining the struct of the worker pool itself and its constructor function.
type workerPool struct {
	workerCount int
	taskFnChan  chan func(ctx context.Context) error
	errChan     chan error
}

func NewWorkerPool(wc int) *workerPool {
	return &workerPool{
		workerCount: wc,
		taskFnChan:  make(chan func(ctx context.Context) error, 1),
		errChan:     make(chan error, 1),
	}
}
In the above code, we have defined a struct named workerPool which has the following properties:
- workerCount - the number of workers (goroutines) our pool will start.
- taskFnChan - a channel for receiving the tasks we need to execute concurrently. Note that each task takes a context, which lets us cancel tasks at any time along with the entire pool.
- errChan - a channel on which errors are emitted whenever any of the task functions fails.
We instantiate our worker pool by calling the constructor function NewWorkerPool, which takes an argument wc of type int, i.e. the number of workers to spawn.
To submit a new task for execution, we use the method AddTask, whose sole purpose is to hand a task over to one of the free workers for concurrent execution.
// AddTask allows submission of new tasks to the pool.
func (wp *workerPool) AddTask(taskFn func(ctx context.Context) error) {
	wp.taskFnChan <- taskFn
}
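Submitting work then looks like this (a sketch that assumes the pool has already been started and a userIDs slice is in scope, with sendToKafka again standing in as a hypothetical producer call):

pool := NewWorkerPool(25)
// ... pool started via one of the Start methods below ...
for _, userID := range userIDs {
	id := userID // capture the loop variable for the closure (pre-Go 1.22)
	pool.AddTask(func(ctx context.Context) error {
		return sendToKafka(ctx, id)
	})
}

Note that because taskFnChan is created with a buffer of one, AddTask blocks whenever all workers are busy and the buffer slot is taken, which naturally applies back-pressure to the submitting loop.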
We have defined two methods to start the pool:
- StartAndExitOnErr - exits and kills the pool as soon as any worker returns an error. This suits spinning up a pool per request, where we don't want to process any further once an error arrives. It is not recommended for a single pool that lives for the application's entire life cycle, because an error would exit the pool and leave subsequent requests with no workers to process them.
- StartAndLogOnErr - logs an error whenever any worker returns one. This suits spinning up a single pool that serves all requests of our application. It runs in a non-blocking manner even when errors are generated. But if we want to wait for the pool to finish before returning a response, this start method is not recommended.
StartAndExitOnErr
func (wp *workerPool) StartAndExitOnErr(ctx context.Context) error {
	defer func() {
		close(wp.errChan)
		close(wp.taskFnChan)
	}()

	wrappedCtx, cancelFn := context.WithCancel(ctx)
	defer cancelFn() // deferred last, so it runs before the close() calls above

	wp.start(wrappedCtx)

	// Block until a worker reports an error, or the parent context is
	// cancelled; without the ctx case we would block forever when no
	// error ever arrives.
	select {
	case err := <-wp.errChan:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
Here we start our pool and wait on wp.errChan, i.e. we block until one of the workers returns an error, and the moment it does we return that error (we also return if the parent context is cancelled, so the caller isn't blocked forever when no error ever arrives). On return, the deferred calls fire: first the context cancelFn, then the closing of our channels. If you are wondering why we cancel the context, it is to signal all running workers to stop processing.
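A per-request flow could be wired up like this (a sketch; sendToKafka is hypothetical, and the sync and errors packages are assumed). Since the pool has no built-in completion signal, the sketch wraps each task with a WaitGroup and cancels the context once everything is done:

func processRequest(ctx context.Context, userIDs []string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	pool := NewWorkerPool(25)

	var wg sync.WaitGroup
	go func() {
		for _, userID := range userIDs {
			wg.Add(1)
			id := userID
			pool.AddTask(func(ctx context.Context) error {
				defer wg.Done()
				return sendToKafka(ctx, id)
			})
		}
		wg.Wait()
		cancel() // all tasks finished: unblock StartAndExitOnErr
	}()

	// Blocks until the first worker error or the cancel() above.
	// Caveat: if the pool exits on a task error while the goroutine above
	// is still submitting, AddTask can hit a closed channel; production
	// code would need extra coordination around shutdown.
	if err := pool.StartAndExitOnErr(ctx); err != nil && !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
}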
StartAndLogOnErr
func (wp *workerPool) StartAndLogOnErr(ctx context.Context) {
	go func(errChan chan error) {
		defer func() {
			close(wp.errChan)
			close(wp.taskFnChan)
		}()

		wp.start(ctx)

		// Drain errors and log them; the pool keeps processing tasks.
		for {
			select {
			case <-ctx.Done():
				return
			case err, ok := <-errChan:
				if ok && err != nil {
					log.Println("worker pool:", err)
				}
			}
		}
	}(wp.errChan)
}
Here we launch a closure as a goroutine which keeps running until the parent context is cancelled. Inside it we lean on Go's select multiplexing to handle both goroutine cancellation and the error channel: any error produced by a worker is received here and written out with the standard log package.
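For the single long-lived pool, the start call might live at application boot (a minimal sketch, assuming the standard context package):

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancelling the context shuts the pool down

	pool := NewWorkerPool(25)
	pool.StartAndLogOnErr(ctx) // returns immediately; errors are logged in the background

	// ... start the server; request handlers call pool.AddTask(...) ...
}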
Both start methods are fairly self-explanatory, but the important thing to note is that both call wp.start(ctx), which is responsible for spinning up the worker goroutines. Now on to the wp.start(ctx) function:
func (wp *workerPool) start(ctx context.Context) {
	for i := 0; i < wp.workerCount; i++ {
		go worker(ctx, wp.taskFnChan, wp.errChan)
	}
}
It’s pretty straightforward: we launch as many worker goroutines as workerCount specifies.
And finally, the core of our whole program, i.e. the worker definition:
func worker(ctx context.Context, taskFnChan <-chan func(ctx context.Context) error, errChan chan<- error) {
	for {
		select {
		case taskFn, ok := <-taskFnChan:
			if !ok {
				// Channel closed: the pool is shutting down.
				return
			}
			if err := taskFn(ctx); err != nil {
				errChan <- err
			}
		case <-ctx.Done():
			return
		}
	}
}
Our worker function is quite simple and follows the same select pattern as StartAndLogOnErr, differing only in its logic. Its arguments are the context, taskFnChan as a receive-only channel, and errChan as a send-only channel.
So whenever AddTask is called, the submitted function is sent onto the channel, received inside the worker in the first case statement, and executed there. If executing this taskFn returns an error, we simply send the error on errChan and continue to the next task (and if taskFnChan itself has been closed, the worker simply returns). If instead we receive the shutdown signal, handled by the second case statement, we cancel this worker's execution and return.
That’s all about the Worker Pool implementation in Go.
What’s next?
Implementing the worker pool was quite easy, but the important question is: how many goroutines, i.e. how many workers, should we start?
It depends entirely on your workload. In my case I was dealing with a CPU-bound operation, so I ran a bunch of benchmarks; below are the results of my analysis.
| Worker Count | Approx. Execution Time | User Count |
|---|---|---|
| 10 | 1 hr | 5M |
| 15 | 40 min | 5M |
| 20 | 20 min | 5M |
| 25 | 12 min | 5M |
| 30 | 11 min | 5M |
| 35 | 30 min | 5M |
| 40 | 35 min | 5M |
I experimented with 25 and 30 several times, but the results were close: sometimes 25 was better and sometimes 30 was. We simply decided to stick with 25.
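If you want to reproduce this kind of table for your own workload, one crude approach (a sketch; it assumes the sync and time packages, and wraps each task with a WaitGroup since the pool itself has no completion signal) is to time the same batch of tasks at different pool sizes:

func timeWorkerCount(ctx context.Context, wc int, tasks []func(ctx context.Context) error) time.Duration {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stops the pool once timing is done

	pool := NewWorkerPool(wc)
	pool.StartAndLogOnErr(ctx)

	var wg sync.WaitGroup
	start := time.Now()
	for _, task := range tasks {
		wg.Add(1)
		t := task
		pool.AddTask(func(ctx context.Context) error {
			defer wg.Done()
			return t(ctx)
		})
	}
	wg.Wait() // every submitted task has finished
	return time.Since(start)
}

Calling this in a loop over worker counts (10, 15, 20, ...) against the same task slice yields numbers comparable to the table above.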
So this is how we used the Worker Pool Pattern to reduce running time from 3 hours to 12 minutes for our worst-case scenario.
Follow the link for the source code: SourceCode