Skip to content

Running Background jobs with gocraft in Golang

homepage-banner

Introduction

In this blog post, we will explore the powerful combination of GoLang, Redis, and Gocraft, and how they can elevate your web development projects to a new level.

  • GoLang is a popular programming language known for its speed, concurrency, and efficiency, often requiring background processes to handle tasks without blocking the main thread.
  • Background jobs are tasks that are executed asynchronously in the background while the main program continues to run. This ensures efficient utilization of system resources while avoiding blocking crucial functionality or user interfaces.

This article emphasizes three points:

  • Background jobs are tasks that are executed asynchronously in the background while the main program continues to run and can be used for tasks such as sending emails, processing large datasets, uploading files, etc.
  • Gocraft and GoLang are a powerful combination that enhances the efficiency and scalability of web development projects.
  • The Gocraft/work library simplifies the process of running background jobs with Redis, providing features such as job queuing, job parameters, job retrying, job logging, job scheduling, job concurrency, and a web UI.

What is the purpose of background jobs?

Background jobs are typically used for the following tasks:

  • Sending emails
  • Processing large datasets
  • Uploading files
  • Scheduling tasks
  • Long-running operations

They have several advantages:

  • Improved responsiveness: The main program remains responsive while running background tasks.
  • Increased scalability: You can quickly scale up the number of background workers to handle higher workloads.
  • Better resource utilization: System resources are utilized more efficiently as the main program doesn’t have to wait for background tasks to complete.

Goraft/work is a GoLang library that simplifies the process of running background jobs with Redis. It provides the following features:

  • Job queuing: Enqueueing and dequeuing jobs from a Redis queue.
  • Job parameters: Passing parameters to jobs for customization.
  • Job retries: Automatically retrying failed jobs a specified number of times.
  • Job logging: Tracking the execution status and logs of jobs.
  • Job scheduling: Scheduling jobs to run at specific times or intervals.
  • Job concurrency: Controlling the number of jobs that can run simultaneously.
  • Web UI: Monitoring job queues and worker thread performance through a web interface.

Gocraft/work utilizes a simple architecture consisting of three main components:

  1. Enqueuer: This component adds jobs to a Redis queue.
  2. Worker Pool: This is a pool of worker processes that dequeue jobs from the queue and execute them.
  3. Web UI: This visual interface monitors the job queue and worker performance.

Here is a basic overview of how Gocraft/work works:

  1. The application uses the Enqueuer to enqueue jobs into the Redis queue.
  2. Worker processes retrieve jobs from the queue and execute them.
  3. Worker processes report their status and log back to Redis.
  4. The Web UI displays information about the job queue and worker performance.

Job Scheduler and Redis Work Pool

It is best to have a job scheduler and a work pool to efficiently manage background jobs. The job scheduler determines when each job should be executed, while the work pool handles the pool of worker processes that execute the jobs.

Redis is a popular in-memory data store that is often used as a backup storage for background job queues. It offers high performance and reliability, making it ideal for handling a large number of jobs. Redis provides various installation options depending on your operating system and preferences. You can choose precompiled binaries, build from source code, or use the official package repository.

You can find a quick overview of the installation process for each platform here - https://redis.io/docs/install/install-redis/

Getting Started with Gocraft/Work

To start using Gocraft/Work, you can follow these steps:

(1) Install the Gocraft/Work library using the following command:

go get github.com/gocraft/work

(2) Download and install the binary for the Web UI:

go get github.com/gocraft/work/cmd/workwebui

(3) To run the Web UI, you can use the following command:

workwebui -redis="redis:6379" -ns="application_namespace" -listen=":5040"
  • “-redis”: This parameter specifies the address and port of the Redis server hosting the job queue.
  • “-ns”: This parameter defines the namespace used to identify the job queue.
  • “-listen”: This parameter specifies the port on which the Web UI will listen for connections.

Follow this directory structure:

example
├── scheduler
│   ├── scheduler.go
├── processors
│   └── process.go
└── go.mod

Queueing Homework

Redis Pool:

  1. redisPool: This variable defines the Redis connection pool. It ensures efficient and reusable connections to the Redis server.
  2. MaxActive: Specifies the maximum number of active connections in the pool (e.g., 5).
  3. MaxIdle: Defines the maximum number of idle connections in the pool (e.g., 5).
  4. Wait: Controls the behavior of the pool when all connections are busy.
  5. Dial: This function defines how the pool establishes new connections with the Redis server at address “:6379”.

