Simple StreamInsight Application

This sample shows a simple StreamInsight application that demonstrates the different StreamInsight hosting models and features. It is built from the input and output adapters in the StreamInsight Samples plus a custom host application.

C# (113.5 KB)
 
 
 
 
 
4 Star
(1)
1,490 times
Add to favorites
3/9/2012
E-mail Twitter del.icio.us Digg Facebook
//*********************************************************
//
//  Copyright (c) J Sawyer. All rights reserved.
//  This code is licensed under the Apache 2.0 License.
//  THIS CODE IS PROVIDED *AS IS* WITHOUT WARRANTY OR
//  CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED,
//  INCLUDING, WITHOUT LIMITATION, ANY IMPLIED WARRANTIES
//  OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR
//  PURPOSE, MERCHANTABILITY, OR NON-INFRINGEMENT.
//
//*********************************************************

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using StreamInsight.Samples.Adapters.AsyncCsv;
using StreamInsight.Samples.Adapters.DataGenerator;

namespace DevBiker.SimpleSIApp
{
    class Program
    {

        /// <summary>
        /// Application entry point. Depending on <c>AppSettings.Current.RunInProcess</c>, either
        /// creates an in-process StreamInsight server (optionally publishing its management
        /// service) or connects to an existing server, then starts the application and its
        /// queries and waits for console input.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            if (AppSettings.Current.RunInProcess)
            {
                //If in process, we *create* the StreamInsight server. 
                Console.WriteLine("Creating in process StreamInsight server.");

                // Create the in proc server. 
                // Can't pass null for the metadata provider config, so choose the overload instead.
                using (Server cepServer = AppSettings.Current.UseMetadata ?
                    Server.Create(AppSettings.Current.StreamInsightInstanceName, GetMetadataService()) :
                    Server.Create(AppSettings.Current.StreamInsightInstanceName))
                {
                    ServiceHost managementServiceHost = null;
                    try
                    {
                        if (!String.IsNullOrWhiteSpace(AppSettings.Current.ManagementServiceUrl))
                        {
                            managementServiceHost = StartManagementService(cepServer);
                        }

                        RunUntilUserExits(cepServer);
                    }
                    finally
                    {
                        // Shut the endpoint down while the server it exposes still exists;
                        // the enclosing using block disposes the server afterwards.
                        StopManagementService(managementServiceHost);
                    }
                }
            }
            else
            {
                //If out of process, we *connect* to the existing StreamInsight server.
                //This can be running the built-in StreamInsight server (add @ install time)
                //or it can be a custom application that has published the management service. 
                var endpointAddress = new System.ServiceModel.EndpointAddress(
                    @"http://localhost/StreamInsight/" + AppSettings.Current.StreamInsightInstanceName);
                Console.WriteLine("Connecting to StreamInsight server");
                using (Server cepServer = Server.Connect(endpointAddress))
                {
                    RunUntilUserExits(cepServer);
                }
            }
        }

        /// <summary>
        /// Starts the application and its queries, then blocks on console input.
        /// Entering exactly "D" deletes the application; any other input just returns.
        /// </summary>
        /// <param name="cepServer">Server (in-process or connected) to run against.</param>
        private static void RunUntilUserExits(Server cepServer)
        {
            StartAppAndQueries(cepServer);
            Console.WriteLine("Startup completed.");
            Console.WriteLine("Enter D to delete the application");

            if (Console.ReadLine() == "D")
            {
                DeleteApplication(cepServer);
            }
        }

