The actual connectors in a FubuMVC service bus application are channel's that are backed and created by transport's. Today, FubuMVC only supports a transport based on the Lightning Queues project and an in memory transport for testing.
Installing Transport's
The only thing you need to do to add a transport type to FubuMVC is the presence of the assembly for the transport adapter. In the case of the LightningQueues transport, you only need to install the FubuMVC.LightningQueues library via Nuget:
Install-Package FubuMVC.LightningQueues
Configuring Channels
Channels require a little more work. First off, channels are identified and configured by a Uri matching the desired transport, port, and queue name.
From the Hello World example, we first need to build a "Settings" object for two channels
identified as Pinger
and Ponger
:
public class HelloWorldSettings
{
public Uri Pinger { get; set; } =
"lq.tcp://localhost:2352/pinger".ToUri();
public Uri Ponger { get; set; } =
"lq.tcp://localhost:2353/ponger".ToUri();
}
All the HelloWorldSettings
class is is a way to identify channels and act as a means to communicate the channel Uri's to
the service bus model. I'm hard coding the Uri's in the code above, but that information can be pulled from any kind of configuration using
the built in support for strong typed configuration in FubuMVC.
To configure service bus channels, it's easiest to define your application with a FubuTransportRegistry<T>
class where the "T"
is your settings class. Inside of your registry class, you need to use the fluent interface that hangs off of the Channel()
method
to describe channels and their behavior. A pair of examples of this is shown below:
public class PingApp : FubuTransportRegistry<HelloWorldSettings>
{
public PingApp()
{
// Configuring PingApp to send PingMessage's
// to the PongApp
Channel(x => x.Ponger)
.AcceptsMessage<PingMessage>();
// Listen for incoming messages from "Pinger"
Channel(x => x.Pinger)
.ReadIncoming();
}
}
public class PongApp : FubuTransportRegistry<HelloWorldSettings>
{
// Listen for incoming messages from "Ponger"
public PongApp()
{
Channel(x => x.Ponger)
.ReadIncoming();
}
}
The FubuTransportRegistry
is a subclass of the FubuRegistry class that adds additional options
germaine to the service bus feature.
Listening to Messages from a Channel
While you can always send messages to any configured channel, you must explicitly mark which
channels should be monitored for incoming messages to the current node. That's done with the
ReadIncoming()
method shown below:
public class ListeningApp : FubuTransportRegistry<HelloWorldSettings>
{
public ListeningApp()
{
// This directs
Channel(x => x.Pinger).ReadIncoming();
}
}
Static Message Routing Rules
When you publish a message using IServiceBus
without explicitly setting the Uri of the desired
destination, FubuMVC has to invoke the known message routing rules and dynamic subscriptions to
figure out which locations should receive the message. Consider this code that publishes a
PingMessage
:
public class SendingExample
{
public async Task SendPingsAndPongs(IServiceBus bus)
{
// Publish a message
bus.Send(new PingMessage());
// Request/Reply
var pong = await bus.Request<PongMessage>(new PingMessage());
// "Delay" Send
bus.DelaySend(new PingMessage(), TimeSpan.FromDays(1));
}
}
To route PingMessage
to a channel, we can apply static message routing rules directly on to a
channel by using one of the Accepts**** methods as shown below:
public class StaticRoutingApp : FubuTransportRegistry<AppSettings>
{
public StaticRoutingApp()
{
Channel(x => x.Transactions)
// Explicitly add a single message type
.AcceptsMessage<PingMessage>()
// Explicitly add a single message type
.AcceptsMessage(typeof(PongMessage))
// Publish any types matching the supplied filter
// to this channel
.AcceptsMessages(type => type.Name.EndsWith("Message"))
// Publish any message type contained in the assembly
// to this channel, by supplying a type contained
// within that assembly
.AcceptsMessagesInAssemblyContainingType<PingMessage>()
// Publish any message type contained in the named
// assembly to this channel
.AcceptsMessagesInAssembly("MyMessageLibrary")
// Publish any message type contained in the
// namespace given to this channel
.AcceptsMessagesInNamespace("MyMessageLibrary")
// Publish any message type contained in the namespace
// of the type to this channel
.AcceptsMessagesInNamespaceContainingType<PingMessage>();
}
}
Do note that doing the message type filtering by namespace will also include child namespaces. In our own usage we try to rely on either namespace rules or by using shared message assemblies.
See also the Dynamic Subscriptions
Message Persistence
If the transport you're using supports this switch (the LightningQueues transport does), you can declare channels to publish messages with either a delivery guaranteed, persistent strategy or by a non-persistent strategy. The non-guaranteed delivery mode is significantly faster, but probably only suitable for message types where throughput is more important than message reliability.
Below is a sample of explicitly controlling the channel persistence:
public class AppSettings
{
// This channel handles "fire and forget"
// control messages
public Uri Control { get; set; }
= new Uri("lq.tcp://localhost:2345/control");
// This channel handles normal business
// processing messages
public Uri Transactions { get; set; }
= new Uri("lq.tcp://localhost:2346/transactions");
}
public class BigApp : FubuTransportRegistry<AppSettings>
{
public BigApp()
{
// Declare that the "Control" channel
// use the faster, but unsafe transport mechanism
Channel(x => x.Control)
.DeliveryFastWithoutGuarantee()
.UseAsControlChannel();
Channel(x => x.Transactions)
// This is the default, but you can
// still configure it explicitly
.DeliveryGuaranteed();
}
}
Control Queues
FubuMVC may need to send messages between running service bus nodes to coordinate activities, register for dynamic subscriptions, or perform health checks. Some of these messages are time sensitive, so it will frequently be valuable to set up a separate "control" channel for these messages so they aren't stuck in a backed up queue with your normal messages.
Also, the control messages fit the "fire and forget" messaging model, so we recommend using the non-persistent channel mode with these channels.
Below is a sample of setting up a control channel:
public class ControlChannelApp : FubuTransportRegistry<AppSettings>
{
public ControlChannelApp()
{
Channel(x => x.Control)
.UseAsControlChannel()
.DeliveryFastWithoutGuarantee();
}
}