Enqueuing:

  1. Enqueuer: This variable defines an enqueuer object responsible for queuing jobs to the Redis server.
  2. “application_namespace”: Specifies the namespace used to identify jobs belonging to this application.
  3. redisPool: Provides the Redis connection pool for enqueuing.
// main file in scheduler package

package main
import (
 "github.com/gomodule/redigo/redis"
 "github.com/gocraft/work"
)
// create redis pool
var redisPool = &redis.Pool{
 MaxActive: 5,
 MaxIdle: 5,
 Wait:true,
 Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", ":6379")
 },
}
//Using a specific namespace to create a query
var enqueuer = work.NewEnqueuer("application_namespace", redisPool)

func main() {

 _, err := enqueuer.Enqueue("send_welcome_email", work.Q{"email_address": "test@example.com", "user_id": 4})
if err != nil {
  log.Fatal(err)
 }
}
  • Call the Enqueue method on the enqueuer object.
  • “send_welcome_email”: This text specifies the job name to be added to the queue.
  • work.Q: This mapping stores the parameters of the job.
  • “email_address”: This parameter specifies the email address to send to.
  • “user_id”: Username: This parameter holds additional information about the customer.
  • The Enqueue method will return a job ID and an error object.
  • If there is an error while queuing the job, the log.Fatal(err) function will log the error message and exit the program.

Process jobs

// main file in processor package

package main

import (
 "github.com/gomodule/redigo/redis"
 "github.com/gocraft/work"
 "os"
 "os/signal"
)
// Make a redis pool
var redisPool = &redis.Pool{
 MaxActive: 5,
 MaxIdle: 5,
 Wait:true,
 Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", ":6379")
 },
}
type Context struct{
    email string
    userIdint64
}
func main() {
// Create a new pool. Parameters:
// Context{} is a struct that will serve as the context of the request.
// 10 is the maximum concurrency.
// "application_namespace" is the Redis namespace.
// redisPool is a Redis pool.
 pool := work.NewWorkerPool(Context{}, 10, "application_namespace", redisPool)

 pool.Middleware((*Context).Log)
 pool.Middleware((*Context).FindCustomer)

// Map the name of jobs to handler functions
 pool.Job("send_welcome_email", (*Context).SendWelcomeEmail)

// start
 pool.Start()

// waiting exit signal:
 signalChan := make(chan os.Signal, 1)
 signal.Notify(signalChan, os.Interrupt, os.Kill)
 <-signalChan

// Stop the pool
 pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
 fmt.Println("Starting job: ", job.Name)
return next()
}

func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error {
// If there is a user_id parameter, set it in the context for future middleware and handlers to use.
if _, ok := job.Args["user_id"]; ok {
  c.userId = job.ArgInt64("user_id")
  c.email = job.ArgString("email_address")
if err := job.ArgError(); err != nil {
return err
  }
 }
return next()
}

func (c *Context) SendWelcomeEmail(job *work.Job) error {
// Extract arguments:
 addr := job.ArgString("email_address")
if err := job.ArgError(); err != nil {
return err
 }

 fmt.Println("Sending email")

 from, password := "xyz@gmail.com", "sthkxlpixjferhfd"
 to := []string{
   addr,
 }

 smtpHost := "smtp.gmail.com"
 smtpPort := "587"
 message := []byte("This is a Welcome email message.")

 auth := smtp.PlainAuth("", from, password, smtpHost)

//send mail
 err := smtp.SendMail(smtpHost+":"+smtpPort, auth, from, to, message)
if err != nil {
  fmt.Println(err)
return nil
 }

 fmt.Println("Email Sent Successfully!")
return nil
}

Plan Tasks

You can schedule tasks to be performed in the future.

enqueuer := work.NewEnqueuer("application_namespace", redisPool)
secondsInTheFuture := 300
_, err := enqueuer.EnqueueIn("send_welcome_email", secondsInTheFuture, work.Q{"email_address": "test@example.com"})
  • secondsInTheFuture: This variable stores the number of seconds in the future to execute the task. In this example, the value is 300 seconds (5 minutes).
  • EnqueueIn: This method is called on the enqueuer object to schedule the delayed execution of a job.

Unique Job

You can create a queue for unique jobs, so that only one job with a given name/parameter can exist in the queue at a time.

