I'm Fattesingh Rane

Welcome to my personal blogging page on engineering and technology.

10 Dec 2023

How I Reduced the Processing Time from 3hrs to 12min using Concurrency in Go

Introduction

A worker pool, also known as a thread pool, is a pattern for executing tasks concurrently. The idea is to keep a fixed number of workers running in the background, waiting for tasks to be assigned to them, while the main thread continues executing. Whenever we encounter heavy work (e.g. file processing, a network call), we submit it to the worker pool for concurrent execution in the background.

Problem

Before we dive deep into the worker pool pattern, let's understand the problem we are trying to solve. At my workplace I was asked to read CSV files from a Google Cloud Storage bucket. These files contained user_ids that I needed to send over Kafka. Reading was the easy part: the Google library handled most of the difficult things for me, such as reading records from GCS and putting them in a matrix, and it took hardly a few seconds. The problem came when I sent the user_ids to Kafka one by one, an operation that took hours. It gets worse when a CSV holds the maximum possible number of user_ids (around 80 lakh), where the processing time hits roughly 1-2 hours. And that was not all: I could receive multiple requests, and each request could have multiple CSV files. In the worst case, I could receive a set of CSV files with around 5M users.

After some analysis, I found that reading the user_ids and sending them to Kafka needed to be done concurrently. Since the project was in Go, processing user_ids concurrently was not a big deal: I could simply launch the whole looping operation in goroutines. But there was a problem. This was a CPU-bound operation, so I could not launch too many goroutines and expect things to be fast; that would cause too much context switching and could make it even slower than synchronous processing.

Let’s talk more about tasks. There are two types of tasks I can think of when dealing with concurrency:

  1. CPU bound task
  2. IO Bound task

CPU Bound task

This kind of task relies mainly on CPU cycles. We can recognize it by looking at the nature of the data and the operation we are trying to perform: for example, looping over a huge array in memory, or working through a small set where each entry needs a fair amount of processing, such as some calculation. There is a limit to how much of this work we can run concurrently, because we have a limited number of CPU threads and CPU cycles.

IO Bound task

This kind of task relies on an external entity, such as reading from a disk, reading from a network, or waiting for a signal. It does not depend much on the CPU, because the CPU is not doing the work; instead, the caller sits blocked until the operation completes, which is why these are commonly known as blocking tasks.

So both types of tasks leave our main thread blocked until the work we are waiting on has finished.

Solution

The solution to this problem is to have a set of workers who are only responsible for handling these blocking calls. We will have a limited number of workers to process our heavy task ensuring that we do a limited amount of context switching.

To summarize, we need three things: a fixed number of workers, a way to submit tasks to these workers, and a way to gracefully shut the workers down when the server shuts down, to prevent goroutine leaks.

Let’s start by defining the struct of the worker pool itself and its constructor function.

type workerPool struct {
    workerCount int
    taskFnChan  chan func(ctx context.Context) error
    errChan     chan error
}

func NewWorkerPool(wc int) *workerPool {
    return &workerPool{
        workerCount: wc,
        taskFnChan:  make(chan func(ctx context.Context) error, 1),
        errChan:     make(chan error, 1),
    }
}

In the above code, we have defined a struct named workerPool which has the following properties:

  1. workerCount - the number of workers (goroutines) our pool will start.
  2. taskFnChan - a channel that receives the tasks we need to execute concurrently. Note that each task takes a context, which lets us cancel tasks, along with the entire pool, whenever we wish.
  3. errChan - a channel on which an error is emitted whenever any of the task functions returns one.

We instantiate our worker pool by calling the constructor function NewWorkerPool, which takes an argument wc of type int, i.e. the number of workers to spawn.

To submit a new task for execution, we use the method AddTask, whose sole purpose is to hand a task to one of the free workers for concurrent execution.

// Allows submission of new tasks to the pool.
func (wp *workerPool) AddTask(taskFn func(ctx context.Context) error) {
    wp.taskFnChan <- taskFn
}

We have defined two methods to start the pool

StartAndExitOnErr

func (wp *workerPool) StartAndExitOnErr(ctx context.Context) error {
    defer func() {
        close(wp.errChan)
        close(wp.taskFnChan)
    }()

    wrappedCtx, cancelFn := context.WithCancel(ctx)
    defer cancelFn()

    wp.start(wrappedCtx)

    if err := <-wp.errChan; err != nil {
        return err
    }

    return nil
}

Here we start our pool and wait on wp.errChan, i.e. we wait until any of the workers reports an error, and the moment one does, we return that error. On return, the deferred calls run: cancelFn cancels the context, and then the channels are closed. If you are wondering why we cancel the context, it is to signal all running workers to stop processing.
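To see this behaviour end to end, here is a condensed, self-contained sketch of the pool with a small demo. Two things in it are my assumptions rather than the code from this post: the channels are not closed on exit (so a late AddTask cannot hit a closed channel), and the worker stops waiting on errChan once the context is cancelled. The runDemo function and the error text are made up for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type workerPool struct {
	workerCount int
	taskFnChan  chan func(ctx context.Context) error
	errChan     chan error
}

func NewWorkerPool(wc int) *workerPool {
	return &workerPool{
		workerCount: wc,
		taskFnChan:  make(chan func(ctx context.Context) error, 1),
		errChan:     make(chan error, 1),
	}
}

func (wp *workerPool) AddTask(taskFn func(ctx context.Context) error) {
	wp.taskFnChan <- taskFn
}

// StartAndExitOnErr blocks until the first task error arrives.
// Assumption: unlike the version above, it leaves the channels open.
func (wp *workerPool) StartAndExitOnErr(ctx context.Context) error {
	wrappedCtx, cancelFn := context.WithCancel(ctx)
	defer cancelFn() // tells every worker to stop once we return

	for i := 0; i < wp.workerCount; i++ {
		go worker(wrappedCtx, wp.taskFnChan, wp.errChan)
	}
	return <-wp.errChan
}

func worker(ctx context.Context, taskFnChan <-chan func(ctx context.Context) error, errChan chan<- error) {
	for {
		select {
		case taskFn := <-taskFnChan:
			if err := taskFn(ctx); err != nil {
				select {
				case errChan <- err:
				case <-ctx.Done():
					return
				}
			}
		case <-ctx.Done():
			return
		}
	}
}

// runDemo submits five tasks; only the third one fails.
func runDemo() error {
	wp := NewWorkerPool(3)

	go func() {
		for id := 1; id <= 5; id++ {
			id := id // capture the loop variable for the closure
			wp.AddTask(func(ctx context.Context) error {
				if id == 3 {
					return errors.New("kafka publish failed")
				}
				return nil
			})
		}
	}()

	return wp.StartAndExitOnErr(context.Background())
}

func main() {
	fmt.Println("pool stopped:", runDemo()) // prints "pool stopped: kafka publish failed"
}
```

Note that if no task ever fails, StartAndExitOnErr keeps waiting on errChan, so this start method suits long-running pools that should stop at the first failure.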

StartAndLogOnErr

func (wp *workerPool) StartAndLogOnErr(ctx context.Context) {
    go func(taskFnChan chan func(ctx context.Context) error, errChan chan error) {
        defer func() {
            close(wp.errChan)
            close(wp.taskFnChan)
        }()

        wp.start(ctx)

        for {
            select {
            case <-ctx.Done():
                return

            case taskFn, ok := <-wp.taskFnChan:
                if ok {
                    if err := taskFn(ctx); err != nil {
                        errChan <- err
                    }
                }
            }

        }
    }(wp.taskFnChan, wp.errChan)
}

Here we launch a closure as a goroutine that keeps running until the parent cancels the context. Inside this function, we use Go's select statement to multiplex over channels, which is how we handle the goroutine's cancellation.
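In the snippet as shown, nothing receives from errChan, so with a buffer of one a second failing task would block its worker on the send. One possible way to address that, assuming errors should simply be logged, is a small goroutine that drains the channel. The helper names drainErrors and countHandled below are hypothetical; in the real pool the handle callback could be a call to log.Printf.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// drainErrors is a hypothetical helper: it receives from errChan until ctx
// is cancelled and passes every error to handle (for example, a logger).
func drainErrors(ctx context.Context, errChan <-chan error, handle func(error)) {
	for {
		select {
		case <-ctx.Done():
			return
		case err := <-errChan:
			handle(err)
		}
	}
}

// countHandled pushes n errors through a channel with a buffer of one and
// returns how many of them the drain goroutine handled.
func countHandled(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the drain goroutine when we return

	errChan := make(chan error, 1)
	var wg sync.WaitGroup
	wg.Add(n)

	handled := 0
	go drainErrors(ctx, errChan, func(err error) {
		handled++ // written only by the drain goroutine
		wg.Done()
	})

	for i := 0; i < n; i++ {
		errChan <- errors.New("task failed") // never blocks for long: the drain keeps receiving
	}
	wg.Wait() // every error has been handled before we read the counter
	return handled
}

func main() {
	fmt.Println("errors handled:", countHandled(3)) // prints "errors handled: 3"
}
```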

Both of these start methods are fairly self-explanatory. The important thing to note is that each calls wp.start(ctx), which is responsible for spinning up the worker goroutines.

Now to the wp.start(ctx) function

func (wp *workerPool) start(ctx context.Context) {
    for i := 0; i < wp.workerCount; i++ {
        go worker(ctx, wp.taskFnChan, wp.errChan)
    }
}

It’s pretty straightforward: we launch as many worker goroutines as the worker count.

And finally, the core of our whole program, i.e. the worker definition:

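func worker(ctx context.Context, taskFnChan <-chan func(ctx context.Context) error, errChan chan<- error) {
    for {
        select {
        case taskFn, ok := <-taskFnChan:
            if !ok {
                // The task channel was closed, so no more work will arrive.
                // Without this check we would call a nil function and panic.
                return
            }
            if err := taskFn(ctx); err != nil {
                // Report the error, but stop waiting if the pool is cancelled.
                select {
                case errChan <- err:
                case <-ctx.Done():
                    return
                }
            }

        case <-ctx.Done():
            // Shutdown signal: stop this worker.
            return
        }
    }
}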

Our worker function is quite simple. It follows a pattern similar to StartAndLogOnErr; only the logic differs. Its input arguments are the context, taskFnChan, which is a receive-only channel, and errChan, which is a send-only channel.

So whenever the AddTask method is called, the submitted function is sent onto the channel, received inside the worker in the first case statement, and executed within that block. If executing taskFn returns an error, we simply send the error on errChan and continue to the next task.

But let’s say we receive the shutdown signal: it is handled by our second case statement, where we stop this worker and return.
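To check that cancellation really stops every worker and leaves no goroutine behind, one option is to track the workers with a sync.WaitGroup. This is a sketch under my own assumptions: the extra WaitGroup parameter and the runAndShutdown helper are not part of the pool above, and task errors are ignored to keep it short.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type taskFn func(ctx context.Context) error

// worker follows the same select loop as above, plus a WaitGroup
// (an assumption added here) so the caller can wait for it to exit.
func worker(ctx context.Context, wg *sync.WaitGroup, tasks <-chan taskFn) {
	defer wg.Done()
	for {
		select {
		case t := <-tasks:
			_ = t(ctx) // errors are ignored in this sketch
		case <-ctx.Done():
			return
		}
	}
}

// runAndShutdown runs taskCount tasks on workerCount workers, then cancels
// the context and waits until every worker has returned.
func runAndShutdown(workerCount, taskCount int) int {
	ctx, cancel := context.WithCancel(context.Background())
	tasks := make(chan taskFn, 1)

	var workers sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		workers.Add(1)
		go worker(ctx, &workers, tasks)
	}

	var mu sync.Mutex
	done := 0
	var pending sync.WaitGroup
	pending.Add(taskCount)
	for i := 0; i < taskCount; i++ {
		tasks <- func(ctx context.Context) error {
			defer pending.Done()
			mu.Lock()
			done++
			mu.Unlock()
			return nil
		}
	}

	pending.Wait() // all submitted tasks have finished
	cancel()       // the shutdown signal handled by the second case
	workers.Wait() // every worker has returned, so nothing leaks
	return done
}

func main() {
	fmt.Println("tasks completed:", runAndShutdown(4, 100)) // prints "tasks completed: 100"
}
```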

That’s all about the Worker Pool implementation in Go.

What’s next?

Implementing the worker pool was quite easy, but the important question remains: how many goroutines, i.e. how many workers, do we need to start?

It depends entirely on your workload. In my use case, since I was dealing with a CPU-bound operation, I had to run a series of benchmarks. Below are the results of my analysis:

Worker Count    Approx Execution Time    User Count
10              1 hr                     5M
15              40 min                   5M
20              20 min                   5M
25              12 min                   5M
30              11 min                   5M
35              30 min                   5M
40              35 min                   5M

I experimented with 25 and 30 several times, but the results were close: sometimes 25 was better and sometimes 30 was. We decided to stick with 25 (an arbitrary pick between the two).
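If you want to run a similar comparison for your own workload, a rough timing harness could look like the sketch below. Everything in it is an assumption for illustration: simulatedSend replaces the real per-user work with a 1ms sleep, the ids are dummies, and the workers read from a plain channel that is closed at the end rather than using the context-based pool. Because the placeholder only sleeps, its timings will not resemble the table above; swap in the real work before drawing conclusions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// simulatedSend stands in for handling one user_id.
// Assumption: the 1ms sleep is an arbitrary placeholder for the real work.
func simulatedSend(id int) {
	time.Sleep(time.Millisecond)
}

// process fans ids out to workerCount workers and reports how many ids
// were handled and how long the whole run took.
func process(workerCount int, ids []int) (int, time.Duration) {
	start := time.Now()
	jobs := make(chan int)

	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0

	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs { // ends when jobs is closed
				simulatedSend(id)
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}

	for _, id := range ids {
		jobs <- id
	}
	close(jobs) // no more work: lets every worker finish its loop
	wg.Wait()

	return processed, time.Since(start)
}

func main() {
	ids := make([]int, 200) // dummy ids; only the count matters here
	for _, wc := range []int{1, 5, 10, 25} {
		n, elapsed := process(wc, ids)
		fmt.Printf("workers=%d processed=%d elapsed=%v\n", wc, n, elapsed)
	}
}
```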

So this is how we used the Worker Pool Pattern to reduce running time from 3 hours to 12 minutes for our worst-case scenario.

Follow the link for the source code: SourceCode