您的位置:首页 > 其它

微软同步框架入门之七--定制同步提供程序(SyncProvider)

2009-01-04 16:59 477 查看
在上一篇文章当中,简要介绍了一下如何使用现有的 SyncProvider 对象来进行文件(夹)同步。今天的这个DEMO主要演示通过继承方式来实现自己的SyncProvider来进行相关同步元数据存储,当然这样做会给我们以额外的好处,比如按自己的意愿来创建、版本和删除项的信息等。另外就是对同步应用程序的工作原理也会有一个大概的认识。
在开始正文之前,我在网上看到了这篇文章对MSF的负责报道,当然它所说的部分观点我同意。MSF的确不那么好学,官方的文档的确也难懂,但事情总要慢慢来,只有通过不断积累不断进步,产品才会做的越来越好。就让我们一起与MSF共同进步吧!
好了,开始今天的正文。

首先要解释一下自定义同步程序(本DEMO中为“MySyncProvider”)所要继承和实现的接口:
KnowledgeSyncProvider : 表示使用知识执行同步的同步提供程序

IChangeDataRetriever:表示目标提供程序从源提供程序检索项数据所使用的机制。

INotifyingChangeApplierTarget:表示可将项变更保存到副本中的对象

备注:当使用 NotifyingChangeApplier 对象以帮助将变更应用到目标副本时,目标提供程序必须实现此接口。INotifyingChangeApplierTarget 对象传递给 ApplyChanges 方法。然后,NotifyingChangeApplier 对象调用 INotifyingChangeApplierTarget 方法,将变更和冲突保存到目标副本中。

KnowledgeSyncProvider方法定义如下主要方法:

BeginSession 在派生类中重写时,通知提供程序它将联接一个同步会话。

EndSession 在派生类中重写时,通知提供程序它所登记到的同步会话已经完成。

GetChangeBatch 在派生类中重写时,获取包含某些项的项元数据的变更批,这些项不包含在来自目标提供程序的指定知识中。

GetFullEnumerationChangeBatch 在派生类中重写时,获取作为完全枚举一部分的变更批,该变更批中包含ID 大于指定下限的项的项元数据。

GetSyncBatchParameters 在派生类中重写时,获取要包含在变更批中的项变更数,以及同步作用域的当前知识。

ProcessChangeBatch 在派生类中重写时,通过检测冲突和将变更应用到项存储区中来处理变更组。

ProcessFullEnumerationChangeBatch 在派生类中重写时,通过将变更应用到项存储区中来处理变更组,以用于完全枚举。

下面是其GetChangeBatch中的方法:
/// <summary>
/// 获取包含某些项的项元数据的变更批,这些项不包含在来自目标提供程序的指定知识中。
/// </summary>
/// <param name="batchSize">要包含在变更批中的变更数</param>
/// <param name="destinationKnowledge">来自目标提供程序的知识</param>
/// <param name="dataRetriever">返回一个可用于检索变更数据的对象</param>
/// <returns>包含某些项的项元数据的变更批,这些项不包含在来自目标提供程序的指定知识中</returns>
public override ChangeBatch GetChangeBatch(uint batchSize, SyncKnowledge destinationKnowledge, out object dataRetriever)
{
//得到累加的tick count
GetNextTickCount();

//获得本地修改变
List<ItemChange> changes = DetectChanges(destinationKnowledge, batchSize);

// 使用知识副本的tickCount
myKnowledge.SetLocalTickCount(tickCount);

//将tick count 保存到磁盘
SaveTickCountFile();

//ChangeBatch表示一组变更的元数据。此类不能继承。同步提供程序使用变更批将项变更的元数据从源提供
//程序传输到目标提供程序。源提供程序枚举变更,并将指定数目的变更添加到变更批中。然后将变更批发送
//到目标提供程序。目标提供程序枚举变更批中的变更,并将这些变更应用到项存储区。
ChangeBatch changeBatch = new ChangeBatch(IdFormats, destinationKnowledge, myForgottenKnowledge);

//在变更批中打开一个未排序的组。此组中的项变更可以按任意顺序进行排序。 在调用此方法后添加到变更
//批的项变更将添加到打开的组中。只有先打开一个组,项变更才能添加到变更批中。
changeBatch.BeginUnorderedGroup();

//将指定的项变更组添加到当前打开的组 在项变更可以添加到 ChangeBatchBase 对象之前,必须通过调用
//ChangeBatch 对象的 BeginOrderedGroup 或 BeginUnorderedGroup 方法来打开组。否则,此方法将引发
//InvalidOperationException。
changeBatch.AddChanges(changes);

bool isLastBatch = false;
if((changes.Count < batchSize) || (changes.Count == 0))
{
//设置一个值,用于指示这是同步会话的最后一批。 此方法必须针对源提供程序发送的最后一个变更
//批进行调用;否则 Sync Framework 将继续请求更多的变更批。
changeBatch.SetLastBatch();
isLastBatch = true;
}
//关闭先前在变更批中打开的未排序的组。
changeBatch.EndUnorderedGroup(
myKnowledge, //包含在组中的变更的生成知识。它通常是生成此组的副本的知识。
isLastBatch);//当 madeWithKnowledge 中包含的所有变更都包含在此变更批中时,为 true;
//否则为 false。

dataRetriever = this;

// 构造该变更批之后返回它
return changeBatch;
}

