Topics.Core InProcess Messaging

Topics.Core messaging is based on the AMQP standard. The AMQP messaging standard is a protocol for messaging, queuing and routing. The routing features include point-to-point as well as publish-and-subscribe. The basic concept is that the applications establish an “exchange” where message producers send their messages. The exchange is not a queue but more or less a routing system. Consumers of messages create queues and bind them to the exchange using a routing-key. The message bus then routes messages to the appropriate consumers based on exchange type and routing-key where listeners service the queue and deliver the message to the application. Multiple consumers can bind to the same exchange with the same routing key, creating interesting possibilities. If the exchange is of type “fanout”, the messages are broadcast to many consumer queues at once. If the exchange is set up as a “topic” exchange, consumers receive messages based on a routing key pattern. A very good description of these concepts can be found on the RabbitMQ site.

Topics.Core contains an interface to several message bus platforms that support AMQP. There is a unified API so switching between underlying message bus platforms can accomplished through configuration rather than having to recompile and redeploy systems. In Topics.Core, the gateway to the various underlying message bus platforms is called a “Topic Bus”.

Let’s take a look at the InProcessTopicBus. This is an in-process implementation of an AMQP Broker that is based on the Producer Consumer Pattern. It’s a very handy and light-weight way to keep various parts of your application up to date with real time messages that may be flowing through your system. It’s especially handy in GUI applications, for example, communicating between multiple documents in a WPF MVVM application. The best way to learn anything is with an example. I’ve put together a collection functional tests in the Tests folder of the Topics.Core solution. This example is located in:

Topics.Core->Development->Tests->Messaging->Topics.Core.Messaging.InProcessUnitTests

In Topics.Core, messaging bindings are typically expressed in xml configuration files and Spring.Net is used as the Dependency Injection (DI) container. See this blog post for more information on how Topics.Core leverages Spring.Net. Here is the Messaging.xml file located in the Config folder containing the messaging binding.

<objects xmlns="http://www.springframework.net"
         xmlns:nms="http://www.springframework.net/nms">

  <alias name="InProcessTopicBus" alias="DefaultTopicBus"/>

  <object id="InProcessTopicBus" type="Topics.Core.Messaging.InProcess.InProcessTopicBus, Topics.Core">
    <property name="MessageConverter" ref="SimpleMessageConverter"/>
    <property name="MessageListenerFactory" ref="ActionMessageListenerFactory"/>
    <property name="ReplyTimeout" value="3000"/>
  </object>

  <object name="SimpleMessageConverter" type="Topics.Core.Messaging.SimpleMessageConverter, Topics.Core">
  </object>

  <object name="ActionMessageListenerFactory" type="Topics.Core.Messaging.ActionMessageListenerFactory, Topics.Core">
  </object>

</objects>

Firstly, by defining the alias of DefaultTopicBus, we can omit specifying which TopicBus to use inside the application. We can then simply modify the alias declaration in configuration to point to a different underlying message bus, for example RabbitMQTopicBus. The InProcessTopicBus is a gateway to our built-in Producer-Consumer message broker. The required properties for InProcessTopicBus are MessageConverter and MessageListenerFactory. When one creates a queue and binds to an exchange, a message listener created by the MessageListenerFactory services the queue. In this case, ActionMessageListenerFactory creates a MessageListener that calls either an Action or Func<T,R> based on the type contained in the message payload. The MessageConverter allows one to choose the best format for data in transit based on the underlying transport. In this case since we are in-process, the SimpleMessageConverter is passing references to objects. That’s a very important point to understand. We are passing references for this example, so consumers should take care if updating any fields or properties on the message payload. All other TopicBus gateways pass copies of messages in a serialized format (JSON, XML, or binary).

Now let’s take a look at the test located in MessagingTests.cs. In the initialization routine, we get access to the DI container by constructing a new XmlApplicationContext from one or more XML streams. In this case, the XML is contained in an embedded resource on the assembly. Next, the GetObject() method retrieves the instance of the object called “DefaultTopicBus” from the DI container. If you recall, this is an alias for InProcessTopicBus.

        [TestInitialize]
        public void TestInitialize()
        {
            IApplicationContext ctx = new XmlApplicationContext(
            "assembly://Topics.Core.Messaging.InProcess.UnitTests/Topics.Core.Messaging.InProcess.UnitTests.Config/Messaging.xml");
            _topicBus = (ITopicBus)ctx.GetObject("DefaultTopicBus");
        }

