分布式系统已经存在了一段时间,并且在设计它们时已经建立了众所周知的模式。今天,我们将讨论一种流行的模式:“锁”。
简单来说,锁是进程如何获得对资源的独占访问权以执行某个操作。例如,想象一下在一个存储帐户中有一堆 Blob,您需要一个实例的服务来处理每个 blob,以避免重复处理。实现的方法是在 blob 上获取锁,完成处理,然后释放锁。然而,如果在释放锁之前进程失败(无论是因为进程崩溃还是由于网络分区),就会出现潜在问题,导致资源被无限期锁定。这可能导致死锁和资源争用。
为了防止死锁,可以采用的一种策略是使用超时或租赁锁。
超时锁
- 在这种情况下,进程请求锁时有一个预定义的超时时间。如果在超时之前没有释放锁,系统会确保锁最终被释放。
租赁锁
-
对于基于租约的锁,提供了一个 续租 API,并且有超时机制。持锁的进程必须在租约到期之前调用此 API,以保持对资源的独占访问。如果进程未能及时续租,锁会自动释放,允许其他进程获取它。
超时和基于租约的锁的优缺点
Pros | Cons | |
---|---|---|
基于超时的锁 | 实施简单 | 需要仔细选择超时时间 |
防止永久锁定 | 如果处理未完成,则无法续租 | |
基于租约的锁 | 降低了提前 锁过期的风险 |
需要续租机制 |
进程可以继续请求租约,直到工作完成。 |
上述两种策略都是在 分布式系统 中快速恢复进程失败或网络分区的一种方式。
使用Azure存储的租约锁策略
让我们看看如何在Azure存储中使用租约锁策略。这也涵盖了超时锁策略。
步骤1: 导入Storage Blob Nuget
“12.23.0”是撰写本文时的最新版本。最新版本可在Azure存储Blob中找到。
XML
<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" Version="12.23.0" />
</ItemGroup>
步骤2:获取租约
以下是获取租约的代码。
C#
public async Task<string> TryAcquireLeaseAsync(string blobName, TimeSpan durationInSeconds, string leaseId = default)
{
BlobContainerClient blobContainerClient = new BlobContainerClient(new Uri($"https://{storageName}.blob.core.windows.net/processors"), tokenCredential, blobClientOptions);
BlobLeaseClient blobLeaseClient = blobContainerClient.GetBlobClient(blobName).GetBlobLeaseClient(leaseId);
try
{
BlobLease lease = await blobLeaseClient.AcquireAsync(durationInSeconds).ConfigureAwait(false);
return lease.LeaseId;
}
catch (RequestFailedException ex) when (ex.Status == 409)
{
return default;
}
}
- 首先,我们创建一个Blob容器客户端,并检索我们想要获取租约的特定blob的blob客户端。
- 其次,“Acquire Async”方法尝试为特定持续时间获取租约。如果获取成功,将返回租约ID,否则将抛出409(冲突的状态代码)。
- 这里的“Acquire Async”是关键方法。其余代码可以根据您的需求进行定制/编辑。
步骤3: 更新租约
- “Renew Async”是用于更新租约的Storage .NET SDK中的方法。
- 如果更新不成功,将抛出异常,同时说明失败原因。
C#
public async Task ReleaseLeaseAsync(string blobName, string leaseId)
{
BlobLeaseClient blobLeaseClient = this.blobContainerClient.GetBlobClient(blobName).GetBlobLeaseClient(leaseId);
await blobLeaseClient.RenewAsync().ConfigureAwait(false);
}
步骤4:编排获取和续订租约方法
- 最初,我们调用“Try Acquire Lease Async”从第2步获取租约标识符。一旦成功,就会启动一个后台任务,每隔X秒调用第3步中的“Renew Lease Async”。确保在超时和调用续订租约方法之间有足够的时间。
C#
string leaseId = await this.blobReadProcessor.TryAcquireLeaseAsync(blobName, TimeSpan.FromSeconds(60)).ConfigureAwait(false);
Task leaseRenwerTask = this.taskFactory.StartNew(
async () =>
{
while (leaseId != default && !cancellationToken.IsCancellationRequested)
{
await Task.Delay(renewLeaseMillis).ConfigureAwait(false);
await this.blobReadProcessor.RenewLeaseAsync(blobName, leaseId).ConfigureAwait(false);
}
},
CancellationToken.None,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
- 取消令牌用于在不再需要时优雅地停止租约续订任务。
第5步:取消租约续订
- 当调用“Cancel Async”方法时,第4步中的“IsCancellationRequested”变为true,因此我们不再进入while循环并请求租约续订。
C#
await cancellationTokenSource.CancelAsync().ConfigureAwait(false);
await leaseRenwerTask.WaitAsync(Timeout.InfiniteTimeSpan).ConfigureAwait(false);
第6步:释放租约
最后,释放租约只需调用“Release Async”方法。
C#
public async Task ReleaseLeaseAsync(string blobName, string leaseId)
{
BlobLeaseClient blobLeaseClient = this.blobContainerClient.GetBlobClient(blobName).GetBlobLeaseClient(leaseId);
await blobLeaseClient.ReleaseAsync().ConfigureAwait(false);
}
结论
锁是分布式系统中的基本模式之一,用于获得对资源的独占访问。在处理它们时,有必要牢记其中的陷阱,以确保运行的顺畅。通过使用Azure Storage,我们可以实现这些高效的锁定机制,可以防止无限阻塞,并且同时提供锁定的维护弹性。
Source:
https://dzone.com/articles/locks-in-distributed-systems-timeout-lease-based