下面是其GetSyncBatchParameters中的方法:

/// <summary>
/// 在派生类中重写时,获取要包含在变更批中的项变更数,以及同步作用域的当前知识。
/// </summary>
/// <param name="batchSize">要包含在此对象返回的变更批中的项变更数</param>
/// <param name="destinationKnowledge">同步作用域的当前知识,或者新创建的知识对象(如果没有当前知识)</param>
public override void GetSyncBatchParameters(out uint batchSize,out SyncKnowledge destinationKnowledge)
{
if(myKnowledge == null)
throw new InvalidOperationException("Knowledge not yet loaded.");

//设置具有此知识的副本的滴答计数。在向另一副本发送此知识之前,滴答计数必须为当前滴答计数。通常,
//提供程序在发送其知识的前一刻才调用此方法;但是,可以随时调用此方法。
myKnowledge.SetLocalTickCount(tickCount);

batchSize = RequestedBatchSize;

destinationKnowledge = myKnowledge.Clone();
}

ProcessChangeBatch方法:
/// <summary>
/// 通过检测冲突和将变更应用到项存储区中来处理一组变更。
/// 如果源变更包含变更单位变更,则目标提供程序必须确定哪个变更单位版本(如果有)包含在发送至变更应用方的目标版本批中。此决定取决于来自源提供程序的变更的类型以及项在目标副本中是否标记为已删除。有关更多信息,请参见同步变更单位。
/// </summary>
/// <param name="resolutionPolicy">此方法应用变更时使用的冲突解决策略</param>
/// <param name="sourceChanges">要在本地应用的源提供程序中的变更批</param>
/// <param name="dataRetriever">一个可用于检索变更数据的对象。该对象可以是 ISynchronousDataRetriever 对象,也可以是提供程序特定的对象</param>
/// <param name="callbackProxy">一个在变更应用期间接收事件通知的对象</param>
/// <param name="syncSessionStatistics">跟踪变更统计信息。对于使用自定义变更应用的提供程序,必须使用变更应用的结果来更新此对象</param>
public override void ProcessChangeBatch(ConflictResolutionPolicy resolutionPolicy,ChangeBatch sourceChanges,
object dataRetriever, SyncCallbacks callbackProxy, SyncSessionStatistics syncSessionStatistics)
{
ulong tickCount;

//累加tick count.
tickCount = GetNextTickCount();

// 更新元数据存储用于最新的本地变更
FindLocalFileChanges();

// 累加本地知识的tick count.
myKnowledge.SetLocalTickCount(tickCount);

// 持久化当前本地的 tickcount
SaveTickCountFile();

// 创建一个相对应的修改批。

List<ItemChange> localVersions = new List<ItemChange>();

// Iterate over changes in the source ChangeBatch

foreach (ItemChange ic in sourceChanges)
{
ItemMetadata item;//重写时,表示与同步作用域中的项相关联的元数据。 项的元数据中包含其创建版本、当前版本、全局 ID 和一个指示是否已从项存储区中删除该项的值。此外,该元数据还包含由用于初始化副本元数据的 FieldSchema 类定义的自定义字段。

ItemChange change;//表示对项的变更

// 循环每个item,以便获得相应的本地存储 version(版本)
if (metadataStore.TryGetItem(ic.ItemId, out item))
{
// 在本地元数据中找到相应的 item

// 从元数据中获得本地创建version 和修改version

//ChangeKind:表示对项所做的变更的类型。
// Deleted 变更是一个删除操作。
// UnknownItem 要变更的项对于副本来说是未知的。
// Update 变更是一个更新操作。

change = new ItemChange(IdFormats, ReplicaId, item.ItemId,
item.IsTombstone ? ChangeKind.Deleted : ChangeKind.Update, // If local item is a tombstone, mark it accordingly
item.CreationVersion, item.ChangeVersion);
}
else
{
// 远程 item 不具备本地相应项(counterpart),则该item 未知
change = new ItemChange(IdFormats, replicaId, ic.ItemId,
ChangeKind.UnknownItem, // Mark the change as unknown
SyncVersion.UnknownVersion, //表示项或变更单位的版本。 ReplicaKey 获取与该版本相关联的副本键。 TickCount 获取与该版本相关联的滴答计数。 UnknownVersion 返回一个包含副本键并且滴答计数设置为 0 的 SyncVersion 对象。
SyncVersion.UnknownVersion);
}

// Add our change to the change list

localVersions.Add(change);
}
// 构造变更批

// 现在构造修改应用器,该应用器会比较本地和远程版本, 应用不存在冲突的修改。并且会检测冲突并按指定进行响应
ForgottenKnowledge destinationForgottenKnowledge = new ForgottenKnowledge(IdFormats, myKnowledge);

//表示一个变更应用方,它检查源提供程序中的一组变更,检测与目标副本中的项的冲突,并根据需要调用已注册的变更应用方目标来保存变更或保存冲突。
NotifyingChangeApplier changeApplier = new NotifyingChangeApplier(IdFormats);
//对变更批执行冲突检测、冲突处理和变更应用。
changeApplier.ApplyChanges(resolutionPolicy, //用于解决冲突的策略。
sourceChanges, //来自源提供程序的变更批
dataRetriever as IChangeDataRetriever,//可用于从源副本中检索项数据的对象
localVersions,//一个包含存储在目标副本中的项的版本的变更批。 这些项与 sourceChanges 中的项相对应
myKnowledge.Clone(),//目标副本的知识
destinationForgottenKnowledge, //目标副本的遗忘知识
this,//将被调用以用于保存变更和冲突的对象
currentSessionContext,//有关当前会话的状态信息
callbackProxy);//将接收有关变更应用事件的通知的回调
}

