Sync101 with Custom Filtering

Shows how to track custom filters and how to use a standard custom provider to send changes from a filter-tracking replica to two different filtered replicas.

C# (58.2 KB)
 
 
 
 
 
(0)
1,322 times
Add to favorites
5/13/2011
E-mail Twitter del.icio.us Digg Facebook
using System;
using System.Collections.Generic;
using Microsoft.Synchronization;
using System.Globalization;

namespace CustomFilterSyncSample
{
    public class FilterTrackingFullReplicaProvider<T>
        : BaseProvider<T>,
          ISupportFilteredSync,
          IFilterTrackingProvider,
          IFilterTrackingNotifyingChangeApplierTarget where T:ICloneable
    {
        public FilterTrackingFullReplicaProvider(
            Guid replicaId, 
            IDataItemOperations<T> iUpdatable)
            : base(replicaId, iUpdatable)       
        {
            _filterKeyMap = new FilterKeyMap(IdFormatGroup);
            _trackedCustomFilters = new SortedList<uint, IBaseFilter<T>>();
            _filterForgottenKnowledges = new SortedList<uint, ForgottenKnowledge>();
        }

        #region KnowledgeSyncProvider
        //
        // KnowledgeSyncProvider
        //

        //
        // KSP methods
        //

        public override void BeginSession(
            SyncProviderPosition position, 
            SyncSessionContext syncSessionContext)
        {
            base.BeginSession(position, syncSessionContext); 
            _filterAccepted = false;
            SessionCount = 0;
        }

        public override void EndSession(
            SyncSessionContext syncSessionContext)
        {
            base.EndSession(syncSessionContext);
            _acceptedFilterIndex = 0;
            _filterAccepted = false;
        }

        /// <summary>
        /// Get the changes from the source based on the desctination's knowledge
        /// </summary>
        /// <param name="batchSize"></param>
        /// <param name="requestKnowledge"></param>
        /// <param name="changeDataRetriever"></param>
        /// <returns></returns>
        public override ChangeBatch GetChangeBatch(
            uint batchSize,
            SyncKnowledge destinationKnowledge,
            out object changeDataRetriever)
        {
            SyncKnowledge mappedRequestKnowledge = MyKnowledge.MapRemoteKnowledgeToLocal(destinationKnowledge);
            ChangeBatch changeBatch;

            //
            // If a filter is accepted other replica has same filter
            // else other replica is a full replica
            //
            if (_filterAccepted)
            {
                changeBatch = new ChangeBatch(
                                    IdFormatGroup,
                                    destinationKnowledge,
                                    MyForgottenKnowledge,
                                    new CustomFilterInfo(IdFormatGroup, _trackedCustomFilters[_acceptedFilterIndex]));
            }
            else
            {
                changeBatch = new ChangeBatch(
                                    IdFormatGroup,
                                    destinationKnowledge,
                                    MyForgottenKnowledge);
            }

            // Filter key map needs to be specified before any groups are started
            changeBatch.FilterKeyMap = _filterKeyMap;

            if (SessionCount == 0)
            {
                ItemDataEnumerator = MyStore.GetEnumerator();
                changeBatch.BeginOrderedGroup(IdFormatGroup.ItemIdFormat.Zero);
            }
            else
            {
                changeBatch.BeginOrderedGroup(ItemDataEnumerator.Current.Key);
            }

            // Filter forgotten knowlege needs to be specified after group has started since it is per group
            foreach (uint filterIndex in _filterForgottenKnowledges.Keys)
            {
                changeBatch.SetFilterForgottenKnowledge(filterIndex, _filterForgottenKnowledges[filterIndex]);
            }

            uint count = 0;
            while (count < batchSize && ItemDataEnumerator.MoveNext())
            {
                SyncableItem<T> item = ItemDataEnumerator.Current.Value;
                ItemChange itemChange = null;

                //
                // If a filter is accepted only process changes to items that
                // are in the filter or had been in the filter recently
                //
                if (!_filterAccepted ||
                    (item.FilterChanges[_acceptedFilterIndex].IsMoveIn ||
                     item.FilterChanges[_acceptedFilterIndex].MoveVersion.TickCount > 0))
                {
                    if (!item.IsDeleted)
                    {
                        //
                        // Include all change units if item's creation version is not known,
                        // or all change units required marker is present in the request knowledge
                        //
                        if (!mappedRequestKnowledge.Contains(
                                                             ReplicaId,
                                                             item.GlobalId,
                                                             item.CurrentVersion))
                        {

                            ChangeKind changeKind = ChangeKind.Update;

                            //
                            // Items that are outside the filter will IsMoveIn = false
                            //
                            if (_filterAccepted && !item.FilterChanges[_acceptedFilterIndex].IsMoveIn)
                            {
                                changeKind = ChangeKind.Ghost;
                            }

                            itemChange = new ItemChange(
                                                IdFormatGroup,
                                                ReplicaId,
                                                item.GlobalId,
                                                changeKind,
                                                item.CreationVersion,
                                                item.CurrentVersion);
                        }
                    
                    }
                    else
                    {
                        if (!mappedRequestKnowledge.Contains(ReplicaId, item.GlobalId, item.CurrentVersion))
                        {
                            itemChange = new ItemChange(
                                                IdFormatGroup,
                                                ReplicaId,
                                                item.GlobalId,
                                                ChangeKind.Deleted,
                                                item.CreationVersion,
                                                item.CurrentVersion);
                        }
                    }
                }

                if (itemChange != null)
                {
                    count++;
                    changeBatch.AddChange(itemChange);

                    foreach (uint filterIndex in item.FilterChanges.Keys)
                    {
                        itemChange.AddFilterChange(filterIndex, item.FilterChanges[filterIndex]);
                    }
                }
                SessionCount++;
            }

            if (SessionCount != MyStore.Count)
            {
                changeBatch.EndOrderedGroup(ItemDataEnumerator.Current.Key, MyKnowledge);
            }
            else
            {
                changeBatch.SetLastBatch();
                changeBatch.EndOrderedGroup(IdFormatGroup.ItemIdFormat.Infinity, MyKnowledge);
            }

            changeDataRetriever = this;
            return changeBatch;
        }

        /// <summary>
        /// Save Item
        /// </summary>
        /// <param name="saveChangeAction"></param>
        /// <param name="change"></param>
        /// <param name="context"></param>
        public override void SaveItemChange(
            SaveChangeAction saveChangeAction, 
            ItemChange change, 
            SaveChangeContext context)
        {
            SyncableItem<T> item = null;
            MyStore.TryGetValue(change.ItemId,out item);
            SortedList<uint, FilterChange> destFilterChanges = null;
            if (item != null)
            {
                destFilterChanges = item.FilterChanges;
            }

            switch (saveChangeAction)
            {
                case SaveChangeAction.Create:
                case SaveChangeAction.UpdateVersionOnly:
                case SaveChangeAction.UpdateVersionAndData:
                case SaveChangeAction.UpdateVersionAndMergeData:
                    base.SaveItemChanges(saveChangeAction, change, context);
                    break;
                case SaveChangeAction.DeleteAndStoreTombstone:                    
                    item.DeleteItem(change.ChangeVersion);
                    item.ChangeKind = ChangeKind.Deleted;
                    break;

                default:
                    throw new InvalidOperationException("Unexpected saveChangeAction " + saveChangeAction.ToString());
            }

            SaveItemFilterChange(change, destFilterChanges);
        }
      
        #endregion KnowledgeSyncProvider

        #region ISupportFilteredSync methods        
        /// <summary>
        /// ISupportFilteredSync, implemented by the source / full replica provider
        /// </summary>
        /// <param name="requestedFilter"></param>
        /// <param name="filteringType"></param>
        /// <returns></returns>
        public bool TryAddFilter(
            object filter, 
            FilteringType filteringType)
        {
            _filterAccepted = false;

            if (_trackedCustomFilters != null)
            {
                foreach (uint index in _trackedCustomFilters.Keys)
                {
                    if (_trackedCustomFilters[index].IsIdentical((ISyncFilter)filter))
                    {
                        _filterAccepted = true;
                        _acceptedFilterIndex = index;
                        _filteringType = filteringType;
                        break;
                    }
                }
            }

            return _filterAccepted;
        }
        #endregion ISupportFilteredSync methods

        #region IFilterTrackingProvider
        public void SpecifyTrackedFilters(
            RequestTrackedFilterCallback filterTrackingRequestCallback)
        {            
            if (_trackedCustomFilters != null)
            {
                foreach (IBaseFilter<T> baseFilter in _trackedCustomFilters.Values)
                {
                    filterTrackingRequestCallback(baseFilter);
                }
            }            
        }
        
        /// <summary>
        /// Called on the source provider to add a tracked filter
        /// </summary>
        /// <param name="filter"></param>
        /// <returns></returns>
        public bool TryAddTrackedFilter(
            ISyncFilter filter)
        {
            bool filterFound = false;

            if (_trackedCustomFilters != null)
            {
                foreach (IBaseFilter<T> baseFilter in _trackedCustomFilters.Values)
                {
                    if (baseFilter.IsIdentical(filter))
                    {
                        filterFound = true;
                    }
                }
            }

            if (!filterFound)
            {
                StartTrackingFilter((IBaseFilter<T>) filter);
            }            

            // We always accept every filter
            return true;
        }
        #endregion IFilterTrackingProvider        

        #region IFilterTrackingNotifyingChangeApplierTarget
        public FilterKeyMap FilterKeyMap
        {
            get
            {
                return _filterKeyMap;
            }
        }

        public ForgottenKnowledge GetFilterForgottenKnowledge(
            uint filterIndex)
        {
            return _filterForgottenKnowledges[filterIndex];
        }

        public void SaveKnowledgeWithFilterForgottenKnowledge(
                        SyncKnowledge syncKnowledge, 
                        ForgottenKnowledge forgottenKnowledge, 
                        ForgottenKnowledge[] filterForgottenKnowledge)
        {
            base.SaveKnowledges(syncKnowledge, forgottenKnowledge);
        }        
        #endregion  IFilterTrackingNotifyingChangeApplierTarget

        #region Other public methods
        public SortedList<uint, IBaseFilter<T>> TrackedFilters
        {
            get
            {
                return _trackedCustomFilters;
            }
        }

        public void StartTrackingFilter(IBaseFilter<T> filter)
        {
            if (filter == null)
            {
                throw new ArgumentException("Null filter specified");
            }

            FilterCount++;
            
            // Add filter            
            TrackedFilters[FilterCount - 1] = filter;
           
            
            // Set filter forgotten knowledge to current knowledge           
            _filterForgottenKnowledges[FilterCount - 1] = new ForgottenKnowledge(IdFormatGroup, MyKnowledge);
            _filterForgottenKnowledges[FilterCount - 1].Combine(MyKnowledge);

            _filterKeyMap.AddFilter(filter);

            
            // Now go through all the items in the store and mark them as they have never
            // been in the filter before            
            SortedDictionary<SyncId, SyncableItem<T>>.Enumerator localEnumerator = MyStore.GetEnumerator();

            while (localEnumerator.MoveNext())
            {
                SyncableItem<T> item = localEnumerator.Current.Value;
                
                bool isInTheFilter = item.Item != null && filter.IsInFilter(item.Item);
                item.FilterChanges[FilterCount - 1] = new FilterChange(isInTheFilter /* IsMoveIn */, SyncVersion.UnknownVersion);
            }
        }

        public void UpdateFilterChangeInfos(
            SyncableItem<T> syncableItem, 
            SyncVersion updateVersion)
        { 
            foreach (uint filterIndex in _trackedCustomFilters.Keys)
            {
                bool isInTheFilter = syncableItem.Item != null && _trackedCustomFilters[filterIndex].IsInFilter(syncableItem.Item);
                if (!syncableItem.FilterChanges.ContainsKey(filterIndex))
                {
                    syncableItem.FilterChanges[filterIndex] = new FilterChange(isInTheFilter, SyncVersion.UnknownVersion);
                }
                else
                {
                    // Move change
                    if (isInTheFilter != syncableItem.FilterChanges[filterIndex].IsMoveIn)
                    {
                        syncableItem.FilterChanges[filterIndex] = new FilterChange(isInTheFilter, updateVersion);
                    }  
                }
            }
        }

        public void SaveItemFilterChange(
            ItemChange change, 
            SortedList<uint, 
            FilterChange> destFilterChanges)
        {
            SyncableItem<T> item = MyStore.GetSyncableItem(change.ItemId);

            // We just have only 1 filter for now
            foreach (uint filterIndex in TrackedFilters.Keys)
            {
                FilterChange soureFilterChange = null;
                FilterTrackingStatus filterTrackStatus;
                change.GetFilterChange(filterIndex, out filterTrackStatus, out soureFilterChange);

                FilterChange destFilterChange = null;
                if (destFilterChanges != null)
                {
                    destFilterChange = destFilterChanges[filterIndex];
                }

                if (soureFilterChange != null)
                {
                    SyncVersion sourceMoveVersion = soureFilterChange.MoveVersion;
                    if (destFilterChange == null)
                    {
                        if (_trackedCustomFilters[_acceptedFilterIndex].IsInFilter(item.Item) != soureFilterChange.IsMoveIn)
                        {
                            item.FilterChanges[filterIndex] = new FilterChange(TrackedFilters[filterIndex].IsInFilter(item.Item),
                                                                           new SyncVersion(0, MyStore.TickCount));
                        }
                        else
                        {
                            item.FilterChanges[filterIndex] = new FilterChange(soureFilterChange.IsMoveIn, soureFilterChange.MoveVersion);
                        }
                    }
                    else
                    {
                        bool isConflictFilterChanges = false;
                        if (soureFilterChange.IsMoveIn != destFilterChange.IsMoveIn)
                        {
                            isConflictFilterChanges = true;
                        }

                        bool isSourceMoveVersionObsolete = true;
                        
                        if (!MyKnowledge.Contains(ReplicaId, item.GlobalId, item.CurrentVersion))
                        {
                            isSourceMoveVersionObsolete = false;
                        }                        

                        if (!isSourceMoveVersionObsolete)
                        {
                            if (!isConflictFilterChanges)
                            {
                                SyncKnowledge mappedSourceItemMWKnowledge = MyKnowledge.MapRemoteKnowledgeToLocal(change.MadeWithKnowledge);
                                SyncVersion destMoveVersion = destFilterChange.MoveVersion;
                                
                                if (!mappedSourceItemMWKnowledge.Contains(ReplicaId, item.GlobalId, item.CurrentVersion))
                                {
                                    isConflictFilterChanges = true;
                                    break;
                                }                              
                            }
                        }

                        // No conflict on filter change move version
                        if (!isConflictFilterChanges)
                        {
                            if (!isSourceMoveVersionObsolete)
                            {
                                item.FilterChanges[filterIndex] = new FilterChange(soureFilterChange.IsMoveIn, soureFilterChange.MoveVersion);
                            }
                        }
                        else // conflict on filter changes, needs to resolve conflict by assign new move version to filter change
                        {
                            item.FilterChanges[filterIndex] = new FilterChange(TrackedFilters[filterIndex].IsInFilter(item.Item),
                                                                               new SyncVersion(0, MyStore.TickCount));
                        }
                    }
                }
                else
                {
                    if (destFilterChange == null || destFilterChange.IsMoveIn != _trackedCustomFilters[filterIndex].IsInFilter(item.Item))
                    {
                        item.FilterChanges[filterIndex] = new FilterChange(TrackedFilters[filterIndex].IsInFilter(item.Item),
                                                                           new SyncVersion(0, MyStore.TickCount));
                    }
                }
            }
        }
                
        public override void PrepareForSync()
        {
            base.PrepareForSync();

            SortedDictionary<SyncId, SyncableItem<T>>.Enumerator itemEnumerator = MyStore.GetEnumerator();

            while (itemEnumerator.MoveNext())
            {                
                foreach(uint filterIndex in TrackedFilters.Keys)
                {
                    UpdateFilterChangeInfos(itemEnumerator.Current.Value, new SyncVersion(0, MyStore.TickCount));
                }
            }
        }
         
        #endregion Other public methods
        
        #region Private fields

        //
        // Member variables for filtered replica
        FilterKeyMap                            _filterKeyMap;
        SortedList<uint, IBaseFilter<T>>        _trackedCustomFilters;
        SortedList<uint, ForgottenKnowledge>    _filterForgottenKnowledges;

        //
        // Member variables for change enumeration
        //
        bool                    _filterAccepted;
        uint                    _acceptedFilterIndex;
        FilteringType           _filteringType;
        #endregion Private fields
    }  
}