C#并行编程-Task

简介: 原文:C#并行编程-Task菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。 任务简介 TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需要编写底层复杂且重量级的线程代码。
原文: C#并行编程-Task

菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。

任务简介

TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需要编写底层复杂且重量级的线程代码。

但需要注意:任务并不是线程(任务运行的时候需要使用线程,但并不是说任务取代了线程,任务代码是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并没有一对一的关系。)

创建一个新的任务时,调度器(调度器依赖于底层的线程池引擎)会使用工作窃取队列找到一个最合适的线程,然后将任务加入队列,任务所包含的代码会在一个线程中运行。如图:

 

System.Threading.Tasks.Task

一个Task表示一个异步操作,Task提供了很多方法和属性,通过这些方法和属性能够对Task的执行进行控制,并且能够获得其状态信息。

Task的创建和执行都是独立的,因此可以对关联操作的执行拥有完全的控制权。

使用Parallel.For、Parallel.ForEach的循环迭代的并行执行,TPL会在后台创建System.Threading.Tasks.Task的实例。

使用Parallel.Invoke时,TPL也会创建与调用的委托数目一致的System.Threading.Tasks.Task的实例。

 

注意项

程序中添加很多异步的操作作为Task实例加载的时候,为了充分利用运行时所有可用的逻辑内核,任务调度器会尝试的并行的运行这些任务,也会尝试在所有的可用内核上对工作进行负载均衡。

但在实际的编码过程当中,并不是所有的代码片段都能够方便的用任务来运行,因为任务会带来额外的开销,尽管这种开销比添加线程所带来的开销要小,但是仍然需要将这个开销考虑在内。

Task状态与生命周期

一个Task实例只会完成其生命周期一次,当Task到达它的3种肯呢过的最终状态之一是,就无法回到之前的任何状态

下面贴代码,详解见注释,方便大家理解Task的状态:

    class Program
    {
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            /*  创建一个任务 不调用 不执行  状态为Created */
            Task tk = new Task(() =>
            {
            });
            Console.WriteLine(tk.Status.ToString());

            /*  创建一个任务 执行  状态为 WaitingToRun */
            Task tk1 = new Task(() =>
            {
            });
            tk1.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动 此时任务的状态为WaitingToRun*/
            Console.WriteLine(tk1.Status.ToString());

            /*  创建一个主任务 */
            Task mainTask = new Task(() =>
            {
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 30000);
            });
            /*  将子任务加入到主任务完成之后执行 */
            Task subTask = mainTask.ContinueWith((t1) =>
            {
            });
            /*  启动主任务 */
            mainTask.Start();
            /*  此时子任务状态为 WaitingForActivation */
            Console.WriteLine(subTask.Status.ToString());


            /*  创建一个任务 执行 后 等待一段时间 并行未结束的情况下 状态为 Running */
            Task tk2 = new Task(() =>
            {
                SpinWait.SpinUntil(() => false, 30000);
            });
            tk2.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动*/
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk2.Status.ToString());


            /*  创建一个任务 然后取消该任务 状态为Canceled */
            CancellationTokenSource cts = new CancellationTokenSource();
            Task tk3 = new Task(() =>
            {
                for (int i = 0; i < int.MaxValue; i++)
                {
                    if (!cts.Token.IsCancellationRequested)
                    {
                        cts.Token.ThrowIfCancellationRequested();
                    }
                }
            }, cts.Token);
            tk3.Start();/*启动任务*/
            SpinWait.SpinUntil(() => false, 100);
            cts.Cancel();/*取消该任务执行 但并非立马取消 所以对于Canceled状态也不会立马生效*/
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);

            /*创建一个任务 让它成功的运行完成 会得到 RanToCompletion 状态*/
            Task tk4 = new Task(() =>
            {
                SpinWait.SpinUntil(() => false, 10);
            });
            tk4.Start();
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk4.Status.ToString());

            /*创建一个任务 让它运行失败 会得到 Faulted 状态*/
            Task tk5 = new Task(() =>
            {
                throw new Exception();
            });
            tk5.Start();
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk5.Status.ToString());

            Console.ReadLine();
        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

 

使用任务来对代码进行并行化

使用Parallel.Invoke可以并行加载多个方法,使用Task实例也能完成同样的工作,下面贴代码:

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);});
            Task tk2 = new Task(() => SetProduct(2));
            tk1.Start();
            tk2.Start();
          
          
            Console.ReadLine();
        }
        static void SetProduct(int index)
        {
            Parallel.For(0, 10000, (i) =>
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            });
            Console.WriteLine("SetProduct {0} 执行完成", index);
        }
    } 
    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

等待任务完成Task.WaitAll
Task.WaitAll 方法,这个方法是同步执行的,在Task作为参数被接受,所有Task结束其执行前,主线程不会继续执行下一条指令,下面贴代码

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); });
            Task tk2 = new Task(() => SetProduct(2));
            tk1.Start();
            tk2.Start();
            /*等待任务执行完成后再输出 ====== */
            Task.WaitAll(tk1, tk2);
            Console.WriteLine("等待任务执行完成后再输出 ======");

            Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); });
            Task tk4 = new Task(() => SetProduct(2));
            tk3.Start();
            tk4.Start();
            /*等待任务执行前输出 ====== */
            Console.WriteLine("等待任务执行前输出 ======");
            Task.WaitAll(tk3, tk4);


            Console.ReadLine();
        }
        static void SetProduct(int index)
        {
            Parallel.For(0, 10000, (i) =>
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            });
            Console.WriteLine("SetProduct {0} 执行完成", index);
        }
    }