下面是继承INotifyingChangeApplierTarget接口时的实现方法:
/// <summary>
/// 递增滴答计数并返回新的滴答计数(滴答计数必须单调递增,INotifyingChangeApplierTarget)
/// </summary>
/// <returns>新递增的滴答计数</returns>
public ulong GetNextTickCount()
{
return ++tickCount;
}

/// <summary>
/// 获取一个可用于从副本中检索项数据的对象。(INotifyingChangeApplierTarget)
/// </summary>
/// <returns></returns>
public virtual IChangeDataRetriever GetDataRetriever()
{
return this;
}

/// <summary>
/// 存储当前作用域的知识(INotifyingChangeApplierTarget)
/// </summary>
/// <param name="knowledge">作用域的现有知识</param>
/// <param name="forgottenKnowledge">要保存的遗忘知识</param>
public virtual void StoreKnowledgeForScope(SyncKnowledge knowledge, ForgottenKnowledge forgottenKnowledge)
{
myKnowledge = knowledge;
myForgottenKnowledge = forgottenKnowledge;
SaveKnowledgeFile();
}

/// <summary>
/// 将项变更保存到项存储区中(INotifyingChangeApplierTarget)
/// </summary>
/// <param name="saveAction">要为变更执行的操作</param>
/// <param name="change">要保存的项变更</param>
/// <param name="saveChangeContext">有关要应用的变更的信息</param>
public virtual void SaveItemChange(SaveChangeAction saveAction, ItemChange change, SaveChangeContext saveChangeContext)
{
// 获得变更 item
TransferMechanism data = saveChangeContext.ChangeData as TransferMechanism;

ItemMetadata item;

///SaveChangeAction: 表示变更所指示的操作类型。
/// Create 在目标项存储区中新建该项。
/// DeleteAndRemoveTombstone 从目标项存储区中删除该项。逻辑删除不保留在目标元数据中。此操作在遗忘知识恢复期间调用,与项删除对应,其中源副本已清除了逻辑删除。
/// DeleteAndStoreTombstone 从目标项存储区中删除该项。逻辑删除保存在目标元数据中。此操作与已从源副本中删除的项对应。
/// UpdateVersionAndData 将项数据应用于目标项存储区。现有数据被覆盖。提供的版本覆盖目标元数据中的现有版本。此操作与源副本中的项创建、项更新或变更单位更新对应。当源提供程序中的项被选为冲突入选方时也会发生此情况。
/// UpdateVersionAndMergeData 将项数据应用于目标项存储区。使用某种提供程序特定的方法将现有项数据与更新的项数据合并。该版本覆盖目标元数据中的现有版本。如果冲突解决操作是 Merge,则会执行此操作。
/// UpdateVersionOnly 在目标项存储区中不变更项数据。在目标元数据中覆盖版本。当目标提供程序中的项被选为冲突入选方时也会发生此情况。

if (saveAction == SaveChangeAction.UpdateVersionOnly || ((change.ChangeKind & ChangeKind.Deleted) == ChangeKind.Deleted))
{
if (!metadataStore.TryGetItem(change.ItemId, out item))
{
item = new ItemMetadata();
item.Uri = String.Empty;
}
}
else
{
item = new ItemMetadata();
item.Uri = data.Uri;
}

item.ItemId = change.ItemId;
item.CreationVersion = change.CreationVersion;
item.ChangeVersion = change.ChangeVersion;

if ((change.ChangeKind & ChangeKind.Deleted) == ChangeKind.Deleted)
item.IsTombstone = true;

if(!metadataStore.Has(item.ItemId))
{
ItemMetadata oldItem;

if(metadataStore.TryGetItem(item.Uri, out oldItem))
{
if (item.ItemId.CompareTo(oldItem.ItemId) > 0)
{
oldItem.IsTombstone = true;
oldItem.Uri = String.Empty;
oldItem.ChangeVersion = new SyncVersion(0, tickCount);
}
else
{
item.IsTombstone = true;
item.Uri = String.Empty;
item.ChangeVersion = new SyncVersion(0, tickCount);
}

metadataStore.SetItemInfo(item);
metadataStore.SetItemInfo(oldItem);
}
}

// 应用修改
if (!(saveAction == SaveChangeAction.UpdateVersionOnly) && ((change.ChangeKind & ChangeKind.Deleted) == 0))
{
if (item.Uri != String.Empty)
{
FileStream outputStream = new FileStream(Path.Combine(folderPath, item.Uri), FileMode.OpenOrCreate);

const int copyBlockSize = 4096;
byte[] buffer = new byte[copyBlockSize];

int bytesRead;

while ((bytesRead = data.DataStream.Read(buffer, 0, copyBlockSize)) > 0)
outputStream.Write(buffer, 0, bytesRead);

outputStream.SetLength(outputStream.Position);

outputStream.Close();

FileInfo fi = new FileInfo(Path.Combine(folderPath, item.Uri));

item.LastWriteTimeUtc = fi.LastWriteTimeUtc;
}

data.DataStream.Close();
}
else
{
if(item.IsTombstone && item.Uri != String.Empty)
{
File.Delete(Path.Combine(folderPath, item.Uri));
}
}

metadataStore.SetItemInfo(item);

metadataStore.Save();

//获取表示应用此变更后的目标知识状态的更新知识对象和遗忘知识对象。 此方法返回的知识对象应直接替换目标中的现有知识对象。
saveChangeContext.GetUpdatedDestinationKnowledge(out myKnowledge,//目标的更新知识
out myForgottenKnowledge);//目标的已更新的遗忘知识。
}