        /// <summary>
        /// Publishes the server's management service at <c>AppSettings.Current.ManagementServiceUrl</c>.
        /// This allows remote clients to access this application, including the Query Debugger.
        /// </summary>
        /// <param name="cepServer">In-process server whose management service is exposed.</param>
        /// <returns>The opened host, or null when the service could not be started.</returns>
        private static ServiceHost StartManagementService(Server cepServer)
        {
            ServiceHost host = null;

            //Make sure that you are running as admin or this will cause an exception
            try
            {
                host = new ServiceHost(cepServer.CreateManagementService());
                host.AddServiceEndpoint(
                    typeof(IManagementService),
                    new WSHttpBinding(SecurityMode.Message), AppSettings.Current.ManagementServiceUrl);
                host.Open();
                Console.WriteLine("Management service started.");
                return host;
            }
            catch (Exception ex)
            {
                // Startup is best effort: report the failure and carry on without the endpoint.
                Console.WriteLine("Management service could not be started: " + ex.Message);
                if (host != null)
                {
                    // A host that failed part-way through cannot be closed gracefully.
                    host.Abort();
                }
                return null;
            }
        }

        /// <summary>
        /// Closes the management service host, aborting it if a graceful close fails.
        /// </summary>
        /// <param name="host">Host returned by StartManagementService; may be null.</param>
        private static void StopManagementService(ServiceHost host)
        {
            if (host == null)
            {
                return;
            }

            try
            {
                host.Close();
            }
            catch (Exception ex)
            {
                // Close throws when the host is faulted or times out; Abort releases it regardless.
                Console.WriteLine("Management service did not close cleanly: " + ex.Message);
                host.Abort();
            }
        }

        /// <summary>
        /// Stops every query in the application named by
        /// <c>AppSettings.Current.StreamInsightAppName</c>, then deletes that application.
        /// </summary>
        /// <param name="cepServer">Server that hosts the application.</param>
        private static void DeleteApplication(Server cepServer)
        {
            Console.WriteLine("Stopping Queries");
            Application cepApplication = cepServer.Applications[AppSettings.Current.StreamInsightAppName];
            foreach (Query query in cepApplication.Queries.Values)
            {
                query.Stop();
            }

            Console.WriteLine("Deleting application");
            // Delete through the instance fetched above instead of looking it up a second time.
            cepApplication.Delete();
        }

        /// <summary>
        /// Builds the SQL CE metadata provider configuration used when creating the in-process server.
        /// </summary>
        /// <returns>
        /// A configuration whose data source is "SimpleSIApp.sdf" and which has
        /// CreateDataSourceIfMissing set to true.
        /// </returns>
        private static SqlCeMetadataProviderConfiguration GetMetadataService()
        {
            var providerConfiguration = new SqlCeMetadataProviderConfiguration();
            providerConfiguration.CreateDataSourceIfMissing = true;
            providerConfiguration.DataSource = "SimpleSIApp.sdf";
            return providerConfiguration;
        }