View Code

Task.WaitAll 限定等待时长

            queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*如果tk1 tk2 没能在10毫秒内完成 则输出 ***** */ if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10)) { Console.WriteLine("******"); } Console.ReadLine();
View Code

如图10毫秒没有完成任务,则输出了****

通过取消标记取消任务

通过取消标记来中断Task实例的执行。 CancellationTokenSource,CancellationToken下的IsCanceled属性标志当前是否已经被取消,取消任务,任务也不一定会马上取消,下面贴代码:

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            System.Threading.CancellationTokenSource token = new CancellationTokenSource();
            Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Thread.Sleep(10);
            /*取消任务操作*/
            token.Cancel();
            try
            {
                /*等待完成*/
                Task.WaitAll(new Task[] { tk1, tk2 });
            }
            catch (AggregateException ex)
            {
                /*如果当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中*/
                Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
                Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
            }

            Thread.Sleep(2000);
            Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
            Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
            Console.ReadLine();
        }
        static void SetProduct(System.Threading.CancellationToken ct)
        {
            /* 每一次循环迭代,都会有新的代码调用 ThrowIfCancellationRequested 
             * 这行代码能够对 OpreationCanceledException 异常进行观察
             * 并且这个异常的标记与Task实例关联的那个标记进行比较,如果两者相同 ,而且IsCancelled属性为True,那么Task实例就知道存在一个要求取消的请求,并且会将状态转变为Canceled状态,中断任务执行。  
             * 如果当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中
            /*检查取消标记*/
            ct.ThrowIfCancellationRequested();
            for (int i = 0; i < 50000; i++)
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            
                ct.ThrowIfCancellationRequested();
            }
            Console.WriteLine("SetProduct   执行完成");
        }
    }
    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

Task异常处理 当很多任务并行运行的时候,可能会并行发生很多异常。Task实例能够处理一组一组的异常,这些异常有System.AggregateException类处理

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            System.Threading.CancellationTokenSource token = new CancellationTokenSource();
            Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Thread.Sleep(2000);
            if (tk1.IsFaulted)
            {
                /*  循环输出异常    */
                foreach (Exception ex in tk1.Exception.InnerExceptions)
                {
                    Console.WriteLine("tk1 Exception:{0}", ex.Message);
                }
            }
            Console.ReadLine();
        }

        static void SetProduct(System.Threading.CancellationToken ct)
        {
            for (int i = 0; i < 5; i++)
            {
                throw new Exception(string.Format("Exception Index {0}", i));
            }
            Console.WriteLine("SetProduct   执行完成");
        }
    }
View Code

Task返回值  Task<TResult>

    class Program
    {
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct());
            Task.WaitAll(tk1);
            Console.WriteLine(tk1.Result.Count);
            Console.WriteLine(tk1.Result[0].Name);
            Console.ReadLine();
        }
        static List<Product> SetProduct()
        {
            List<Product> result = new List<Product>();
            for (int i = 0; i < 500; i++)
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                result.Add(model);
            }
            Console.WriteLine("SetProduct   执行完成");
            return result;
        }
    }
View Code

通过延续串联多个任务

ContinueWith:创建一个目标Task完成时,异步执行的延续程序,await,如代码所示:

    class Program
    {
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            /*创建任务t1*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("执行 t1 任务");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

            });
            /*创建任务t2   t2任务的执行 依赖与t1任务的执行完成*/
            Task t2 = t1.ContinueWith((t) =>
            {
                Console.WriteLine("执行 t2 任务"); 
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

            });    
            /*创建任务t3   t3任务的执行 依赖与t2任务的执行完成*/
            Task t3 = t2.ContinueWith((t) =>
            {
                Console.WriteLine("执行 t3 任务");
            });
            Console.ReadLine();
        }
    }
View Code

TaskContinuationOptions

TaskContinuationOptions参数,可以控制延续另一个任的任务调度和执行的可选行为。下面看代码:

    class Program
    {
        /*  coder:释迦苦僧    */
        static void Main(string[] args)
        {
            /*创建任务t1*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("执行 t1 任务");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);
                throw new Exception("异常");
            });

            /*创建任务t2   t2任务的执行 依赖与t1任务的执行完成*/
            Task t2 = t1.ContinueWith((t) =>
            {
                Console.WriteLine(t.Status);
                Console.WriteLine("执行 t2 任务");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

                /*定义 TaskContinuationOptions 行为为 NotOnFaulted 在 t1 任务抛出异常后,t1 的任务状态为 Faulted , 则t2 不会执行里面的方法 但是需要注意的是t3任务*/
                /*t2在不符合条件时 返回Canceled状态状态让t3任务执行*/
            }, TaskContinuationOptions.NotOnFaulted);
            /*创建任务t3   t3任务的执行 依赖与t2任务的执行完成*/

            /*t2在不符合条件时 返回Canceled状态状态让t3任务执行*/
            Task t3 = t2.ContinueWith((t) =>
            {
                Console.WriteLine(t.Status);
                Console.WriteLine("执行 t3 任务");
            });

            Console.ReadLine();
        }
    }
