gwerren.com

Programming notes and tools...

HangFire as a Service Bus

Wed, 22 Apr 2020 HangFireService BusC#

Introduction

I have recently started using HangFire in a fairly large system that is currently hosted as a single web app.

HangFire has many different uses including running regular scheduled tasks however one great use is to move important or time consuming work to a background job that can be monitored and re-run if needed.

Imagine a system holds user status information and needs to call a third party system to inform it when the user changes their status. Now lets assume we don't want status updating to fail if the third party system is off-line. In this case moving the call to the third party system to a HangFire job could give the separation and reliability we need.

Taking the example a bit further lets assume we now need to add a call to another third party system when a user updates their status. Here we could simply create a second HangFire job like the first but targeting the new third party system.

This is all good however as things grow we may want to separate out parts of our system at which point we may look to move to a service bus style architecture (I won't go into why you would want to in this post, there are plenty of good resources available on the web). Moving from HangFire to a service bus implementation will be pretty hard if we have HangFire BackgroundJob.Enqueue calls scattered all over our code.

In this post I will look at how we could wrap HangFire in such a way to make it far easier to move to a full fledged service bus (like MassTransit, I will reference this a fair bit in this post as I have experience using it) when the time comes.

So why wouldn't I start a full service bus to start with? The answer is simple, a full blown service bus is a lot more infrastructure to run, HangFire runs within the existing web app.

Required Features

NB: When I use terms like "publish" above I mean that to the calling code it should look like publishing

Code on GitHub

I have published a repository on GitHub (HangFireServiceBusFacadeExample) which contains the sample code I reference in the rest of this post. It has a series of commits with the important ones as follows:

  1. Add a basic service bus facade for HangFire with some example event messages and consumers
  2. Use IBackgroundJobClient instead of BackgroundJob
  3. Pass a context containing the message to consumers rather than passing the message directly

First Version

IMessagePublisher

To start with lets define a simple event publishing interface.

public interface IMessagePublisher
{
    Task Publish<TMessage>(TMessage message)
        where TMessage : class, new();
}

This only has a single Publish method which takes an instance of a message. We place a couple of restrictions on the message type, making it more likely to be able to be used with with the JSON serialization used in HangFire. We also make the Publish method return Task, allowing the use of async implementations if desired. Whilst HangFire job creation does not currently support async creation other service bus implementations do so this gives a good amount of future proofing.

NB: Some service bus implementations (including MassTransit) advise using interfaces instead of objects for the messages, that is not possible with this approach (at least not without more effort) however the general advice about avoiding inheritance in message types should still be followed

IMessageConsumer

Now we need to define an interface for a consumer of a message.

public interface IMessageConsumer<TMessage>
    where TMessage: class, new()
{
    Task Consume(TMessage message);
}

Like the publishing interface this is pretty simple and has a single method, Consume, which takes the message to be consumed. The restrictions on the message type are the same as before and the method returns Task which allows consumers to use async methods which a lot of consumers are likely to require.

Implementation / Configuration

We now need an implementation of the IMessagePublisher that we can register as a service with the service collection in ASP.NET Core. This should in turn support registering consumers for specific message types. First lets have a look at how we might perform the registration in the Startup.ConfigureServices method:

services.AddSingleton<IMessagePublisher>(
    o => new MessagePublisher()
        .For<TestEvent>(
            messageConfig => messageConfig
                .Consumer<TestEventConsumer1>()
                .Consumer<TestEventConsumer2>())
        .For<WeatherForecastsRequestedEvent>(
            messageConfig => messageConfig.Consumer<WeatherForecastsRequestedEventConsumer>()));

Notice the fluent style API for configuring the publisher, this is common for this sort of configuration as it makes reading and writing configuration easier

For the implementation we have the following code:

public interface IMessageTypeConfigurator<TMessage>
    where TMessage: class, new()
{
    IMessageTypeConfigurator<TMessage> Consumer<TConsumer>()
        where TConsumer : IMessageConsumer<TMessage>;
}

public class MessagePublisher : IMessagePublisher
{
    private readonly IDictionary<Type, IMessageConsumerSet> consumerSetsByMessageType
        = new Dictionary<Type, IMessageConsumerSet>();

    public MessagePublisher For<TMessage>(Action<IMessageTypeConfigurator<TMessage>> configure)
        where TMessage: class, new()
    {
        if (!this.consumerSetsByMessageType.TryGetValue(typeof(TMessage), out var consumerSet))
        {
            consumerSet = new MessageConsumerSet<TMessage>();
            this.consumerSetsByMessageType.Add(typeof(TMessage), consumerSet);
        }

        configure((MessageConsumerSet<TMessage>)consumerSet);
        return this;
    }

    public Task Publish<TMessage>(TMessage message) where TMessage : class, new()
    {
        if (this.consumerSetsByMessageType.TryGetValue(typeof(TMessage), out var consumerSet))
            ((MessageConsumerSet<TMessage>)consumerSet).Publish(message);

        return Task.CompletedTask;
    }

    private interface IMessageConsumerSet { }

    private class MessageConsumerSet<TMessage> : IMessageConsumerSet, IMessageTypeConfigurator<TMessage>
        where TMessage : class, new()
    {
        private readonly IList<IConsumerWrapper> consumers = new List<IConsumerWrapper>();

        public IMessageTypeConfigurator<TMessage> Consumer<TConsumer>()
            where TConsumer : IMessageConsumer<TMessage>
        {
            this.consumers.Add(new ConsumerWrapper<TConsumer>());
            return this;
        }

        public void Publish(TMessage message)
        {
            foreach (var consumer in this.consumers)
                consumer.Publish(message);
        }

        private interface IConsumerWrapper
        {
            void Publish(TMessage message);
        }

        private class ConsumerWrapper<TConsumer> : IConsumerWrapper
            where TConsumer : IMessageConsumer<TMessage>
        {
            public void Publish(TMessage message)
            {
                // Here we create the HangFire jobs for the registered consumers
                BackgroundJob.Enqueue<TConsumer>(c => c.Consume(message));
            }
        }
    }
}

This is pretty straight forward, we have a MessageConsumerSet that wraps up all the consumers for a specific message type. Each consumer is then individually wrapped in a ConsumerWrapper which has the task of calling BackgroundJob.Enqueue in a strongly typed manner.

Usage

Now that we have an implementation of IMessagePublisher registered with the service provider, anywhere we want to publish event messages we can simply request the IMessagePublisher and publish a new message.

It is important to note that, since the consumer implementations will be resolved by HangFire when the jobs are executed and in the ASP.NET Core HangFire implementation the built in service provider is used for dependency resolution, we can write the IMessageConsumer implementations with constructor dependencies and they will be resolved for us. We don't need to register every consumer as a service though since HangFire takes a hybrid approach to resolution at the top level.

Second Version

Rather than calling the static BackgroundJob.Enqueue HangFire method we may instead want to use the IBackgroundJobClient.Enqueue method. To do this we need to request an instance of IBackgroundJobClient from the service provider and pass it into the MessagePublisher. This is a pretty simple thing to do and can be seen in the GitHub commit.

Third Version

Something that some service bus implementations do is to pass a context object to consumers rather than just the raw message. This context contains the message along with other things such as the ability to publish new, linked events. We won't try to link messages that are published as a result of a previous message however we will look to pass a context with the message and a publish method.

Interfaces

First lets define a context and update the IMessageConsumer interface to use it:

public interface IMessageConsumer<TMessage>
    where TMessage: class, new()
{
    Task Consume(IConsumeContext<TMessage> context);
}

public interface IConsumeContext<TMessage>
{
    TMessage Message { get; }

    Task Publish<TPublishMessage>(TPublishMessage message)
        where TPublishMessage: class, new();
}

The IConsumeContext is pretty simple, it has a publish method that matches the IMessagePublisher interface along with a property for the message itself.

Creating the Context

Now we need to change the ConsumerWrapper that we defined previously to include creation of the context. We can do this by defining a simple class (in this case MessageConsumerWithContext) which wraps up the creation of the context object and itself exposes a Consume method that takes the raw message.

Here is the change to the ConsumerWrapper.

private class ConsumerWrapper<TConsumer> : IConsumerWrapper
    where TConsumer : IMessageConsumer<TMessage>
{
    public void Publish(IBackgroundJobClient backgroundJobClient, TMessage message)
    {
        // Here we create the HangFire jobs for the registered consumers
        backgroundJobClient.Enqueue<MessageConsumerWithContext<TMessage, TConsumer>>(
            c => c.Consume(message));
    }
}

And here is the MessageConsumerWithContext implementation.

public class MessageConsumerWithContext<TMessage, TConsumer>
    where TMessage : class, new()
    where TConsumer : IMessageConsumer<TMessage>
{
    private readonly IMessagePublisher messagePublisher;
    private readonly TConsumer messageConsumer;

    public MessageConsumerWithContext(IMessagePublisher messagePublisher, TConsumer messageConsumer)
    {
        this.messagePublisher = messagePublisher;
        this.messageConsumer = messageConsumer;
    }

    public Task Consume(TMessage message)
        => this.messageConsumer.Consume(new Context(this.messagePublisher, message));

    private class Context : IConsumeContext<TMessage>
    {
        private readonly IMessagePublisher messagePublisher;

        public Context(IMessagePublisher messagePublisher, TMessage message)
        {
            this.messagePublisher = messagePublisher;
            this.Message = message;
        }

        public TMessage Message { get; }

        public Task Publish<TPublishMessage>(TPublishMessage message)
            where TPublishMessage : class, new()
            => this.messagePublisher.Publish(message);
    }
}

JobActivator

There is one little problem with this approach, remember how we were very happy to rely on the built-in dependency resolution in HangFire to create our consumer instances even if we hadn't registered them with the service provider? Now that won't work since the MessageConsumerWithContext is the top level object being instantiated. We could register every consumer with the service provider but we can do one better and define a custom HangFire JobActivator. The code for the ASP.NET version is pretty simple so we can copy that and modify it to also handle MessageConsumerWithContext as a special case as follows:

private class ContextAwareJobActivator : JobActivator
{
    private readonly IServiceScopeFactory serviceScopeFactory;

    public ContextAwareJobActivator(IServiceScopeFactory serviceScopeFactory)
        => this.serviceScopeFactory =
            serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));

    public override JobActivatorScope BeginScope(JobActivatorContext context) => this.BeginScope();

    public override JobActivatorScope BeginScope() => new Scope(this.serviceScopeFactory.CreateScope());

    private class Scope : JobActivatorScope
    {
        private static readonly Type MessageConsumerWithContext = typeof(MessageConsumerWithContext<,>);
        private readonly IServiceScope serviceScope;

        public Scope(IServiceScope serviceScope)
            => this.serviceScope = serviceScope ?? throw new ArgumentNullException(nameof(serviceScope));

        public override object Resolve(Type type)
        {
            // MessageConsumerWithContext is a special type for us to create,
            // delegating the creation of the actual consumer to the normal
            // HangFire behavior for ASP.NET Core which allows the consumer
            // types to not need to be directly registered with the DI container.
            if (type.IsGenericType && type.GetGenericTypeDefinition() == MessageConsumerWithContext)
            {
                return Activator.CreateInstance(
                    type,
                    this.serviceScope.ServiceProvider.GetService<IMessagePublisher>(),
                    ActivatorUtilities.GetServiceOrCreateInstance(
                        this.serviceScope.ServiceProvider,
                        type.GenericTypeArguments[1]));
            }

            // If this is not a request for MessageConsumerWithContext then immediately default
            // to the default HangFire behavior.
            return ActivatorUtilities.GetServiceOrCreateInstance(this.serviceScope.ServiceProvider, type);
        }

        public override void DisposeScope() => this.serviceScope.Dispose();
    }
}

Finally we need to tell HangFire to use this new JobActivator, we do this in Startup.ConfigureServices within the AddHangfire call where we simply add the following additional line:

.UseActivator(new ContextAwareJobActivator(serviceProvider.GetService<IServiceScopeFactory>()))

I also decided to wrap the registration code within Startup in some friendly extension methods which can be found in the GitHub commit.

Conclusion and Additional Thoughts

HangFire is a great system, it is quick and easy to get started with and has a fairly low overhead in terms of hosting (since it is within the same process as the rest of your application). A system may well out-grow HangFire for some use-cases though, especially those that, in a larger system would be best-served by a service bus. We have seen here how it is possible to use a system like HangFire that fits well now whilst also making it far easier to transition in future if needed.

One thing that I will leave as an exercise for the reader is to automatically register all consumers using reflection rather than using the fluent API. As the number of consumers grows, registering them all manually could become a major pain (not to mention potential source of bugs and merge conflicts).