        /// <summary>
        /// Gets or creates the application and makes sure its queries exist and are running.
        /// </summary>
        /// <param name="cepServer">Server (in-process or connected) hosting the application.</param>
        /// <remarks>
        /// If "initialQuery" already exists, every query in the application that is not running
        /// is started and nothing is created. Otherwise the generator stream and three queries
        /// are created and started: "initialQuery", "hoppingWindowAggregate" and
        /// "snapshotWindowQuery", each writing to its own CSV file.
        /// </remarks>
        private static void StartAppAndQueries(Server cepServer)
        {
            Application cepApplication = GetApplication(cepServer);

            //Check for the query to see if it exists.
            if (cepApplication.Queries.ContainsKey("initialQuery"))
            {
                //Just check to make sure that all queries are running. 
                //If using metadata, queries will be created but not started so we'll start them now. 
                foreach (Query query in cepApplication.Queries.Values)
                {
                    if (query.GetQueryState() != QueryState.Running)
                    {
                        query.Start();
                    }
                }
                return;
            }

            //Create the settings for the input adapter. 
            GeneratorConfig config = new GeneratorConfig()
                                        {
                                            CtiFrequency = 500,
                                            DeviceCount = 50,
                                            EventInterval = 10,
                                            EventIntervalVariance = 5,
                                            MaxValue = 100
                                        };

            var initialStream = CepStream<GeneratedEvent>.Create(cepApplication,
                                                 "generatedStream", typeof(GeneratorFactory),
                                                 config, EventShape.Point);

            //**** For something different, comment out the lines above and uncomment the lines below. 
            //**** The code below sets the AdvanceTimeSettings when the query is created. 
            //**** This overrides the CTI's generated by the input adapter. 

            //var initialStream = CepStream<GeneratedEvent>.Create(cepApplication,
            //                                     "generatedStream", typeof(GeneratorFactory),
            //                                     config, EventShape.Point,
            //                                     new AdvanceTimeSettings(
            //                                         new AdvanceTimeGenerationSettings(
            //                                             TimeSpan.FromMilliseconds(10), 
            //                                             TimeSpan.FromSeconds(0), true), null, AdvanceTimePolicy.Adjust));

            Console.WriteLine("Creating initial query.");

            Query initialQuery = CreateAndStartQuery<GeneratedEvent>(
                initialStream, "initialQuery", "Initial query from generator",
                EventShape.Point,
                CreateCsvOutputConfig("initialQuery", new List<string> { "DeviceId", "Value" }),
                cepApplication);

            //Taking this query and converting to a stream allows us to build on it further. 
            //This is called Dynamic Query Composition (DQC)
            var sourceStream = initialQuery.ToStream<GeneratedEvent>("sourceStream");


            //Now, let's do a simple hopping window.
            //We're cheating a little by assuming that the aggregate query won't be there 
            //if the source query is not. But it should be cool.
            var hoppingWindowStream = from s in sourceStream
                                      group s by s.DeviceId into aggregateGroup
                                      from item in aggregateGroup.HoppingWindow(
                                        TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2),
                                        HoppingWindowOutputPolicy.ClipToWindowEnd)
                                      select new AggregateItem
                                      {
                                          DeviceId = aggregateGroup.Key,
                                          Average = item.Avg(e => e.Value),
                                          Count = item.Count(),
                                          Sum = item.Sum(e => e.Value)
                                      };

            //Create the query and send to output adapter.
            //For fun, change the Event Shape to EventShape.Point
            //The output file label now matches the query name (it was misspelled "hoppingWindowAggegate").
            CreateAndStartQuery<AggregateItem>(hoppingWindowStream,
                "hoppingWindowAggregate", "Hopping aggregate query from generator",
                EventShape.Interval,
                CreateCsvOutputConfig("hoppingWindowAggregate",
                    new List<string> { "DeviceId", "Average", "Count", "Sum" }),
                cepApplication);


            //Now, let's do a snapshot - just to show the difference in the output
            //We'll extend the event lifetimes to make it sure we have multiple events in the snapshot.
            var snapshotWindowStream = from s in sourceStream.AlterEventDuration(e => TimeSpan.FromSeconds(10))
                                       group s by s.DeviceId into snapshotGroup
                                       from item in snapshotGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                                       select new AggregateItem
                                       {
                                           DeviceId = snapshotGroup.Key,
                                           Average = item.Avg(e => e.Value),
                                           Count = item.Count(),
                                           Sum = item.Sum(e => e.Value)
                                       };