The first TestMethod is an example of a synchronous RPC. The method that sends the message, blocks the executing thread until the response is received. If you recall, we set a SendTimeout value of 3 seconds in the configuration of the messaging binding. If the response is not received within 3 seconds, the send method will return a null value.

        [TestMethod()]
        public void SynchronousRPC()
        {
            LOG.Info("Starting SynchronousRPC");
            var requestExchange = "SynchronousRPC";
            var requestRoutingKey = "SynchronousRPCRoutingKey";
            var requestQueue = "SynchronousRPCQueue";
            var requestListenerName = "SynchronousRPCListener";

            _topicBus.ExchangeDeclare(requestExchange, "direct", false, false);
            _topicBus.QueueDeclare(requestQueue, false, false, false);
            _topicBus.QueueBind(requestQueue, requestExchange, requestRoutingKey);

            TextMessage textMessage = new TextMessage()
            {
                ContextID = Guid.NewGuid().ToString(),
                From = "Me",
                To = "You",
                Text = "Hello world"
            };

            _topicBus.CreateListener(requestListenerName, requestQueue, true, (Func<TextMessage, TextMessage>)((TextMessage message) =>
            {
                Thread.Sleep(2000);
                return message;
            }));

            var response = _topicBus.SendAndReceive(requestExchange, requestRoutingKey, textMessage) as TextMessage;
            Assert.IsTrue(response != null, "Return result not received");
            LOG.Info("Ending SynchronousRPC");
        }

First we create an exchange and give it a name. Then we create a queue and bind it to the exchange with a routing key. Once we’ve created our message payload, we create our listener by providing an Func<T,R> anonymous delegate. T is the type of message being received and R is the type of the return value. For a listener that does not return a value, simply pass in Action<T>. Multiple listeners can be created on a single Queue. Only messages containing a payload of type TextMessage, in this case, will be able to reach this handler. Now, we use the _topicBus to send the message and wait for the result. This is pseudo-synchronous because the handler is actually running on a separate thread. The SendAndReceive() method blocks for the configured amount of time before giving up. Set the Sleep() parameter to 4000 and the SendAndReceive() call will time out and return a null value.

This is a very contrived example for sure, but useful nonetheless. This is very handy technique when various parts of an application need to communicate, but there is no practical way to provide a reference to a message producer.  The InProcessTopicBus also saves the overhead of serialization and de-serialization that would be incurred when using an external message bus.

The second TestMethod demonstrates Publish and Subscribe. In this case the message subscriber is simply listening for messages and does not return a value. The subscriber uses a wildcard routing key to filter the messages that being routed through the exchange.

        public void PublishSubscribe()
        {
            LOG.Info("Starting PublishSubscribe");
            var requestExchange = "PublishSubscribe";
            var requestRoutingKeySendMatching = "PublishSubscribeRoutingKey";
            var requestRoutingKeySendNotMatching = "Publish";
            var requestRoutingKeyBind = "PublishSubscribe*";
            var requestQueue = "PublishSubscribeQueue";
            var requestListenerName = "PublishSubscribeListener";

            _topicBus.ExchangeDeclare(requestExchange, "topic", false, false);
            _topicBus.QueueDeclare(requestQueue, false, false, false);
            _topicBus.QueueBind(requestQueue, requestExchange, requestRoutingKeyBind);

            TextMessage textMessage = new TextMessage()
            {
                ContextID = Guid.NewGuid().ToString(),
                From = "Me",
                To = "You",
                Text = "Hello world"
            };

            AutoResetEvent messageReceived = new AutoResetEvent(false);
            _topicBus.CreateListener(requestListenerName, requestQueue, true, (Action<TextMessage>)((TextMessage message) =>
            {
                messageReceived.Set();
            }));

            //Send a message with routingkey matching the the wildcard routingkey used to bind the queue
            _topicBus.SendAndReceive(requestExchange, requestRoutingKeySendMatching, textMessage);
            if (!messageReceived.WaitOne(2000))
            {
                //If we didnt get the message within 2 seconds, its not coming.
                Assert.Fail("Message was not received even though the RoutingKeys matched");
            }

            //Send a message with routingkey not matching the the wildcard routingkey used to bind the queue
            _topicBus.SendAndReceive(requestExchange, requestRoutingKeySendNotMatching, textMessage);
            if (messageReceived.WaitOne(2000))
            {
                //If we did get the message, we shouldnt have
                Assert.Fail("Message was not received even though the RoutingKeys matched");
            }

            LOG.Info("Ending PublishSubscribe");
        }

Here, we set the type parameter of the exchange to “topic” and bind to that exchange using a routing key with wildcard (“*”). Now, only messages sent to this exchange with a routing key that begins with “PublishSubscribe” will be delivered. All others will fail. Using an AutoResetEvent in the handler signals where the message was received while the WaitOne(2000) call gives it 2 seconds to acknowledge.

 

We’re really just scratching the surface of Topics.Core messaging, and more articles will follow discussing other capabilities and TopicBus implementations.

What we’ve covered in this article is:

  • Topics.Core has a unified messaging API based on AMQP
  • There is a built-in TopicBus for sending messages inside your process.
  • There are other TopicBus implementations including RabbitMQ, SignalR and TibcoRV.
  • Topics.Core uses Spring.Core Dependency Injection container to configure TopicBus instances.
  • How to create an exchange and a queue.
  • How to bind the queue to the exchange with a routing key.
  • How to listen for messages with anonymous delegates based on Action<T> and Func<T,R>.
0 votes

BECOME A COMMENTERS?

You must be logged in to post a comment.