并行开发之信号量简介

轻量级信号量:

CountdownEvent,SemaphoreSlim,ManualResetEventSlim

CountdownEvent

CountdownEvent 采用信号计数的方式:每调用一次 Signal,计数减 1;当计数减到 0 时事件被触发,所有在 Wait 上等待的线程才会继续执行。并且可以通过 AddCount、Reset 动态改变“信号计数”的大小。

官方示例请参考:CountdownEvent

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp4
{
    /// <summary>
    /// Demonstrates <see cref="CountdownEvent"/>: the main thread blocks in
    /// <c>Wait()</c> until every worker task has called <c>Signal()</c>.
    /// </summary>
    internal class Program
    {
        // Initial count is the number of hardware threads; it is overwritten
        // by Reset() before each batch of tasks is started.
        private static readonly CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);

        /// <summary>
        /// Runs two batches of worker tasks (user data, then product data),
        /// waiting on the shared countdown after each batch, and finally shows
        /// that waiting with an already-cancelled token throws.
        /// </summary>
        private static void Main()
        {
            int userTaskCount = 5;

            // Re-arm the event so it becomes set after userTaskCount signals.
            cde.Reset(userTaskCount);

            for (int i = 0; i < userTaskCount; i++)
            {
                Task.Factory.StartNew(LoadUser, i);
            }

            // Block until every user-loading task has signalled.
            cde.Wait();
            Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}",
                cde.InitialCount, cde.CurrentCount, cde.IsSet);
            Console.WriteLine("\nUser表数据全部加载完毕!\n");

            // The product batch starts at 8 and is then raised by 2 to show
            // that AddCount changes CurrentCount but not InitialCount.
            var productTaskCount = 8;

            cde.Reset(productTaskCount);
            cde.AddCount(2);

            Console.WriteLine("After Reset(8), AddCount(2): InitialCount={0}, CurrentCount={1}, IsSet={2}",
                cde.InitialCount, cde.CurrentCount, cde.IsSet);

            // Snapshot the count BEFORE starting any worker. Each worker calls
            // Signal(), which decrements CurrentCount; using the live property
            // as the loop bound would start too few tasks and Wait() below
            // would never return.
            int pendingProductTasks = cde.CurrentCount;
            for (int i = 0; i < pendingProductTasks; i++)
            {
                Task.Factory.StartNew(LoadProduct, i);
            }

            // Block until every product-loading task has signalled.
            cde.Wait();
            Console.WriteLine("\nProduct表数据全部加载完毕!\n");

            // Now try waiting with cancellation: the token is cancelled before
            // the call, so Wait(token) throws OperationCanceledException.
            using (CancellationTokenSource cts = new CancellationTokenSource())
            {
                cts.Cancel();
                try
                {
                    cde.Wait(cts.Token);
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("cde.Wait(preCanceledToken) threw OCE, as expected");
                }
            }

            // It's good to release a CountdownEvent when you're done with it.
            cde.Dispose();

            Console.ReadKey();
        }

        /// <summary>
        /// Worker for the user batch: prints its task index and signals the countdown.
        /// </summary>
        /// <param name="obj">The task index passed as the task state.</param>
        private static void LoadUser(object obj)
        {
            try
            {
                Console.WriteLine("当前任务:{0}正在加载User部分数据!", obj);
            }
            finally
            {
                // Signal in finally so a failing worker cannot leave Main blocked in Wait().
                cde.Signal();
            }
        }

        /// <summary>
        /// Worker for the product batch: prints its task index and signals the countdown.
        /// </summary>
        /// <param name="obj">The task index passed as the task state.</param>
        private static void LoadProduct(object obj)
        {
            try
            {
                Console.WriteLine("当前任务:{0}正在加载Product部分数据!", obj);
            }
            finally
            {
                // Signal in finally so a failing worker cannot leave Main blocked in Wait().
                cde.Signal();
            }
        }
    }
}

SemaphoreSlim

对可同时访问资源或资源池的线程数加以限制