还有IChangeDataRetriever 接口下的LoadChangeData方法:

/// <summary>
/// 检索变更的项数据。(IChangeDataRetriever接口)
/// 注:源提供程序确定由此方法返回的对象的类型。该对象可以像装箱值类型一样简单,也可以像包含
/// 高级数据检索方法的类一样复杂。
/// </summary>
/// <param name="context">对应检索其数据的变更进行描述的元数据</param>
/// <returns>变更的项数据</returns>
public virtual object LoadChangeData(LoadChangeContext context)
{
ItemMetadata item;

// 返回修改项的元数据
metadataStore.TryGetItem(context.ItemChange.ItemId, out item);

// 确保该项未删除

if (item.IsTombstone)
{
throw new InvalidOperationException("无法加载当前删除项的修改数据.");
}

//打开当前要传输文件的流
Stream dataStream = new FileStream(Path.Combine(FolderPath, item.Uri), FileMode.Open, FileAccess.Read, FileShare.Read);

// 创建用于传输修改数据的TransferMechanism实例

TransferMechanism transferMechanism = new TransferMechanism(dataStream, item.Uri);

//返回该数据传输对象
return transferMechanism;
}

下面是其相应的构造方法:
/// <summary>
/// Constructor
/// </summary>
/// <param name="replicaId">当前实例的副本id</param>
/// <param name="folderPath">当前副本的存放路径</param>
/// <param name="knowledgeFilePath">当前副本知识的存放路径</param>
/// <param name="metadataFilePath">当前副本的元数据存储路径</param>
/// <param name="tickCountFilePath">当前副本tick count数据的存储路径</param>
public MySyncProvider(Guid replicaId, string folderPath, string knowledgeFilePath,
string metadataFilePath, string tickCountFilePath)
{
ReplicaId = new SyncId(replicaId);
FolderPath = folderPath;
KnowledgeFilePath = knowledgeFilePath;
MetadataFilePath = metadataFilePath;
TickCountFilePath = tickCountFilePath;

// Set-up the metadata store

metadataStore = new MetadataStore();

if(File.Exists(MetadataFilePath))
{
// 文件存在则加载
metadataStore.Load(MetadataFilePath);
}
else
{
// 不存在则创建
metadataStore.Create(MetadataFilePath);
}

// 加载tick count
if(File.Exists(TickCountFilePath))
{
//文件存在则加载
LoadTickCountFile();
}
else
{
// 不存在则创建
// 新文件的tickCount 值为 1
SaveTickCountFile();
}

// 加载知识

if(File.Exists(KnowledgeFilePath))
{
// //文件存在则加载

LoadKnowledgeFile();
}
else
{
// 不存在则创建知识
CreateKnowledgeBlob();
}
}

