.Net多线程编程—System.Threading.Tasks.Parallel

1、Parallel.Invoke
主要用于任务的并行
  这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能。它有两种形式:
  Parallel.Invoke( params Action[] actions);
  Parallel.Invoke(Action[] actions,TaskManager
manager,TaskCreationOptions options);

System.Threading.Tasks.Parallel类提供了Parallel.Invoke,Parallel.For,Parallel.ForEach这三个静态方法。

图片 1图片 2

1 Parallel.Invoke

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var actions = new Action[]
            {
                () => ActionTest("test 1"),
                () => ActionTest("test 2"),
                () => ActionTest("test 3"),
                () => ActionTest("test 4")
            };

            Console.WriteLine("Parallel.Invoke 1 Test");
            Parallel.Invoke(actions);

            Console.WriteLine("结束!");
        }

        static void ActionTest(object value)
        {
            Console.WriteLine(">>> thread:{0}, value:{1}",
            Thread.CurrentThread.ManagedThreadId, value);
        }
    }
}

尽可能并行执行所提供的每个操作,除非用户取消了操作。

Program

方法:

2、For方法,主要用于处理针对数组元素的并行操作(数据的并行

1)public static void Invoke(params
Action[] actions);

图片 3图片 4

2)public static void
Invoke(ParallelOptions parallelOptions,

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.For(0, nums.Length, (i) =>
            {
                Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}

params Action[] actions);

Program

参数:

3、Foreach方法,主要用于处理泛型集合元素的并行操作(数据的并行)

parallelOptions:一个对象,用于配置此操作的行为。

图片 5图片 6

Actions:要执行的操作数组

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.ForEach(nums, (item) =>
            {
                Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}

异常:

Program

对方法1:

  数据的并行的方式二(AsParallel()):

    System.ArgumentNullException:
actions 参数为 null。

图片 7图片 8

    System.AggregateException:当 actions
数组中的任何操作引发异常时引发的异常。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().Select(item => Calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            Console.WriteLine(evenNumbers.Count());
            //foreach (int item in evenNumbers)
            //    Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}

System.ArgumentException:actions数组包含
null 个元素。

Program

对方法2除上述异常外还包括:

  .AsOrdered() 对结果进行排序:

System.OperationCanceledException:parallelOptions
设置了System.Threading.CancellationToken。

图片 9图片 10

System.ObjectDisposedException:在
parallelOptions 中与 System.Threading.CancellationToken
关联的System.Threading.CancellationTokenSource已被释放。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{

    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().AsOrdered().Select(item => Calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            //Console.WriteLine(evenNumbers.Count());
            foreach (int item in evenNumbers)
                Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}

说明:

Program

1)Invoke方法只有在actions全部执行完才会返回,即使在执行过程中出现异常也会完成。

  ForEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。

2)不能保证actions中的所有操作同时执行。比如actions大小为4,但硬件线程数为2,那么同时运行的操作数最多为2。

图片 11图片 12

3)actions中的操作并行的运行且与顺序无关,若编写与运行顺序有关的并发代码,应选择其他方法。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            for (int j = 1; j < 4; j++)
            {
                ConcurrentBag<int>  bag = new ConcurrentBag<int>();
                var watch = Stopwatch.StartNew();
                watch.Start();
                Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
                {
                    for (int m = i.Item1; m < i.Item2; m++)
                    {
                        bag.Add(m);
                    }
                });
                Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
                GC.Collect();

            }
        }
    }
}

4)如果使用Invoke加载多个操作,多个操作运行时间迥异,总的运行时间以消耗时间最长操作为基准,这会导致很多逻辑内核长时间处于空闲状态。

Program

5)受限的并行可扩展性,这源于Invoke所调用的委托数目是固定的。

  ParallelOptions类
  ParallelOptions options = new ParallelOptions();
  //指定使用的硬件线程数为4
  options.MaxDegreeOfParallelism = 4;
  有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了MaxDegreeOfParallelism属性。

2 Parallel.For

图片 13图片 14