            CreateAndStartQuery<AggregateItem>(
                snapshotWindowStream, "snapshotWindowQuery", "Snapshot window query from generator",
                EventShape.Interval,
                CreateCsvOutputConfig("snapshotWindowQuery",
                    new List<string> { "DeviceId", "Average", "Count", "Sum" }),
                cepApplication);
        }

        /// <summary>
        /// Builds a fresh CSV output adapter configuration for one query, so that no
        /// configuration instance is shared or mutated between queries.
        /// </summary>
        /// <param name="queryName">Name used as the prefix of the output file name.</param>
        /// <param name="fields">Payload fields to write, in column order.</param>
        /// <returns>A configuration using "," as delimiter and the current UI culture name.</returns>
        private static CsvOutputConfig CreateCsvOutputConfig(string queryName, List<string> fields)
        {
            return new CsvOutputConfig()
                       {
                           CultureName = System.Globalization.CultureInfo.CurrentUICulture.Name,
                           Delimiter = new string[] { "," },
                           Fields = fields,
                           OutputFileName = BuildOutputFilePath(queryName)
                       };
        }

        /// <summary>
        /// Returns the application named by <c>AppSettings.Current.StreamInsightAppName</c>,
        /// creating it on the server when it is not there yet.
        /// </summary>
        /// <param name="cepServer">Existing Server instance</param>
        /// <returns>The existing or newly created Cep Application</returns>
        /// <remarks>
        /// The application may already be present when:
        ///     - You are connecting to an out-of-process StreamInsight instance
        ///     - You are using the Sql CE Metadata provider.
        /// </remarks>
        private static Application GetApplication(Server cepServer)
        {
            string applicationName = AppSettings.Current.StreamInsightAppName;

            if (cepServer.Applications.ContainsKey(applicationName))
            {
                Console.WriteLine("Connecting to existing Cep Application");
                return cepServer.Applications[applicationName];
            }

            Console.WriteLine("Creating new Cep Application");
            return cepServer.CreateApplication(applicationName);
        }

        /// <summary>
        /// Creates a query from a stream, bound to the CSV Output Adapter, and makes sure it is running.
        /// </summary>
        /// <typeparam name="T">Payload type of the events in <paramref name="sourceStream"/>.</typeparam>
        /// <param name="sourceStream">Stream to bind to the output adapter; only used when the query does not exist yet.</param>
        /// <param name="queryName">Name of the query within the application.</param>
        /// <param name="queryDescription">Description stored with a newly created query.</param>
        /// <param name="eventShape">Event shape requested from the output adapter for a newly created query.</param>
        /// <param name="outputConfig">Configuration handed to the CSV output adapter factory.</param>
        /// <param name="cepApplication">Application that owns the query.</param>
        /// <returns>The newly created or already existing query.</returns>
        /// <remarks>If the query already exists, it is started as needed and returned.</remarks>
        private static Query CreateAndStartQuery<T>(CepStream<T> sourceStream, string queryName, string queryDescription, EventShape eventShape,
            CsvOutputConfig outputConfig, Application cepApplication)
        {
            Query query;
            if (!cepApplication.Queries.ContainsKey(queryName))
            {
                // Pass the caller's event shape through; this was previously hard-coded to
                // EventShape.Point, which silently ignored the eventShape argument.
                query = sourceStream.ToQuery(cepApplication, queryName, queryDescription,
                                             typeof(CsvOutputFactory), outputConfig, eventShape,
                                             StreamEventOrder.FullyOrdered);
            }
            else
            {
                query = cepApplication.Queries[queryName];
            }

            if (query.GetQueryState() != QueryState.Running)
            {
                //Query is not running. Start.
                query.Start();
                // Name the query that was started; this helper is used for every query, not just the initial one.
                Console.WriteLine("Query '" + queryName + "' started.");
            }
            return query;
        }

        /// <summary>
        /// Builds the path of a query's CSV output file inside <c>AppSettings.Current.OutputFolder</c>.
        /// </summary>
        /// <param name="queryName">Name used as the file name prefix.</param>
        /// <returns>
        /// The output folder combined with the query name, the local time formatted as
        /// yyMMdd-HHmmss, and the ".csv" extension.
        /// </returns>
        private static string BuildOutputFilePath(string queryName)
        {
            // "HH" is the 24-hour clock; the 12-hour "hh" gave the same stamp for AM and PM runs.
            // The invariant culture keeps the digits and calendar stable regardless of OS settings.
            string timestamp = DateTime.Now.ToString(
                "yyMMdd-HHmmss", System.Globalization.CultureInfo.InvariantCulture);

            return System.IO.Path.Combine(AppSettings.Current.OutputFolder,
                queryName + timestamp + ".csv");
        }

    }
}