/// <summary>
/// 将tick count 保存到磁盘
/// </summary>
private void SaveTickCountFile()
{
FileStream stream = new FileStream(TickCountFilePath, FileMode.OpenOrCreate);

// 将tick count 写入文件
BinaryWriter bw = new BinaryWriter(stream);
bw.Write(tickCount);

bw.Close();
}

/// <summary>
/// 从磁盘上恢复
/// </summary>
private void LoadTickCountFile()
{
FileStream stream = new FileStream(TickCountFilePath, FileMode.OpenOrCreate);

// Read the tick count from the file
BinaryReader br = new BinaryReader(stream);
tickCount = br.ReadUInt64();

br.Close();
}

/// <summary>
/// 对当前副本创建一个新的知识实例
/// </summary>
private void CreateKnowledgeBlob()
{
myKnowledge = new SyncKnowledge(IdFormats, ReplicaId, tickCount);
myForgottenKnowledge = new ForgottenKnowledge(IdFormats, myKnowledge);
SaveKnowledgeFile();
}

private void SaveKnowledgeFile()
{
FileStream stream = new FileStream(KnowledgeFilePath, FileMode.OpenOrCreate);

// 将知识序列化到文件
BinaryFormatter bf = new BinaryFormatter();
bf.Serialize(stream, myKnowledge);
bf.Serialize(stream, myForgottenKnowledge);

// 关闭该文件
stream.Close();
}

private void LoadKnowledgeFile()
{
FileStream stream = new FileStream(KnowledgeFilePath, FileMode.Open);

// 反序列化知识文件

BinaryFormatter bf = new BinaryFormatter();
myKnowledge = (SyncKnowledge) bf.Deserialize(stream);

// 对知识进行判断,看其是否为当前副本知识
if(myKnowledge.ReplicaId != ReplicaId)
throw new Exception("Replica id of loaded knowledge doesn't match replica id provided in constructor.");

// 加载遗忘知识
myForgottenKnowledge = (ForgottenKnowledge) bf.Deserialize(stream);

//对遗忘知识进行判断,看其是否为当前副本遗忘知识
if (myForgottenKnowledge.ReplicaId != ReplicaId)
throw new Exception("Replica id of loaded forgotten knowledge doesn't match replica id provided in constructor.");

// 关闭文件
stream.Close();
}

好了,主要的代码就介绍的差不多了。下面是其运行后的结果:



注:本DEMO部分内容取自http://code.msdn.microsoft.com/sync/Release/ProjectReleases.aspx?ReleaseId=1713

今天的内容就先到这里了。

原文链接:http://www.cnblogs.com/daizhj/archive/2008/11/24/1339869.html
作者: daizhj, 代震军
Tags: 微软同步框架,file sync,文件同步,KnowledgeSyncProvider,SyncProvider
网址: http://daizhj.cnblogs.com/
DEMO下载,请点击这里
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息