官方示例参考:SemaphoreSlim

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp4
{
    /// <summary>
    /// Demonstrates <see cref="SemaphoreSlim"/>: five tasks compete for a
    /// semaphore that admits at most three of them at a time.
    /// </summary>
    internal class Program
    {
        private static SemaphoreSlim semaphore;

        // A padding interval to make the output more orderly.
        private static int padding;

        /// <summary>
        /// Starts five tasks that block on an initially empty semaphore,
        /// releases three permits from the main thread, and waits for all
        /// tasks to finish.
        /// </summary>
        public static void Main()
        {
            // Create the semaphore with 0 available permits and a maximum of 3.
            semaphore = new SemaphoreSlim(0, 3);
            Console.WriteLine("{0} tasks can enter the semaphore.",
                semaphore.CurrentCount);
            Task[] tasks = new Task[5];

            // Create and start one numbered task per array slot.
            for (int i = 0; i < tasks.Length; i++)
            {
                tasks[i] = Task.Run(() =>
                {
                    // Each task begins by requesting the semaphore.
                    Console.WriteLine("Task {0} begins and waits for the semaphore.",
                        Task.CurrentId);

                    // Everything between Wait() and Release() runs while holding a permit.
                    semaphore.Wait();

                    int previousCount;
                    try
                    {
                        Interlocked.Add(ref padding, 1000);

                        Console.WriteLine("DateTime:{0},Task {1} enters the semaphore.", DateTime.Now, Task.CurrentId);

                        // Sleep for one second plus the padding accumulated so far.
                        Thread.Sleep(1000 + padding);
                    }
                    finally
                    {
                        // Release in finally so a failure above cannot leak the
                        // permit and starve the tasks still waiting.
                        previousCount = semaphore.Release();
                    }

                    Console.WriteLine("Task {0} releases the semaphore; previous count: {1}",
                        Task.CurrentId, previousCount);
                });
            }

            // Wait for half a second, to allow all the tasks to start and block.
            Thread.Sleep(500);

            // Restore the semaphore count to its maximum value.
            Console.Write("Main thread calls Release(3) --> ");
            semaphore.Release(3);
            Console.WriteLine("{0} tasks can enter the semaphore.",
                semaphore.CurrentCount);

            // Main thread waits for the tasks to complete.
            Task.WaitAll(tasks);

            // All tasks are done, so nothing can touch the semaphore any more.
            semaphore.Dispose();

            Console.WriteLine("Main thread exits.");

            Console.ReadKey();
        }
    }
}

运行结果:

微信截图_20190306162258.png

ManualResetEventSlim

ManualResetEventSlim是ManualResetEvent的轻量级版本,采用的是“自旋等待”+“内核等待”,也就是说先采用“自旋等待”的方式等待,直到另一个任务调用Set方法来释放它。如果迟迟等不到释放,那么任务就会进入基于内核的等待,所以说如果我们知道等待的时间比较短,采用轻量级的版本会具有更好的性能。

官方示例请参考:

ManualResetEventSlim Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp4
{
    /// <summary>
    /// Demonstrates <see cref="ManualResetEventSlim"/>: blocking tasks until
    /// the event is set, and using its <c>WaitHandle</c> with
    /// <see cref="WaitHandle.WaitAll(WaitHandle[])"/>.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Runs both demonstrations in order, then waits for a key press.
        /// </summary>
        public static void Main()
        {
            MRES_SetWaitReset();
            MRES_SpinCountWaitHandle();
            Console.ReadKey();
        }

        /// <summary>
        /// Starts five tasks that block on an unsignaled event, sleeps two
        /// seconds on the calling thread, then sets the event and waits for
        /// all tasks to finish.
        /// </summary>
        private static void MRES_SetWaitReset()
        {
            // Initialize as unsignaled with a spin count of 2000. The using
            // block disposes the event on every exit path.
            using (ManualResetEventSlim mres1 = new ManualResetEventSlim(false, 2000))
            {
                Task[] tasks = new Task[5];

                for (var i = 0; i < tasks.Length; i++)
                {
                    tasks[i] = Task.Factory.StartNew(() =>
                    {
                        // Block until mres1.Set() is called.
                        mres1.Wait();
                        Console.WriteLine("当前时间:{0},任务ID:mres1收到信号开始执行!", DateTime.Now);
                    });
                }

                Console.WriteLine("当前时间:{0} 主线程ID:{1} \n",
                    DateTime.Now,
                    Thread.CurrentThread.ManagedThreadId);

                // Keep the tasks waiting for two seconds.
                Thread.Sleep(2000);

                // Signal the event so the waiting tasks resume, then wait for
                // them before the event is disposed.
                mres1.Set();
                Task.WaitAll(tasks);
            }
        }

        // Demonstrates:
        //      ManualResetEventSlim construction w/ SpinCount
        //      ManualResetEventSlim.WaitHandle
        private static void MRES_SpinCountWaitHandle()
        {
            // Construct each ManualResetEventSlim with a SpinCount of 1000.
            // Higher spincount => longer time the MRES will spin-wait before taking lock.
            // The using blocks dispose both events even if a wait below throws.
            using (ManualResetEventSlim mres1 = new ManualResetEventSlim(false, 1000))
            using (ManualResetEventSlim mres2 = new ManualResetEventSlim(false, 1000))
            {
                Task bgTask = Task.Factory.StartNew(() =>
                {
                    // Just wait a little
                    Thread.Sleep(100);

                    // Now signal both MRESes
                    Console.WriteLine("Task signalling both MRESes");
                    mres1.Set();
                    mres2.Set();
                });

                // A common use of MRES.WaitHandle is to use MRES as a participant in
                // WaitHandle.WaitAll/WaitAny. Note that accessing MRES.WaitHandle will
                // result in the unconditional inflation of the underlying ManualResetEvent.
                WaitHandle.WaitAll(new WaitHandle[] { mres1.WaitHandle, mres2.WaitHandle });
                Console.WriteLine("WaitHandle.WaitAll(mres1.WaitHandle, mres2.WaitHandle) completed.");

                // Let the background task finish before the events are disposed.
                bgTask.Wait();
            }
        }
    }
}

参考:

8天玩转并行开发——第五天 同步机制(下)