Job Functions

When you call TaskQueue.Enqueue(...);, the expression passed to .Enqueue() is parsed to extract the function (and arguments) you wish to execute on the worker.

We Recommend Using Static Functions

To keep things simple, we recommend using private or public static functions with few or no arguments that are a part of your own codebase. Using System (built-in) functions (like Console.WriteLine) works in some cases, but in other cases doesn’t seem to work at all.

public static async Task Main()
{
    var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
    
    await taskQueue.Enqueue(() => DoWork());
}

private static void DoWork()
{
    Console.WriteLine("Doing work...");
}

Non-Static Functions Will Run the Constructor

If the queued function is not static, the worker will create an instance of the class before executing the method, causing the constructor to be run. For this to work, there must be a Constructor with no arguments. If there is not a constructor with no arguments, an error will be thrown by the worker.

public class Program
{
    public Program()
    {
        Console.WriteLine("The constructor will be run for non-static queued functions");
    }

    public static async Task Main()
    {
        var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
        
        await taskQueue.Enqueue(() => DoWork());
    }

    private void DoWork()
    {
        Console.WriteLine("Doing work...");
    }
}

No Special Treatment for Async Functions

The queued function may be async and will behave as expected on the worker. Queuing async functions is the same as queuing non-async functions. Using await in the .Enqueue() method will not work.

// Enqueue async methods the same way as non-async.
await taskQueue.Enqueue(() => MyAsyncFunc());
// Bad, don't use async await in .Enqueue()
await taskQueue.Enqueue(async () => await MyAsyncFunc());

Job Arguments

When you queue a job function to be run, the arguments passed in will be captured, serialized, persisted in Redis, and passed to the worker during execution time.

Use Simple Arguments

For best performance, use simple argument types (like strings or integers) that won’t take take up too many bytes. When passing database models, it’s always a good practice to pass the id of the model, and re-fetch the data on the worker.

// You can pass literals as function arguments.
await taskQueue.Enqueue(() => Console.WriteLine("A string"));
// Or pass variables as function arguments.
var aString = "A string";
await taskQueue.Enqueue(() => Console.WriteLine(aString));

Nested Function Arguments are Run Immediately

Using a nested function argument will cause it to be run immediately, and it’s return value passed onto the workers. In the following example, MyFunction is run immediately (before the job is queued), and "My String" is passed to the workers as the argument to Console.WriteLine as a part of the job.

Func<string> MyFunction = () => "My String";

// MyFunction() will be run immediately, not on the worker
await taskQueue.Enqueue(() => Console.WriteLine(MyFunction()));

Idempotence

As a best practice, all jobs should be idempotent. Putting it simply, if a job is run more than once, everything should still be okay, and the results the same as if it were run once.

While it’s not expected to run jobs multiple times, a failure in a worker node could cause this situation to arise.

This is a common best practice when using distributed job systems.

Job Types

Jobs can be queued:

  • to be run immediately by the first available worker (one-off / fire-and-forget),
  • to be run only after a specific DateTime (scheduled),
  • or on a recurring basis (recurring)

Fire-And-Forget Jobs

Fire-and-forget jobs are very simple, and are the same as have been demonstrated above.

  • Conceptually, it is just a function that is queued to be run on the workers.
  • These jobs cannot be canceled or updated once queued to run.
public class Program
{
    public static async Task Main()
    {
        var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
        
        await taskQueue.Enqueue(() => DoWork());
    }

    private static void DoWork()
    {
        Console.WriteLine("Doing work...");
    }
}

Scheduled Jobs

Scheduled jobs are jobs that will only be run once, but not until at least a certain DateTime.

  • The TaskScheduler runs as a part of the worker, so at least one running worker is required for Scheduled Jobs execution times to be monitored.
  • If the task queue is filling faster than workers are running jobs, a scheduled job may be run after the specified DateTime, but never before it.
  • Incorrect system time set on the worker may cause unexpected run times.
  • Scheduling a job in the past will cause it to be run immediately (with a slight delay).

Set up Two ScheduledTasks

public class Program
{
    public static async Task Main()
    {
        var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
        var taskClient = new TaskClient(taskQueue);

        // Schedule a Task for 5 minutes in the future.
        await taskClient.TaskScheduler.AddScheduledTask(() => WriteDate(), TimeSpan.FromMinutes(5));

        // Schedule a Task to be run at a specific DateTime
        DateTime runTime = DateTime.UtcNow + TimeSpan.FromDays(1);
        await taskClient.TaskScheduler.AddScheduledTask(() => WriteDate(), runTime);
    }

    private void WriteDate()
    {
        Console.WriteLine(DateTime.UtcNow.ToString());
    }
}