enqueuer := work.NewEnqueuer("application_namespace", redisPool)
job, err := enqueuer.EnqueueUnique("send_welcome_email", work.Q{"email_address": "xyz@gmail.com"})// job returned
job, err = enqueuer.EnqueueUnique("send_welcome_email", work.Q{"email_address": "xyz@gmail.com"})// job == nil -- this duplicate job isn't enqueued.
job, err = enqueuer.EnqueueUniqueIn("send_welcome_email", 300, work.Q{"email_address": "abc@gmail.com"})// job != nil (diff email_address)

Alternatively, you can provide your own key to ensure the uniqueness of the job. When another job is enqueued with the same key as an existing job in the queue, it will only update the parameters.

enqueuer := work.NewEnqueuer("application_namespace", redisPool)
job, err := enqueuer.EnqueueUniqueByKey("send_welcome_email", work.Q{"email_address": "xyz@gmail.com"}, map[string]interface{}{"my_key": "586"})
job, err = enqueuer.EnqueueUniqueInByKey("send_welcome_email", 120, work.Q{"email_address": "abc@gmail.com"}, map[string]interface{}{"my_key": "586"})

Periodic Queueing (Cron)

You can use the job pool to periodically queue jobs on the Gocraft/Worker cluster.

pool := work.NewWorkerPool(Context{}, 10, "application_namespace", redisPool)
pool.PeriodicallyEnqueue("0 * * * * *", "send_welcome_email")
pool.Job("send_welcome_email", (*Context).SendWelcomeEmail)

Concurrent Jobs

You can control the concurrency of jobs using JobOptions{MaxConcurrency: <num>}. Unlike the concurrency of worker pools, this controls the number of active jobs of a specific type that can be simultaneously active in a single Redis instance. This is achieved by setting a precondition on the enqueue function, which means that if we reach or exceed the job’s limit, no new jobs with MaxConcurrency will be scheduled. The Redis key (refer to the reference redis.go::redisKeyJobsLock) is used as a counting semaphore to track the concurrency of each job type. The default value is 0, which means “no limit to job concurrency”.

Note: If you want to run jobs in a “single-threaded” manner, you can set MaxConcurrency accordingly:

worker_pool.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)

Here are simplified breakdowns of key concepts and features of Gocraft/work:

Job Queues:

  • Each job has its own queue named after itself.
  • Queues in Redis are list-based.
  • Use LPUSH to enqueue jobs into their respective queues.

Job Prioritization:

  • Each queue can have a priority (1-100000).
  • Queues with higher priority have a higher chance of being chosen by workers.
  • Workers choose queues based on their relative preference probabilities.

Job Processing:

  1. Workers retrieve jobs from the selected queues.
  2. The job is automatically moved to the “in-progress” queue by a Lua script.
  3. The worker performs the job and increments a job lock.
  4. If successful, the job is removed from the “in-progress” queue.
  5. If unsuccessful, the job is moved to the “retry” or “dead” queue based on the remaining retry attempts.

Job Retries and Scheduled Jobs:

  • “Retry” and “scheduled” queues are implemented as Redis zsets.
  • The score is the timestamp of execution.
  • The “requeuer” periodically checks these queues and moves ready jobs to the regular queues.

Workers and Worker Pools:

  • Worker pools manage workers and provide a common API.
  • Start/stop pools, attach jobs/middleware, and set concurrency limits.
  • Each pool starts N worker goroutines based on the concurrency setting.
  • Each worker independently retrieves jobs from Redis, executes them, and loops.

Unique and Periodic Jobs:

  • Unique jobs ensure that only one instance with specific parameters exists in each queue.
  • Periodic jobs are enqueued using the “periodic enqueuer” based on cron schedules.

Other Components:

  • Reaper: Requeues jobs from the “in-progress” queue if a worker thread crashes.
  • Paused jobs: Jobs in a queue can be paused by setting a Redis key.
  • Dead jobs: Jobs that exceed the retry limit are moved to the dead queue.
  • Glossary: Clarification of terms such as “worker,” “queue,” and “job type.”

Conclusion

This in-depth guide explores the complexity of the Gocraft library, providing you with the knowledge to effectively manage background jobs in your Go applications. We explain key concepts such as job queues, prioritization, retries, and worker pools, and explore powerful features like periodic jobs, unique jobs, and dead-letter handling.

By using Gocraft, you can leverage the powerful capabilities of background processing to enhance the responsiveness, scalability, and overall user experience of your application.

Reference

  • https://www.jdon.com/71153.html
  • https://github.com/gocraft/work
Leave a message