Fork me on GitHub

Channels and Transports Edit on GitHub


The actual connectors in a FubuMVC service bus application are channel's that are backed and created by transport's. Today, FubuMVC only supports a transport based on the Lightning Queues project and an in memory transport for testing.

Installing Transport's

The only thing you need to do to add a transport type to FubuMVC is the presence of the assembly for the transport adapter. In the case of the LightningQueues transport, you only need to install the FubuMVC.LightningQueues library via Nuget:

Install-Package FubuMVC.LightningQueues

Configuring Channels

A key goal of the FubuMVC successor ("Jasper") is to simplify and streamline the service bus bootstrapping shown in this section.

Channels require a little more work. First off, channels are identified and configured by a Uri matching the desired transport, port, and queue name.

From the Hello World example, we first need to build a "Settings" object for two channels identified as Pinger and Ponger:


public class HelloWorldSettings
{
    public Uri Pinger { get; set; } = 
        "lq.tcp://localhost:2352/pinger".ToUri();

    public Uri Ponger { get; set; } =
        "lq.tcp://localhost:2353/ponger".ToUri();
}

All the HelloWorldSettings class is is a way to identify channels and act as a means to communicate the channel Uri's to the service bus model. I'm hard coding the Uri's in the code above, but that information can be pulled from any kind of configuration using the built in support for strong typed configuration in FubuMVC.

To configure service bus channels, it's easiest to define your application with a FubuTransportRegistry<T> class where the "T" is your settings class. Inside of your registry class, you need to use the fluent interface that hangs off of the Channel() method to describe channels and their behavior. A pair of examples of this is shown below:


public class PingApp : FubuTransportRegistry<HelloWorldSettings>
{
    public PingApp()
    {
        // Configuring PingApp to send PingMessage's
        // to the PongApp
        Channel(x => x.Ponger)
            .AcceptsMessage<PingMessage>();

        // Listen for incoming messages from "Pinger"
        Channel(x => x.Pinger)
            .ReadIncoming();
    }
}

public class PongApp : FubuTransportRegistry<HelloWorldSettings>
{
    // Listen for incoming messages from "Ponger"
    public PongApp()
    {
        Channel(x => x.Ponger)
            .ReadIncoming();
    }
}

The FubuTransportRegistry is a subclass of the FubuRegistry class that adds additional options germaine to the service bus feature.

Listening to Messages from a Channel

While you can always send messages to any configured channel, you must explicitly mark which channels should be monitored for incoming messages to the current node. That's done with the ReadIncoming() method shown below:


public class ListeningApp : FubuTransportRegistry<HelloWorldSettings>
{
    public ListeningApp()
    {
        // This directs
        Channel(x => x.Pinger).ReadIncoming();
    }
}

Static Message Routing Rules

When you publish a message using IServiceBus without explicitly setting the Uri of the desired destination, FubuMVC has to invoke the known message routing rules and dynamic subscriptions to figure out which locations should receive the message. Consider this code that publishes a PingMessage:


public class SendingExample
{
    public async Task SendPingsAndPongs(IServiceBus bus)
    {
        // Publish a message
        bus.Send(new PingMessage());

        // Request/Reply
        var pong = await bus.Request<PongMessage>(new PingMessage());

        // "Delay" Send
        bus.DelaySend(new PingMessage(), TimeSpan.FromDays(1));
    }
}

To route PingMessage to a channel, we can apply static message routing rules directly on to a channel by using one of the Accepts**** methods as shown below:


public class StaticRoutingApp : FubuTransportRegistry<AppSettings>
{
    public StaticRoutingApp()
    {
        Channel(x => x.Transactions)

            // Explicitly add a single message type
            .AcceptsMessage<PingMessage>()

            // Explicitly add a single message type
            .AcceptsMessage(typeof(PongMessage))

            // Publish any types matching the supplied filter
            // to this channel
            .AcceptsMessages(type => type.Name.EndsWith("Message"))

            // Publish any message type contained in the assembly
            // to this channel, by supplying a type contained
            // within that assembly
            .AcceptsMessagesInAssemblyContainingType<PingMessage>()

            // Publish any message type contained in the named
            // assembly to this channel
            .AcceptsMessagesInAssembly("MyMessageLibrary")

            // Publish any message type contained in the
            // namespace given to this channel
            .AcceptsMessagesInNamespace("MyMessageLibrary")

            // Publish any message type contained in the namespace
            // of the type to this channel
            .AcceptsMessagesInNamespaceContainingType<PingMessage>();
    }
}

Do note that doing the message type filtering by namespace will also include child namespaces. In our own usage we try to rely on either namespace rules or by using shared message assemblies.

See also the Dynamic Subscriptions

Message Persistence

If the transport you're using supports this switch (the LightningQueues transport does), you can declare channels to publish messages with either a delivery guaranteed, persistent strategy or by a non-persistent strategy. The non-guaranteed delivery mode is significantly faster, but probably only suitable for message types where throughput is more important than message reliability.

Below is a sample of explicitly controlling the channel persistence:


public class AppSettings
{
    // This channel handles "fire and forget"
    // control messages
    public Uri Control { get; set; }
        = new Uri("lq.tcp://localhost:2345/control");


    // This channel handles normal business
    // processing messages
    public Uri Transactions { get; set; }
        = new Uri("lq.tcp://localhost:2346/transactions");
}

public class BigApp : FubuTransportRegistry<AppSettings>
{
    public BigApp()
    {
        // Declare that the "Control" channel
        // use the faster, but unsafe transport mechanism
        Channel(x => x.Control)
            .DeliveryFastWithoutGuarantee()
            .UseAsControlChannel();


        Channel(x => x.Transactions)
            // This is the default, but you can
            // still configure it explicitly
            .DeliveryGuaranteed();

    }
}

Control Queues

FubuMVC may need to send messages between running service bus nodes to coordinate activities, register for dynamic subscriptions, or perform health checks. Some of these messages are time sensitive, so it will frequently be valuable to set up a separate "control" channel for these messages so they aren't stuck in a backed up queue with your normal messages.

Also, the control messages fit the "fire and forget" messaging model, so we recommend using the non-persistent channel mode with these channels.

Below is a sample of setting up a control channel:


public class ControlChannelApp : FubuTransportRegistry<AppSettings>
{
    public ControlChannelApp()
    {
        Channel(x => x.Control)
            .UseAsControlChannel()
            .DeliveryFastWithoutGuarantee();
    }
}