// Sync101 with Custom Filtering
//
// Shows how to track custom filters and how to use a standard custom provider to send changes from a filter-tracking replica to two different filtered replicas.

// C# (58.2 KB)
// (0)
// 1,322 times
// Add to favorites
// 5/13/2011
// E-mail Twitter del.icio.us Digg Facebook
using System;
using System.Collections.Generic;
using Microsoft.Synchronization;


namespace CustomFilterSyncSample
{
    public class FilteredReplicaProvider<T>
        : BaseProvider<T>,
          ISupportFilteredSync,
          IRequestFilteredSync,
          IFilteredReplicaNotifyingChangeApplierTarget where T:ICloneable
    {
        //
        // Creates a provider for a replica that tracks exactly one custom filter.
        //
        // replicaId and iUpdatable are forwarded to the base provider.
        // filter is the single filter of this replica; it is registered in the
        // filter key map and must not be null.
        //
        // Throws ArgumentNullException when filter is null.
        //
        public FilteredReplicaProvider(
            Guid replicaId, IDataItemOperations<T> iUpdatable,
            IBaseFilter<T> filter) : base(replicaId, iUpdatable)
        {
            // Fail fast: _customFilter is dereferenced without a null check by
            // TryAddFilter and SaveItemFilterChange, so a null filter would
            // otherwise surface much later as an unrelated failure.
            if (filter == null)
            {
                throw new ArgumentNullException("filter");
            }

            _customFilter = filter;
            FilterCount = 1;

            // The key map holds the one filter of this replica.
            _filterKeyMap = new FilterKeyMap(IdFormatGroup);
            _filterKeyMap.AddFilter(filter);

            _filterForgottenKnowledge = new ForgottenKnowledge(IdFormatGroup, MyKnowledge);
        }

        #region KnowledgeSyncProvider
        //
        // KnowledgeSyncProvider
        //

        //
        // Starts a synchronization session: resets SessionCount to zero and then
        // delegates to the base provider.
        //
        public override void BeginSession(
            SyncProviderPosition position, 
            SyncSessionContext syncSessionContext)
        {
            // GetChangeBatch treats SessionCount == 0 as "first batch of the session"
            // and (re)acquires the store enumerator in that case.
            SessionCount = 0;
            base.BeginSession(position, syncSessionContext);
        }

        //
        // Ends the session through the base provider, then clears the
        // filter-accepted flag.
        //
        public override void EndSession(SyncSessionContext syncSessionContext)
        {
            base.EndSession(syncSessionContext);

            // _filterAccepted is only set by TryAddFilter / SpecifyFilter; clearing it
            // here means an acceptance from this session does not carry over.
            _filterAccepted = false;
        }

        //
        // Produces the next batch of changes for the destination.
        //
        // Walks the local store with the session-wide enumerator and adds, up to
        // batchSize entries, every item whose current version is not contained in
        // the destination knowledge (mapped to local replica keys).
        // The batch is flagged as the last one once SessionCount equals the
        // number of items in the store.
        //
        // changeDataRetriever is always set to this provider.
        //
        public override ChangeBatch GetChangeBatch(
            uint batchSize,
            SyncKnowledge destinationKnowledge,
            out object changeDataRetriever)
        {
            SyncKnowledge localizedKnowledge = MyKnowledge.MapRemoteKnowledgeToLocal(destinationKnowledge);

            //
            // An accepted filter means the other replica has the same filter;
            // otherwise the other replica is a full replica.
            //
            ChangeBatch batch;
            if (!_filterAccepted)
            {
                batch = new ChangeBatch(IdFormatGroup, destinationKnowledge, MyForgottenKnowledge);
            }
            else
            {
                CustomFilterInfo filterInfo = new CustomFilterInfo(IdFormatGroup, _customFilter);
                batch = new ChangeBatch(IdFormatGroup, destinationKnowledge, MyForgottenKnowledge, filterInfo);
            }

            // Filter key map needs to be specified before any groups are started
            batch.FilterKeyMap = _filterKeyMap;

            // First batch of the session: restart enumeration and open the group at
            // the lowest id. Later batches resume from the enumerator's current key.
            bool isFirstBatch = SessionCount == 0;
            if (isFirstBatch)
            {
                ItemDataEnumerator = MyStore.GetEnumerator();
                batch.BeginOrderedGroup(IdFormatGroup.ItemIdFormat.Zero);
            }
            else
            {
                batch.BeginOrderedGroup(ItemDataEnumerator.Current.Key);
            }

            // Filter forgotten knowledge needs to be specified after the group has
            // started since it is per group
            batch.SetFilterForgottenKnowledge(0, _filterForgottenKnowledge);

            uint emitted = 0;
            while (emitted < batchSize && ItemDataEnumerator.MoveNext())
            {
                SyncableItem<T> current = ItemDataEnumerator.Current.Value;

                //
                // Tombstones are always candidates. Live items are candidates unless
                // they are ghosts being enumerated for a full replica: those can be
                // skipped because forgotten knowledge takes care of excluding them.
                //
                bool isCandidate = current.IsDeleted || _filterAccepted || !current.IsGhost;

                if (isCandidate &&
                    !localizedKnowledge.Contains(ReplicaId, current.GlobalId, current.CurrentVersion))
                {
                    ItemChange outgoing = new ItemChange(
                                                IdFormatGroup,
                                                ReplicaId,
                                                current.GlobalId,
                                                current.ChangeKind,
                                                current.CreationVersion,
                                                current.CurrentVersion);

                    emitted++;
                    batch.AddChange(outgoing);

                    // Filtered replica only has one filter change
                    outgoing.AddFilterChange(0, current.FilterChanges[0]);
                }

                // Counts every store entry visited, whether or not it was emitted.
                SessionCount++;
            }

            if (SessionCount == MyStore.Count)
            {
                // Every store entry has been visited during this session.
                batch.SetLastBatch();
                batch.EndOrderedGroup(IdFormatGroup.ItemIdFormat.Infinity, MyKnowledge);
            }
            else
            {
                batch.EndOrderedGroup(ItemDataEnumerator.Current.Key, MyKnowledge);
            }

            changeDataRetriever = this;
            return batch;
        }
        #endregion KnowledgeSyncProvider

        #region ISupportFilteredSync methods
        //
        // ISupportFilteredSync
        //
        //
        // Accepts the requested filter only when it is an ISyncFilter that this
        // replica's own filter reports as identical.
        //
        // The outcome is remembered in _filterAccepted (read by GetChangeBatch) and
        // the requested filtering type is stored in _filteringType.
        //
        // Returns true when the filter was accepted.
        //
        public bool TryAddFilter(
            object filter,
            FilteringType filteringType)
        {
            bool accepted = false;

            ISyncFilter requested = filter as ISyncFilter;
            if (requested != null)
            {
                accepted = _customFilter.IsIdentical(requested);
            }

            _filterAccepted = accepted;
            _filteringType = filteringType;

            return accepted;
        }
        #endregion ISupportFilteredSync methods

        #region IRequestFilteredSync methods
        //
        // IRequestFilteredSync
        //
        //
        // Requests this replica's filter through the supplied callback, asking for
        // FilteringType.CurrentItemsAndVersionsForMovedOutItems.
        //
        // On success the filter is marked as accepted for the current session.
        //
        // Throws InvalidOperationException when the callback rejects the filter.
        // (A specific exception type is used instead of the bare System.Exception;
        // handlers that catch Exception still catch it.)
        //
        public void SpecifyFilter(
            FilterRequestCallback filterRequest)
        {
            if (!filterRequest(_customFilter, FilteringType.CurrentItemsAndVersionsForMovedOutItems))
            {
                throw new InvalidOperationException("Filter not accepted at source");
            }

            _filterAccepted = true;
        }
        #endregion IRequestFilteredSync methods

        #region INotifyingChangeApplierTarget
        //
        // Looks up the local version of the item referenced by sourceChange.
        //
        // Returns false (destinationVersion = null) when the store has no entry for
        // the item. Otherwise destinationVersion describes the stored item:
        //   - tombstones are reported with ChangeKind.Deleted and no filter changes;
        //   - live items and ghosts keep their stored change kind and carry every
        //     stored filter change.
        //
        public override bool TryGetDestinationVersion(
            ItemChange sourceChange,
            out ItemChange destinationVersion)
        {
            SyncableItem<T> stored;
            if (!MyStore.TryGetValue(sourceChange.ItemId, out stored))
            {
                destinationVersion = null;
                return false;
            }

            bool isTombstone = stored.IsDeleted;

            destinationVersion = new ItemChange(
                                        IdFormatGroup,
                                        ReplicaId,
                                        stored.GlobalId,
                                        isTombstone ? ChangeKind.Deleted : stored.ChangeKind,
                                        stored.CreationVersion,
                                        stored.CurrentVersion);

            if (!isTombstone)
            {
                // Item is alive or ghost: copy its filter tracking information.
                foreach (KeyValuePair<uint, FilterChange> tracked in stored.FilterChanges)
                {
                    destinationVersion.AddFilterChange(tracked.Key, tracked.Value);
                }
            }

            return true;
        }

        //
        // Applies one change to the local store according to saveChangeAction and
        // then reconciles the item's filter-change metadata (SaveItemFilterChange).
        //
        //   - Ghost actions update the ghost state in the store (where applicable)
        //     and let the base provider record versions only.
        //   - Create / update actions are forwarded to the base provider unchanged.
        //   - Delete actions mark the existing item as deleted.
        //
        // Throws InvalidOperationException for an unexpected action, or when a
        // delete action targets an item that is not in the local store.
        //
        public override void SaveItemChange(
            SaveChangeAction saveChangeAction,
            ItemChange change,
            SaveChangeContext context)
        {
            // Capture the filter changes the item had before this save; they are the
            // "destination" side when the filter change is reconciled below.
            SyncableItem<T> item = null;
            MyStore.TryGetValue(change.ItemId, out item);
            SortedList<uint, FilterChange> destFilterChanges = null;
            if (item != null)
            {
                destFilterChanges = item.FilterChanges;
            }

            switch (saveChangeAction)
            {
                case SaveChangeAction.CreateGhost:
                    MyStore.CreateGhost(change);
                    base.SaveItemChanges(SaveChangeAction.UpdateVersionOnly, change, context);
                    break;

                case SaveChangeAction.MarkItemAsGhost:
                    MyStore.MarkItemAsGhost(change);
                    base.SaveItemChanges(SaveChangeAction.UpdateVersionOnly, change, context);
                    break;

                case SaveChangeAction.UnmarkItemAsGhost:
                    // Store a private copy of the incoming data.
                    MyStore.UnMarkItemAsGhost(change, (T)((T)context.ChangeData).Clone());
                    base.SaveItemChanges(SaveChangeAction.UpdateVersionOnly, change, context);
                    break;

                case SaveChangeAction.UpdateGhost:
                    base.SaveItemChanges(SaveChangeAction.UpdateVersionOnly, change, context);
                    break;

                case SaveChangeAction.Create:
                case SaveChangeAction.UpdateVersionOnly:
                case SaveChangeAction.UpdateVersionAndData:
                case SaveChangeAction.UpdateVersionAndMergeData:
                    base.SaveItemChanges(saveChangeAction, change, context);
                    break;

                case SaveChangeAction.DeleteGhostAndStoreTombstone:
                case SaveChangeAction.DeleteAndStoreTombstone:
                    // The store lookup above may have failed; report that explicitly
                    // instead of failing with a NullReferenceException.
                    if (item == null)
                    {
                        throw new InvalidOperationException(
                            "Cannot apply " + saveChangeAction.ToString() + ": item " + change.ItemId + " is not in the local store.");
                    }

                    item.DeleteItem(change.ChangeVersion);
                    item.ChangeKind = ChangeKind.Deleted;
                    break;

                default:
                    throw new InvalidOperationException("Unexpected saveChangeAction " + saveChangeAction.ToString());
            }

            SaveItemFilterChange(change, destFilterChanges);
        }
        
        #endregion INotifyingChangeApplierTarget


        #region IFilteredReplicaNotifyingChangeApplierTarget
        //
        // The filter key map created in the constructor; it contains this
        // replica's single filter.
        //
        public FilterKeyMap FilterKeyMap
        {
            get { return _filterKeyMap; }
        }

        //
        // Returns the filter forgotten knowledge for the given filter index.
        //
        // Only index 0 is valid because this replica tracks a single filter;
        // any other index throws IndexOutOfRangeException.
        //
        public ForgottenKnowledge GetFilterForgottenKnowledge(
            uint filterIndex)
        {
            if (filterIndex == 0)
            {
                return _filterForgottenKnowledge;
            }

            throw new IndexOutOfRangeException("there is only one forgotten knowledge");
        }

        //
        // Lists the ids of items whose filter move version (filter index 0) is not
        // contained in baseKnowledge.
        //
        // Tombstones and ghosts are never reported. The result is an enumerator
        // over a list built up front, in store enumeration order.
        //
        public IEnumerator<SyncId> GetNewMoveInItems(
            SyncKnowledge baseKnowledge)
        {
            List<SyncId> movedIn = new List<SyncId>();

            foreach (KeyValuePair<SyncId, SyncableItem<T>> entry in MyStore)
            {
                SyncableItem<T> candidate = entry.Value;

                // Only live, non-ghost items can be move-ins.
                if (candidate.IsDeleted || candidate.IsGhost)
                {
                    continue;
                }

                SyncVersion moveVersion = candidate.FilterChanges[0].MoveVersion;
                if (!baseKnowledge.Contains(ReplicaId, candidate.GlobalId, moveVersion))
                {
                    movedIn.Add(candidate.GlobalId);
                }
            }

            return movedIn.GetEnumerator();
        }

        //
        // Stores the knowledge and forgotten knowledge through the base provider
        // and, when supplied, replaces the filter forgotten knowledge.
        //
        // filterForgottenKnowledge may be null (the current filter forgotten
        // knowledge is then kept); otherwise it must contain exactly one element.
        //
        // Throws ArgumentException when filterForgottenKnowledge is non-null and
        // its length is not 1.
        //
        public void SaveKnowledgeWithFilterForgottenKnowledge(
                    SyncKnowledge syncKnowledge,
                    ForgottenKnowledge forgottenKnowledge,
                    ForgottenKnowledge[] filterForgottenKnowledge)
        {
            if (filterForgottenKnowledge != null && filterForgottenKnowledge.Length != 1)
            {
                // ArgumentException takes (message, paramName), in that order.
                throw new ArgumentException(
                    "There should only be one filter forgotten knowledge.",
                    "filterForgottenKnowledge");
            }

            base.SaveKnowledges(syncKnowledge, forgottenKnowledge);

            if (filterForgottenKnowledge != null)
            {
                _filterForgottenKnowledge = filterForgottenKnowledge[0];
            }
        }
        #endregion  IFilteredReplicaNotifyingChangeApplierTarget

        //
        // Reconciles the filter change (filter index 0) of the stored item with the
        // filter change carried by the incoming change.
        //
        // change            - the incoming change; its filter change for index 0 is
        //                     the "source" side.
        // destFilterChanges - the filter changes the item had before the save, or
        //                     null when there were none; a missing entry for index 0
        //                     is treated the same as null.
        //
        // Outcomes:
        //   - source present, no destination: adopt the source filter change unless
        //     the local filter evaluation disagrees with it, in which case a new
        //     locally versioned filter change is written;
        //   - source and destination present: adopt the source filter change when
        //     there is no conflict and it is not obsolete; on conflict write a new
        //     locally versioned filter change;
        //   - no source: write a new locally versioned filter change when there is
        //     no destination or it disagrees with the local filter evaluation.
        //
        public void SaveItemFilterChange(
            ItemChange change,
            SortedList<uint, FilterChange> destFilterChanges)
        {
            SyncableItem<T> item = MyStore.GetSyncableItem(change.ItemId);

            // A filtered replica tracks exactly one filter, at index 0.
            const uint filterIndex = 0;

            FilterChange sourceFilterChange = null;
            FilterTrackingStatus filterTrackStatus;
            change.GetFilterChange(filterIndex, out filterTrackStatus, out sourceFilterChange);

            // TryGetValue instead of the indexer: the indexer throws
            // KeyNotFoundException when the list has no entry for the filter, while
            // the logic below already handles "no destination filter change".
            FilterChange destFilterChange = null;
            if (destFilterChanges != null)
            {
                destFilterChanges.TryGetValue(filterIndex, out destFilterChange);
            }

            if (sourceFilterChange != null)
            {
                if (destFilterChange == null)
                {
                    bool isInFilter = _customFilter.IsInFilter(item.Item);
                    if (isInFilter != sourceFilterChange.IsMoveIn)
                    {
                        // Local evaluation disagrees with the source: record the
                        // local result under a new local version.
                        item.FilterChanges[filterIndex] = new FilterChange(isInFilter,
                                                                       new SyncVersion(0, MyStore.TickCount));
                    }
                    else
                    {
                        item.FilterChanges[filterIndex] = new FilterChange(sourceFilterChange.IsMoveIn, sourceFilterChange.MoveVersion);
                    }
                }
                else
                {
                    bool isConflictFilterChanges = sourceFilterChange.IsMoveIn != destFilterChange.IsMoveIn;

                    // NOTE(review): obsolescence is decided from the item's current
                    // version, not from the source move version - confirm intended.
                    bool isSourceMoveVersionObsolete = MyKnowledge.Contains(ReplicaId, item.GlobalId, item.CurrentVersion);

                    if (!isSourceMoveVersionObsolete && !isConflictFilterChanges)
                    {
                        // The source change was made without knowing the item's
                        // current version: treat as a conflict.
                        SyncKnowledge mappedSourceItemMWKnowledge = MyKnowledge.MapRemoteKnowledgeToLocal(change.MadeWithKnowledge);
                        if (!mappedSourceItemMWKnowledge.Contains(ReplicaId, item.GlobalId, item.CurrentVersion))
                        {
                            isConflictFilterChanges = true;
                        }
                    }

                    if (isConflictFilterChanges)
                    {
                        // Conflict on filter changes: resolve by assigning a new
                        // move version to the locally evaluated filter change.
                        item.FilterChanges[filterIndex] = new FilterChange(_customFilter.IsInFilter(item.Item),
                                                                       new SyncVersion(0, MyStore.TickCount));
                    }
                    else if (!isSourceMoveVersionObsolete)
                    {
                        // No conflict on filter change move version
                        item.FilterChanges[filterIndex] = new FilterChange(sourceFilterChange.IsMoveIn, sourceFilterChange.MoveVersion);
                    }
                }
            }
            else
            {
                bool isInFilter = _customFilter.IsInFilter(item.Item);
                if (destFilterChange == null || destFilterChange.IsMoveIn != isInFilter)
                {
                    item.FilterChanges[filterIndex] = new FilterChange(isInFilter,
                                                                   new SyncVersion(0, MyStore.TickCount));
                }
            }
        }

        #region Private fields
        //
        // Member variables for filtered replica
        //
        // The single filter this replica tracks; set once in the constructor.
        IBaseFilter<T>          _customFilter;
        // Key map containing _customFilter; attached to every outgoing change batch.
        FilterKeyMap            _filterKeyMap;
        // Forgotten knowledge for filter index 0; replaced by
        // SaveKnowledgeWithFilterForgottenKnowledge.
        ForgottenKnowledge      _filterForgottenKnowledge;

        //
        // Member variables for change enumeration
        //
        // True once the filter has been agreed for the current session
        // (set by TryAddFilter / SpecifyFilter, cleared in EndSession).
        bool                _filterAccepted;
        // Filtering type passed to the last TryAddFilter call; stored but not
        // read anywhere in this class.
        FilteringType       _filteringType;
        #endregion Private fields        
    }
}