View Code

TaskContinuationOptions 属性有很多,如下所示

 关于并行编程中的Task就写到这,如有问题,请指正。


作者:释迦苦僧 出处:http://www.cnblogs.com/woxpp/p/3928788.html
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。

目录
相关文章
|
14天前
|
开发框架 前端开发 .NET
C#编程与Web开发
【4月更文挑战第21天】本文探讨了C#在Web开发中的应用,包括使用ASP.NET框架、MVC模式、Web API和Entity Framework。C#作为.NET框架的主要语言,结合这些工具,能创建动态、高效的Web应用。实际案例涉及企业级应用、电子商务和社交媒体平台。尽管面临竞争和挑战,但C#在Web开发领域的前景将持续拓展。
|
14天前
|
SQL 开发框架 安全
C#编程与多线程处理
【4月更文挑战第21天】探索C#多线程处理,提升程序性能与响应性。了解C#中的Thread、Task类及Async/Await关键字,掌握线程同步与安全,实践并发计算、网络服务及UI优化。跟随未来发展趋势,利用C#打造高效应用。
|
14天前
|
存储 安全 网络安全
C#编程的安全性与加密技术
【4月更文挑战第21天】C#在.NET框架支持下,以其面向对象和高级特性成为安全软件开发的利器。本文探讨C#在安全加密领域的应用,包括使用System.Security.Cryptography库实现加密算法,利用SSL/TLS保障网络传输安全,进行身份验证,并强调编写安全代码的重要性。实际案例涵盖在线支付、企业应用和文件加密,展示了C#在应对安全挑战的同时,不断拓展其在该领域的潜力和未来前景。
|
14天前
|
人工智能 C# 开发者
C#编程中的图形界面设计
【4月更文挑战第21天】本文探讨了C#在GUI设计中的应用,介绍了Windows Forms、WPF和UWP等常用框架,强调了简洁界面、响应式设计和数据绑定等最佳实践。通过实际案例,展示了C#在企业应用、游戏开发和移动应用中的GUI实现。随着技术发展,C#在GUI设计的未来将趋向于跨平台、更丰富的组件和AI集成,为开发者创造更多可能性。
|
14天前
|
存储 算法 C#
C#编程与数据结构的结合
【4月更文挑战第21天】本文探讨了C#如何结合数据结构以构建高效软件,强调数据结构在C#中的重要性。C#作为面向对象的编程语言,提供内置数据结构如List、Array和Dictionary,同时也支持自定义数据结构。文章列举了C#实现数组、链表、栈、队列等基础数据结构的示例,并讨论了它们在排序、图算法和数据库访问等场景的应用。掌握C#数据结构有助于编写高性能、可维护的代码。
|
14天前
|
开发框架 Linux C#
C#编程的跨平台应用
【4月更文挑战第21天】C#与.NET Core的结合使得跨平台应用开发变得高效便捷,提供统一编程模型和高性能。丰富的类库、活跃的社区支持及Visual Studio Code、Xamarin等工具强化了其优势。广泛应用在企业系统、云服务和游戏开发中,虽面临挑战,但随着技术进步,C#在跨平台开发领域的前景广阔。
|
14天前
|
人工智能 C# 云计算
C#编程的未来发展趋向
【4月更文挑战第21天】C#编程未来将深化跨平台支持,强化云计算与容器技术集成,如.NET Core、Docker。在AI和ML领域,C#将提供更丰富框架,与AI芯片集成。语言和工具将持续创新,优化异步编程,如Task、async和await,提升多核性能。开源生态的壮大将吸引更多开发者,共创更多机遇。
|
2月前
|
C#
24. C# 编程:用户设定敌人初始血值的实现
24. C# 编程:用户设定敌人初始血值的实现
23 0
|
14天前
|
程序员 C#
C#编程中的面向对象编程思想
【4月更文挑战第21天】本文探讨了C#中的面向对象编程,包括类、对象、封装、继承和多态。类是对象的抽象,定义属性和行为;对象是类的实例。封装隐藏内部细节,只暴露必要接口。继承允许类复用和扩展属性与行为,而多态使不同类的对象能通过相同接口调用方法。C#通过访问修饰符实现封装,使用虚方法和抽象方法实现多态。理解并应用这些概念,能提升代码的清晰度和可扩展性,助你成为更好的C#程序员。
|
14天前
|
开发框架 安全 .NET
C#编程高手的成长之路
【4月更文挑战第21天】本文揭示了成为C#编程高手的路径:牢固掌握基础知识和面向对象编程,深入了解C#特性如泛型和委托,精通ASP.NET等框架工具,养成良好编程习惯,持续学习实践并参与开源项目,勇于挑战创新。通过这些步骤,不断提升编程技能,迈向C#编程的巅峰。