可能会并行运行迭代,可以监视和操作循环的状态。Parallel.For有多个重载的方法,下面列举部分方法。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class Student
    {
        public int ID { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
        public DateTime CreateTime { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var dic = LoadData();
            Stopwatch watch = new Stopwatch();
            watch.Start();
            var query2 = (from n in dic.Values.AsParallel()
                          where n.Age > 20 && n.Age < 25
                          select n).ToList();
            watch.Stop();
            Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);

            Console.Read();
        }

        public static ConcurrentDictionary<int, Student> LoadData()
        {
            ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
            ParallelOptions options = new ParallelOptions();
            //指定使用的硬件线程数为4
            options.MaxDegreeOfParallelism = 4;
            //预加载1500w条记录
            Parallel.For(0, 15000000, options, (i) =>
            {
                var single = new Student()
                {
                    ID = i,
                    Name = "hxc" + i,
                    Age = i % 151,
                    CreateTime = DateTime.Now.AddSeconds(i)
                };
                dic.TryAdd(i, single);
            });

            return dic;
        }
    }
}

方法:

Program

1)public static ParallelLoopResult
For(int fromInclusive, int toExclusive, Action<int> body);

常见问题的处理

2)public static ParallelLoopResult
For(int fromInclusive, int toExclusive, Action<int,
ParallelLoopState> body);

  <1> 如何中途退出并行循环?
  是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
  Break:
当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。

3)public static ParallelLoopResult
For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions,
Action<int, ParallelLoopState> body);

  Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

4)public static ParallelLoopResult
For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions
parallelOptions, Func<TLocal> localInit, Func<int,
ParallelLoopState, TLocal, TLocal> body, Action<TLocal>
localFinally);

图片 15图片 16

参数:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentBag<int> bag = new ConcurrentBag<int>();

            Parallel.For(0, 20000000, (i, state) =>
            {
                if (bag.Count == 1000)
                {
                    //state.Break();
                    state.Stop();
                    return;
                }
                bag.Add(i);
            });

            Console.WriteLine("当前集合有{0}个元素。", bag.Count);

        }
    }
}

fromInclusive:开始索引(含)。

Program

toExclusive:结束索引(不含)。

  取消(cancel)

body:将被每个迭代调用一次的委托。

图片 17图片 18

parallelOptions:一个对象,用于配置此操作的行为。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        public static void Main()
        {

            var cts = new CancellationTokenSource();
            var ct = cts.Token;
            Task.Factory.StartNew(() => fun(ct));
            Console.ReadKey();
            //Thread.Sleep(3000);
            cts.Cancel();
            Console.WriteLine("任务取消了!");

        }

        static void fun(CancellationToken token)
        {
            Parallel.For(0, 100000,
                        new ParallelOptions { CancellationToken = token },
                        (i) =>
                        {
                            Console.WriteLine("针对数组索引{0}的一些工作代码……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId);
                        });
        }
    }
}

localInit:一个委托,用于返回每个任务的本地数据的初始状态。

Program

localFinally:一个委托,用于对每个任务的本地状态执行一个最终操作。

  <2> 并行计算中抛出异常怎么处理?
  首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。

返回结果:

图片 19图片 20

ParallelLoopResult
:包含有关已完成的循环部分的信息。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                Parallel.Invoke(Run1, Run2);
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            Console.WriteLine("结束了!");
            //Console.Read();
        }

        static void Run1()
        {
            Thread.Sleep(3000);
            throw new Exception("我是任务1抛出的异常");
        }

        static void Run2()
        {
            Thread.Sleep(5000);
            throw new Exception("我是任务2抛出的异常");
        }
    }
}

异常:

Program

System.ArgumentNullException:body 参数为
null,或 localInit 参数为 null,或 localFinally 参数为 null,或
parallelOptions 参数为 null。
System.AggregateException:包含在所有线程上引发的全部单个异常的异常。

  注意Parallel里面 不建议抛出异常
因为在极端的情况下比如进去的第一批线程先都抛异常了
此时AggregateExcepation就只能捕获到这一批的错误,然后程序就结束了

对于方法3)和4)除包含以上异常外还包括:

图片 21图片 22