Canceling a ScheduledTask

ScheduledTasks can be canceled using one of two methods, demonstrated in this example.

public class Program
{
    public static async Task Main()
    {
        var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
        var taskClient = new TaskClient(taskQueue);

        // Schedule a Task for 5 minutes in the future.
        var scheduledTask = await taskClient.TaskScheduler
            .AddScheduledTask(() => WriteDate(), TimeSpan.FromMinutes(5));

        // Method 1: Cancel with the ScheduledTask Object
        await taskClient.TaskScheduler.CancelScheduledTask(scheduledTask);

        // Method 2: Cancel with the ScheduledTask TaskKey
        await taskClient.TaskScheduler.CancelTask(scheduledTask.TaskKey);
    }

    private void WriteDate()
    {
        Console.WriteLine(DateTime.UtcNow.ToString());
    }
}

Recurring Jobs

Recurring Jobs are set up to be run on an interval. You may specify a TimeSpan based interval, or use a 6-part crontab to specify the interval.

If there are no running workers, recurring tasks will not just stack up on the queue to be run later. At least one running worker is required to process recurring tasks. If the workers are down for a while, recurring tasks will run at the next interval when the workers are started back up, but won’t run previous intervals.

Recurring tasks may be updated or canceled. They will continue to run until cancelled.

When you create a recurring task, you specify a unique name for that task. You will need this name to update or cancel the recurring task.

Set Up Two Recurring Tasks

public class Program
{
    public static async Task Main()
    {
        var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
        var taskClient = new TaskClient(taskQueue);

        // Print the Time every Five Minutes, using a TimeSpan interval
        await taskClient.TaskScheduler.AddRecurringTask(() => WriteDate(), 
            TimeSpan.FromMinutes(5), "five-minute-timespan");

        // Print the Time every Seven Minutes, using a Crontab interval
        await taskClient.TaskScheduler.AddRecurringTask(() => WriteDate(), 
            "0 */7 * * * *", "seven-minute-crontab");
    }

    private void WriteDate()
    {
        Console.WriteLine(DateTime.UtcNow.ToString());
    }
}

Update a Recurring Task

In this example, we set up a recurring task, demonstrate how it can be updated, then finally cancel it.

Anytime you call .AddRecurringTask() with an already existing task name, the existing task will be overwritten, stop running immediately, and the newly specified recurring task will take its place.

public class Program
{
    public static async Task Main()
    {
        var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
        var taskClient = new TaskClient(taskQueue);

        // Set up a Recurring Task
        await taskClient.TaskScheduler.AddRecurringTask(() => Write("hello!"), 
            TimeSpan.FromMinutes(5), "my-recurring-task");

        // Update the Interval
        await taskClient.TaskScheduler.AddRecurringTask(() => Write("hello!"), 
            TimeSpan.FromMinutes(7), "my-recurring-task");

        // Change the Interval to Crontab, Change function argument
        var recurringTask = await taskClient.TaskScheduler.AddRecurringTask(() => Write("crontab!"), 
            "0 */7 * * * *", "my-recurring-task");

        await taskClient.TaskScheduler.CancelRecurringTask(recurringTask);
    }

    private void Write(string value)
    {
        Console.WriteLine(value);
    }
}

Cancel a Recurring Task

Recurring Tasks can be canceled at any time after they are created, in one of three ways. All three ways are demonstrated in this example.

It’s important to note that when using the TaskScheduler.CancelTask() method, the argument is the TaskKey which is not the same as the task name you pass into .AddRecurringTask().

public class Program
{
    public static async Task Main()
    {
        var taskQueue = TaskQueue.Redis("127.0.0.1:6379");
        var taskClient = new TaskClient(taskQueue);

        // Set up a Recurring Task
        var recurringTask = await taskClient.TaskScheduler.AddRecurringTask(() => Write("hello!"), 
            TimeSpan.FromMinutes(5), "my-recurring-task");

        // Method 1: Cancel the RecurringTask with the RecurringTask object
        await taskClient.TaskScheduler.CancelRecurringTask(recurringTask);

        // Method 2:  Cancel the RecurringTask with the RecurringTask name
        await taskClient.TaskScheduler.CancelRecurringTask("my-recurring-task");

        // Method 3:  Cancel the RecurringTask with the RecurringTask.TaskKey
        //   NOTE: recurringTask.TaskKey is NOT the same as the task name "my-recurring-task"
        //         TaskScheduler.CancelTask() can also be used to cancel ScheduledTasks
        await taskClient.TaskScheduler.CancelTask(recurringTask.TaskKey);
    }

    private void Write(string value)
    {
        Console.WriteLine(value);
    }
}