Skip to main content

How to integrate BizTalk Server 2010 / 2013 with Service Bus for Windows Server

This solution shows how to integrate a BizTalk Server 2010/2013 application with Service Bus for Windows Server using the WCF-Custom adapter to exchange messages with external systems in a reliable, flexible, and scalable manner.

C# (5.3 MB)
 
 
 
 
 
4.7 Star
(6)
1,708 times
Add to favorites
6/20/2014
E-mail Twitter del.icio.us Digg Facebook

Solution explorer

C#
#region Copyright
//=======================================================================================
//Microsoft Windows Server AppFabric Customer Advisory Team (CAT)  
//
// This sample is supplemental to the technical guidance published on the community
// blog at http://blogs.msdn.com/paolos. 
// 
// Author: Paolo Salvatori
//=======================================================================================
// Copyright © 2011 Microsoft Corporation. All rights reserved.
// 
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER 
// EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF 
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. YOU BEAR THE RISK OF USING IT.
//=======================================================================================
#endregion

#region Using Directives
using System;
using System.Xml;
using System.Collections.Generic;
using System.Diagnostics;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Dispatcher;
using Microsoft.ServiceBus.Messaging;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBusForWindowsServer.MessageInspector
{
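    /// <summary>
    /// WCF message inspector that, on the receive side, writes or promotes the properties
    /// of the Service Bus BrokeredMessageProperty to the message context and, on the send
    /// side, builds a BrokeredMessageProperty from the properties of the outgoing message.
    /// </summary>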
    public class ServiceBusMessageInspector : IDispatchMessageInspector, IClientMessageInspector
    {
        #region Private Constants
        //***************************
        // Constants
        //***************************

        // Event log source name passed to EventLog.WriteEntry when an exception is logged.
        private const string Source = "ServiceBusMessageInspector";
        // Namespace used for the explicit BrokeredMessageProperty values written to the message context.
        private const string BrokeredMessagePropertySchemaNamespace = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property";
        // Keys of the WCF message properties under which the lists of properties
        // to promote and to write are stored for the WCF adapter.
        private const string PropertiesToPromoteKey = "http://schemas.microsoft.com/BizTalk/2006/01/Adapters/WCF-properties/Promote";
        private const string PropertiesToWriteKey = "http://schemas.microsoft.com/BizTalk/2006/01/Adapters/WCF-properties/WriteToContext";
        // Fully qualified (namespace#name) keys looked for among the properties of an
        // outgoing message; their values are copied to the matching explicit
        // property of the outgoing BrokeredMessageProperty.
        private const string ContentTypeProperty = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property#ContentType";
        private const string CorrelationIdProperty = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property#CorrelationId";
        private const string LabelProperty = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property#Label";
        private const string ReplyToProperty = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property#ReplyTo";
        private const string ReplyToSessionIdProperty = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property#ReplyToSessionId";
        private const string SessionIdProperty = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property#SessionId";
        private const string ToProperty = "http://schemas.microsoft.com/servicebusforwindowsserver/2013/brokered-message-property#To";
        // Local names of the explicit BrokeredMessageProperty values; they are combined with
        // BrokeredMessagePropertySchemaNamespace when a value is written to the message
        // context and are also used as the property name in trace messages.
        private const string ContentType = "ContentType";
        private const string CorrelationId = "CorrelationId";
        private const string DeliveryCount = "DeliveryCount";
        private const string EnqueuedTimeUtc = "EnqueuedTimeUtc";
        private const string ExpiresAtUtc = "ExpiresAtUtc";
        private const string Label = "Label";
        private const string MessageId = "MessageId";
        private const string ReplyTo = "ReplyTo";
        private const string ReplyToSessionId = "ReplyToSessionId";
        private const string ScheduledEnqueueTimeUtc = "ScheduledEnqueueTimeUtc";
        private const string SequenceNumber = "SequenceNumber";
        private const string SessionId = "SessionId";
        private const string TimeToLive = "TimeToLive";
        private const string To = "To";
        // Display names of the three property lists, used in trace and error messages.
        private const string PropertiesToPromote = "PropertiesToPromote";
        private const string PropertiesToWrite = "PropertiesToWrite";
        private const string PropertiesToSend = "PropertiesToSend";

        //***************************
        // Messages
        //***************************
        // Error raised when a PropertiesElement argument is null.
        private const string PropertiesElementCannotBeNull = "[ServiceBusMessageInspector] The PropertiesElement object cannot be null.";
        // Error raised when the name and namespace lists differ in length; {0} = list name.
        private const string NameAndNamespaceNumberAreDifferent = "[ServiceBusMessageInspector] The number of items in the name and namespace comma-separated lists is different in the {0} property.";
        // Trace message: {0} = list name, {1} = property name, {2} = property namespace, {3} = property value.
        // NOTE(review): the line breaks are written as "\n\r" (LF CR); the usual Windows sequence is "\r\n" - confirm intent.
        private const string PropertyAddedToList = "[ServiceBusMessageInspector] The following property was added to the [{0}] list:\n\r - Name=[{1}]\n\r - Namespace=[{2}]\n\r - Value=[{3}]";
        // Trace message for a caught exception; {0} = exception message.
        private const string ExceptionFormat = "[ServiceBusMessageInspector] The following exception occurred=[{0}].";
        #endregion

        #region Private Fields
        //***************************
        // Private Fields
        //***************************

        // When false, AfterReceiveRequest and BeforeSendRequest return without touching the message.
        private readonly bool isComponentEnabled;
        // When true, diagnostic messages are emitted through System.Diagnostics.Trace.
        private readonly bool isTrackingEnabled;
        // Property name -> namespace for the user-defined properties to promote on receive.
        private readonly Dictionary<string, string> propertiesToPromoteDictionary = new Dictionary<string, string>();
        // Property name -> namespace for the user-defined properties to write on receive.
        private readonly Dictionary<string, string> propertiesToWriteDictionary = new Dictionary<string, string>();
        // Property name -> namespace for the message properties to copy to the outgoing BrokeredMessageProperty.
        private readonly Dictionary<string, string> propertiesToSendDictionary = new Dictionary<string, string>();
        #endregion

        #region Public Constructors
        //***************************
        // Public Constructors
        //***************************

        /// <summary>
        /// Creates a message inspector with its default configuration: the component
        /// is enabled, tracking is left disabled and no property mappings are registered.
        /// </summary>
        public ServiceBusMessageInspector()
        {
            // Only the enabled switch is set; every other field keeps its initial value.
            this.isComponentEnabled = true;
        }

        /// <summary>
        /// Creates a message inspector configured with the supplied switches and property mappings.
        /// </summary>
        /// <param name="isComponentEnabled">True to let the inspector process messages; false to make it a pass-through.</param>
        /// <param name="isTrackingEnabled">True to emit diagnostic trace messages.</param>
        /// <param name="propertiesToPromote">Names and namespaces of the properties to promote.</param>
        /// <param name="propertiesToWrite">Names and namespaces of the properties to write.</param>
        /// <param name="propertiesToSend">Names and namespaces of the properties to send.</param>
        /// <exception cref="ApplicationException">
        /// Thrown by the dictionary initialization when a PropertiesElement argument is null
        /// or when its name and namespace lists contain a different number of items.
        /// </exception>
        public ServiceBusMessageInspector(bool isComponentEnabled,
                                          bool isTrackingEnabled,
                                          PropertiesElement propertiesToPromote,
                                          PropertiesElement propertiesToWrite,
                                          PropertiesElement propertiesToSend)
        {
            // The tracking switch is assigned before the dictionaries are filled
            // because the initialization routine traces its own failures.
            this.isTrackingEnabled = isTrackingEnabled;
            this.isComponentEnabled = isComponentEnabled;

            // Populate, in order, the promote, write and send dictionaries.
            InitializeDictionary(PropertiesToPromote, propertiesToPromote, this.propertiesToPromoteDictionary);
            InitializeDictionary(PropertiesToWrite, propertiesToWrite, this.propertiesToWriteDictionary);
            InitializeDictionary(PropertiesToSend, propertiesToSend, this.propertiesToSendDictionary);
        }
        #endregion

        #region IDispatchMessageInspector Members
        //**********************************
        // IDispatchMessageInspector Members
        //**********************************

        /// <summary>
        /// Called after an inbound message has been received but before the message
        /// is dispatched to the intended operation. When the incoming message
        /// properties contain a BrokeredMessageProperty, its explicit properties are
        /// added to the list of properties to write, and its user-defined properties
        /// are added to the promote or write list according to the configuration of
        /// the inspector. Non-empty lists are then attached to the request under the
        /// promote and write keys so that the WCF adapter can pick them up.
        /// </summary>
        /// <remarks>
        /// A message that carries no BrokeredMessageProperty is left untouched. The
        /// property is probed with TryGetValue because the MessageProperties indexer
        /// throws when the requested property is not present.
        /// Any exception is traced (when tracking is enabled), written to the event
        /// log and rethrown.
        /// </remarks>
        /// <param name="request">The request message.</param>
        /// <param name="channel">The incoming channel.</param>
        /// <param name="instanceContext">The current service instance.</param>
        /// <returns>A new Guid used as correlation state. This object is passed back in the BeforeSendReply method.</returns>
        public object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext)
        {
            try
            {
                if (!isComponentEnabled)
                {
                    return Guid.NewGuid();
                }

                var propertiesToPromoteList = new List<KeyValuePair<XmlQualifiedName, object>>();
                var propertiesToWriteList = new List<KeyValuePair<XmlQualifiedName, object>>();

                // Prefer the properties exposed by the current operation context and
                // fall back to the request itself when no context (or no property
                // collection) is available.
                var operationContext = OperationContext.Current;
                var incomingProperties = operationContext != null
                                             ? operationContext.IncomingMessageProperties
                                             : null;
                if (incomingProperties == null)
                {
                    incomingProperties = request.Properties;
                }

                // Retrieve the BrokeredMessageProperty without throwing when it is missing.
                object candidate;
                BrokeredMessageProperty brokeredMessageProperty = null;
                if (incomingProperties.TryGetValue(BrokeredMessageProperty.Name, out candidate))
                {
                    brokeredMessageProperty = candidate as BrokeredMessageProperty;
                }

                if (brokeredMessageProperty != null)
                {
                    // Explicit properties are always written to the message context.
                    AddExplicitProperties(brokeredMessageProperty, propertiesToWriteList);

                    // User-defined properties are promoted or written as configured.
                    AddUserProperties(brokeredMessageProperty, propertiesToPromoteList, propertiesToWriteList);
                }

                // Notify the WCF Adapter the properties to promote
                if (propertiesToPromoteList.Count > 0)
                {
                    request.Properties.Add(PropertiesToPromoteKey, propertiesToPromoteList);
                }

                // Notify the WCF Adapter the properties to write
                if (propertiesToWriteList.Count > 0)
                {
                    request.Properties.Add(PropertiesToWriteKey, propertiesToWriteList);
                }
            }
            catch (Exception ex)
            {
                Trace.WriteLineIf(isTrackingEnabled, string.Format(ExceptionFormat, ex.Message));
                EventLog.WriteEntry(Source, ex.Message, EventLogEntryType.Error);
                throw;
            }
            return Guid.NewGuid();
        }

        /// <summary>
        /// Adds the explicit properties of a BrokeredMessageProperty to the list of
        /// properties to write. String properties are skipped when null or empty;
        /// the remaining properties are always added.
        /// </summary>
        /// <param name="source">The BrokeredMessageProperty to read from.</param>
        /// <param name="writeList">The list of properties to write.</param>
        private void AddExplicitProperties(BrokeredMessageProperty source,
                                           List<KeyValuePair<XmlQualifiedName, object>> writeList)
        {
            AddTextProperty(ContentType, source.ContentType, writeList);
            AddTextProperty(CorrelationId, source.CorrelationId, writeList);
            AddItemToList(DeliveryCount, BrokeredMessagePropertySchemaNamespace, source.DeliveryCount, PropertiesToWrite, writeList);
            AddItemToList(EnqueuedTimeUtc, BrokeredMessagePropertySchemaNamespace, source.EnqueuedTimeUtc, PropertiesToWrite, writeList);
            AddItemToList(ExpiresAtUtc, BrokeredMessagePropertySchemaNamespace, source.ExpiresAtUtc, PropertiesToWrite, writeList);
            AddTextProperty(Label, source.Label, writeList);
            AddTextProperty(MessageId, source.MessageId, writeList);
            AddTextProperty(ReplyTo, source.ReplyTo, writeList);
            AddTextProperty(ReplyToSessionId, source.ReplyToSessionId, writeList);
            AddItemToList(ScheduledEnqueueTimeUtc, BrokeredMessagePropertySchemaNamespace, source.ScheduledEnqueueTimeUtc, PropertiesToWrite, writeList);
            AddItemToList(SequenceNumber, BrokeredMessagePropertySchemaNamespace, source.SequenceNumber, PropertiesToWrite, writeList);
            AddTextProperty(SessionId, source.SessionId, writeList);
            // The time to live is written as a number of seconds.
            AddItemToList(TimeToLive, BrokeredMessagePropertySchemaNamespace, source.TimeToLive.TotalSeconds, PropertiesToWrite, writeList);
            AddTextProperty(To, source.To, writeList);
        }

        /// <summary>
        /// Adds a string property to the list of properties to write unless its value is null or empty.
        /// </summary>
        /// <param name="name">The name of the property.</param>
        /// <param name="value">The value of the property.</param>
        /// <param name="writeList">The list of properties to write.</param>
        private void AddTextProperty(string name,
                                     string value,
                                     List<KeyValuePair<XmlQualifiedName, object>> writeList)
        {
            if (!string.IsNullOrEmpty(value))
            {
                AddItemToList(name, BrokeredMessagePropertySchemaNamespace, value, PropertiesToWrite, writeList);
            }
        }

        /// <summary>
        /// Distributes the user-defined properties of a BrokeredMessageProperty between
        /// the promote and write lists. A property configured in both dictionaries is
        /// only promoted; a property configured in neither is ignored.
        /// </summary>
        /// <param name="source">The BrokeredMessageProperty to read from.</param>
        /// <param name="promoteList">The list of properties to promote.</param>
        /// <param name="writeList">The list of properties to write.</param>
        private void AddUserProperties(BrokeredMessageProperty source,
                                       List<KeyValuePair<XmlQualifiedName, object>> promoteList,
                                       List<KeyValuePair<XmlQualifiedName, object>> writeList)
        {
            if (source.Properties == null)
            {
                return;
            }

            foreach (var property in source.Properties)
            {
                string propertyNamespace;
                if (propertiesToPromoteDictionary.TryGetValue(property.Key, out propertyNamespace))
                {
                    AddItemToList(property.Key, propertyNamespace, property.Value, PropertiesToPromote, promoteList);
                }
                else if (propertiesToWriteDictionary.TryGetValue(property.Key, out propertyNamespace))
                {
                    AddItemToList(property.Key, propertyNamespace, property.Value, PropertiesToWrite, writeList);
                }
            }
        }

        /// <summary>
        /// Called after the operation has returned but before the reply message is sent. 
        /// This implementation performs no processing on the reply.
        /// </summary>
        /// <param name="reply">The reply message. This value is null if the operation is one way.</param>
        /// <param name="correlationState">The correlation object returned from the AfterReceiveRequest method.</param>
        public void BeforeSendReply(ref Message reply, object correlationState)
        {
            // Intentionally empty: the reply and the correlation state are not used.
        }
        #endregion

        #region IClientMessageInspector Members
        //**********************************
        // IClientMessageInspector Members
        //**********************************

        /// <summary>
        /// Enables inspection or modification of a message after a reply message is received but prior to passing it back to the client application. 
        /// This implementation performs no processing on the reply.
        /// </summary>
        /// <param name="reply">The message to be transformed into types and handed back to the client application.</param>
        /// <param name="correlationState">Correlation state data.</param>
        public void AfterReceiveReply(ref Message reply, object correlationState)
        {
            // Intentionally empty: the reply and the correlation state are not used.
        }

        /// <summary>
        /// Enables inspection or modification of a message before a request message is sent to a service.
        /// This method builds a new BrokeredMessageProperty from the properties of the
        /// outgoing message and stores it in the message properties under
        /// BrokeredMessageProperty.Name, replacing any value already stored there.
        /// </summary>
        /// <remarks>
        /// Message properties whose key matches (ignoring case) one of the well-known
        /// brokered message property keys are copied to the corresponding explicit
        /// property; values that are not strings are copied as null. Any other
        /// property whose key has the form namespace#name is copied to the
        /// user-defined properties when the name is configured in the properties to
        /// send with that same namespace.
        /// Any exception is traced (when tracking is enabled), written to the event
        /// log and rethrown.
        /// </remarks>
        /// <param name="message">The message to be sent to the service.</param>
        /// <param name="channel">The client object channel.</param>
        /// <returns>
        /// The object that is returned as the correlationState argument of the AfterReceiveReply method:
        /// the action of the message when the component is disabled, a new Guid otherwise.
        /// </returns>
        public object BeforeSendRequest(ref Message message, IClientChannel channel)
        {
            try
            {
                if (!isComponentEnabled)
                {
                    return message.Headers.Action;
                }

                // Create a new BrokeredMessageProperty object
                var brokeredMessageProperty = new BrokeredMessageProperty();

                // Adds properties to the BrokeredMessageProperty object
                foreach (var property in message.Properties)
                {
                    var key = property.Key;

                    // Use the value of the enumerated entry: the key was matched ignoring
                    // case, so reading it back through the case-sensitive indexer with the
                    // constant key would throw whenever the casing differs.
                    var text = property.Value as string;

                    if (string.Equals(key, ContentTypeProperty, StringComparison.OrdinalIgnoreCase))
                    {
                        brokeredMessageProperty.ContentType = text;
                        TraceSentProperty(ContentType, BrokeredMessagePropertySchemaNamespace, brokeredMessageProperty.ContentType);
                    }
                    else if (string.Equals(key, CorrelationIdProperty, StringComparison.OrdinalIgnoreCase))
                    {
                        brokeredMessageProperty.CorrelationId = text;
                        TraceSentProperty(CorrelationId, BrokeredMessagePropertySchemaNamespace, brokeredMessageProperty.CorrelationId);
                    }
                    else if (string.Equals(key, LabelProperty, StringComparison.OrdinalIgnoreCase))
                    {
                        brokeredMessageProperty.Label = text;
                        TraceSentProperty(Label, BrokeredMessagePropertySchemaNamespace, brokeredMessageProperty.Label);
                    }
                    else if (string.Equals(key, ReplyToProperty, StringComparison.OrdinalIgnoreCase))
                    {
                        brokeredMessageProperty.ReplyTo = text;
                        TraceSentProperty(ReplyTo, BrokeredMessagePropertySchemaNamespace, brokeredMessageProperty.ReplyTo);
                    }
                    else if (string.Equals(key, ReplyToSessionIdProperty, StringComparison.OrdinalIgnoreCase))
                    {
                        brokeredMessageProperty.ReplyToSessionId = text;
                        TraceSentProperty(ReplyToSessionId, BrokeredMessagePropertySchemaNamespace, brokeredMessageProperty.ReplyToSessionId);
                    }
                    else if (string.Equals(key, SessionIdProperty, StringComparison.OrdinalIgnoreCase))
                    {
                        brokeredMessageProperty.SessionId = text;
                        TraceSentProperty(SessionId, BrokeredMessagePropertySchemaNamespace, brokeredMessageProperty.SessionId);
                    }
                    else if (string.Equals(key, ToProperty, StringComparison.OrdinalIgnoreCase))
                    {
                        brokeredMessageProperty.To = text;
                        TraceSentProperty(To, BrokeredMessagePropertySchemaNamespace, brokeredMessageProperty.To);
                    }
                    else
                    {
                        // Include properties indicated in the
                        // configuration of the message inspector.
                        // parts[0] is the namespace and parts[1] the name of the property.
                        var parts = key.Split('#');
                        string configuredNamespace;
                        if (parts.Length == 2 &&
                            propertiesToSendDictionary.TryGetValue(parts[1], out configuredNamespace) &&
                            configuredNamespace == parts[0])
                        {
                            brokeredMessageProperty.Properties[parts[1]] = property.Value;
                            TraceSentProperty(parts[1], parts[0], property.Value);
                        }
                    }
                }

                // Add the BrokeredMessageProperty to the WCF message properties
                message.Properties[BrokeredMessageProperty.Name] = brokeredMessageProperty;
            }
            catch (Exception ex)
            {
                Trace.WriteLineIf(isTrackingEnabled, string.Format(ExceptionFormat, ex.Message));
                EventLog.WriteEntry(Source, ex.Message, EventLogEntryType.Error);
                throw;
            }
            return Guid.NewGuid();
        }

        /// <summary>
        /// Traces, when tracking is enabled, that a property was added to the list of properties to send.
        /// </summary>
        /// <param name="name">The name of the property.</param>
        /// <param name="ns">The namespace of the property.</param>
        /// <param name="value">The value of the property.</param>
        private void TraceSentProperty(string name, string ns, object value)
        {
            if (isTrackingEnabled)
            {
                Trace.WriteLine(string.Format(PropertyAddedToList, PropertiesToSend, name, ns, value));
            }
        }
        #endregion

        #region Private Methods
        //******************
        // Private Methods
        //******************

        /// <summary>
        /// Populates the dictionary passed as third parameter with the information
        /// read from the PropertiesElement passed as second parameter.
        /// The Name and Namespace properties of the PropertiesElement object contain,
        /// respectively, the list of property names and the list of property
        /// namespaces of a set of properties, separated by ',', ';' or '|'.
        /// The name of each property becomes the key in the dictionary, whereas the
        /// namespace in the same position becomes the corresponding value.
        /// </summary>
        /// <remarks>
        /// Nothing is added when either list is null or empty. Names and namespaces
        /// are trimmed, so a list such as "a, b" registers "b" rather than " b";
        /// entries whose name is empty after trimming are skipped. When a name
        /// appears more than once, the last namespace wins.
        /// </remarks>
        /// <param name="dictionaryName">The name of the dictionary, used in the error message.</param>
        /// <param name="properties">A PropertiesElement object.</param>
        /// <param name="dictionary">The dictionary to populate.</param>
        /// <exception cref="ApplicationException">
        /// Thrown when <paramref name="properties"/> is null or when the name and
        /// namespace lists contain a different number of items.
        /// </exception>
        private void InitializeDictionary(string dictionaryName, PropertiesElement properties, Dictionary<string, string> dictionary)
        {
            if (properties == null)
            {
                Trace.WriteLineIf(isTrackingEnabled, PropertiesElementCannotBeNull);
                throw new ApplicationException(PropertiesElementCannotBeNull);
            }

            // Nothing to register unless both lists are specified.
            if (string.IsNullOrEmpty(properties.Name) ||
                string.IsNullOrEmpty(properties.Namespace))
            {
                return;
            }

            var separators = new[] { ',', ';', '|' };
            var nameArray = properties.Name.Split(separators);
            var namespaceArray = properties.Namespace.Split(separators);

            // The two lists are matched by position, so they must have the same length.
            if (namespaceArray.Length != nameArray.Length)
            {
                var errorMessage = string.Format(NameAndNamespaceNumberAreDifferent, dictionaryName);
                Trace.WriteLineIf(isTrackingEnabled, errorMessage);
                throw new ApplicationException(errorMessage);
            }

            for (var i = 0; i < nameArray.Length; i++)
            {
                // Whitespace around a separator is not part of the name or namespace.
                var propertyName = nameArray[i].Trim();
                if (propertyName.Length == 0)
                {
                    continue;
                }
                dictionary[propertyName] = namespaceArray[i].Trim();
            }
        }

        /// <summary>
        /// Adds a new property to the specified list as a pair made of the qualified
        /// name (name and namespace) of the property and its value, and traces the
        /// addition when tracking is enabled.
        /// </summary>
        /// <param name="name">The name of a property.</param>
        /// <param name="ns">The namespace of a property.</param>
        /// <param name="value">The value of a property.</param>
        /// <param name="listName">The name of the list, used in the trace message only.</param>
        /// <param name="list">The list to add the property to.</param>
        private void AddItemToList(string name, 
                                   string ns, 
                                   object value, 
                                   string listName, 
                                   List<KeyValuePair<XmlQualifiedName, object>> list)
        {
            var qualifiedName = new XmlQualifiedName(name, ns);
            list.Add(new KeyValuePair<XmlQualifiedName, object>(qualifiedName, value));

            // Build the trace message only when it is going to be written: this method
            // runs for every property of every message, and Trace.WriteLineIf would
            // evaluate string.Format even when tracking is disabled.
            if (isTrackingEnabled)
            {
                Trace.WriteLine(string.Format(PropertyAddedToList, listName, name, ns, value));
            }
        }
        #endregion
    }
}