System.OperationCanceledException:在
parallelOptions 设置了参数 System.Threading.CancellationToken。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("执行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            try
            {
                Parallel.For(0, 10, (i) =>
                {
                    try
                    {
                        TestClass a = new TestClass();
                        a.Test(i);
                    }
                    catch (Exception ex)
                    {
                        lock (locker)
                        {
                            errList.Add(ex.Message);
                            throw ex;
                        }
                    }
                });
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的错误:{1}", Index++, err);
            }
        }
    }
}

System.ObjectDisposedException:在
parallelOptions 中与 System.Threading.CancellationToken 关联的
System.Threading.CancellationTokenSource已被释放。

Program

说明:

  可以向下面这样来处理一下
  不在AggregateExcepation中来处理 而是在Parallel里面的try
catch来记录错误,或处理错误

1)不支持浮点和步进。

图片 23图片 24

2)无法保证迭代的执行顺序。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("执行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            Parallel.For(0, 10, (i) =>
            {
                try
                {
                    TestClass a = new TestClass();
                    a.Test(i);
                }
                catch (Exception ex)
                {
                    lock (locker)
                    {
                        errList.Add(ex.Message);
                    }
                    //Console.WriteLine(ex.Message);
                    //注:这里不再将错误抛出.....
                    //throw ex;
                }
            });

            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的错误:{1}", Index++, err);
            }
        }
    }
}

3)如果fromInclusive大于或等于toExclusive,方法立即返回而不会执行任何迭代。

Program

4)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

 

5)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

 

3 Parallel.ForEach

方法

1)public static ParallelLoopResult
ForEach(IEnumerable<TSource> source, Action<TSource>
body);

2)public static ParallelLoopResult
ForEach<TSource>(IEnumerable<TSource> source,
ParallelOptions parallelOptions, Action<TSource,
ParallelLoopState> body);

3)public static ParallelLoopResult
ForEach<TSource>(Partitioner<TSource> source,
Action<TSource> body);

参数:

source:数据源

body:将被每个迭代调用一次的委托。

parallelOptions:一个对象,用于配置此操作的行为。

返回结果:

ParallelLoopResult
:包含有关已完成的循环部分的信息。

异常:

System.ArgumentNullException:source
参数为 null。-或- 方body 参数为 null。

System.AggregateException:包含了所有线程上引发的全部单个异常。

对于方法2)还包括:

System.OperationCanceledException:在
parallelOptions 设置了参数 System.Threading.CancellationToken。

System.ObjectDisposedException:在
parallelOptions 中与 System.Threading.CancellationToken 关联的
System.Threading.CancellationTokenSource已被释放。

对于3)包括的异常为:

System.ArgumentNullException:source
参数为 null。-或- 方body 参数为 null。

System.InvalidOperationException:source
分区程序中的
System.Collections.Concurrent.Partitioner<TSource>.SupportsDynamicPartitions
属性返回 false。或 在 source 分区程序中的任何方法返回 null
时引发异常。或在source 分区程序中的
System.Collections.Concurrent.Partitioner<TSource>.GetPartitions(System.Int32)方法不返回正确数目的分区。

说明:

1)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

2)Parallel.ForEach方法不保证执行顺序,它不像foreach循环那样总是顺序执行。

3)对于方法3)中的source,它的类型是Partitioner<TSource>。可以使用Partitioner.Create方法创建分区,该方法的几个重整方法为:

l public static
OrderablePartitioner<Tuple<int, int>> Create(int
fromInclusive, int toExclusive);

l public static
OrderablePartitioner<Tuple<int, int>> Create(int
fromInclusive, int toExclusive, int rangeSize);

fromInclusive为范围下限(含),toExclusive为范围下限(不含),rangeSize为每个子范围的大小。

使用Partitioner创建的子范围大小默认大约是计算机内核的三倍,而当使用rangeSize指定范围大小时,那么子范围大小为指定值。

4)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

 

4 ParallelOptions

定义:

存储选项,用于配置
System.Threading.Tasks.Parallel 类的方法。

ParallelOptions属性

1)public CancellationToken
CancellationToken { get; set; }

获取或设置传播有关应取消操作的通知。

2)public int MaxDegreeOfParallelism {
get; set; }

获取或设置此 ParallelOptions
实例所允许的最大并行度。

发表评论

电子邮件地址不会被公开。 必填项已用*标注