APM之异步IO-1

本文涉及的产品
云拨测,每月3000次拨测额度
简介: 概念 异步执行计算限制的操作,可以使用线、线程池、Task在多个内核上调度任务,使多个线程并发的工作,从而高效的使用系统资源,同时提升应用程序的吞吐能力。 异步I/O操作,允许将任务交由硬件设备处理,期间完全不占用线程和CPU资源,这样系统资源可以高效的使用。

概念

异步执行计算限制的操作,可以使用线、线程池、Task在多个内核上调度任务,使多个线程并发的工作,从而高效的使用系统资源,同时提升应用程序的吞吐能力。

异步I/O操作,允许将任务交由硬件设备处理,期间完全不占用线程和CPU资源,这样系统资源可以高效的使用。I/O操作的结果是由线程池来处理的。

 

Windows中使用I/O完成端口的形式完成异步I/O<Windows核心编程>有详细描述。

执行异步操作是构建高性能、可伸缩应用程序的关键,它允许用非常少的线程执行许多操作,和线程池配合,异步操作允许利用机器的所有CPU

为此,CLR团队提供了一种模式:异步编程模型(Asynchronous Programming Modelm APM)

Sytem.IO.Stream的类型都提供了BeginReadBeginWrite的类别的异步操作

 

通过使用Wintellect Power Threading库可以显著减少异步开发的复杂性,如下是个例子:

    private static void ImplementedViaAsyncEnumerator()

    {

        // Start 1 server per CPU

        for (Int32 n = 0; n < Environment.ProcessorCount; n++)

        {

            var ae = new AsyncEnumerator();

            ae.BeginExecute(PipeServerAsyncEnumerator(ae), ae.EndExecute);

        }

 

        // Now make a 100 client requests against the server

        for (Int32 n = 0; n < 100; n++)

        {

            var ae = new AsyncEnumerator();

            ae.BeginExecute(PipeClientAsyncEnumerator(ae, "localhost""Request #" + n), ae.EndExecute);

        }

    }

 

    // This field records the timestamp of the most recent client's request

    private static DateTime s_lastClientRequestTimestamp = DateTime.MinValue;

 

    // The SyncGate enforces thread-safe access to the s_lastClientRequestTimestamp field

    private static readonly SyncGate s_gate = new SyncGate();

 

    private static IEnumerator<Int32> PipeServerAsyncEnumerator(AsyncEnumerator ae)

    {

        // Each server object performs asynchronous operations on this pipe

        using (var pipe = new NamedPipeServerStream(

           "Echo"PipeDirection.InOut, -1, PipeTransmissionMode.Message,

           PipeOptions.Asynchronous | PipeOptions.WriteThrough))

        {

 

            // Asynchronously accept a client connection

            pipe.BeginWaitForConnection(ae.End(), null);

            /*ae.End 这个方法返回一个委托,它引用了AsyncEnumerator内部的方法

            操作完成时,线程池通知AsyncEnumerator对象继续执行yield return 1语句后面的迭代器方法

             */

            yield return 1; //异步操作的地方,使用这个允许线程返回它原来的地方,便于它做更多的工作

 

            // A client connected, let's accept another client

            // 客户连接后再建立一个新的对象提供服务

            var aeNewClient = new AsyncEnumerator();

            aeNewClient.BeginExecute(PipeServerAsyncEnumerator(aeNewClient), aeNewClient.EndExecute);

 

            // Accept the client connection

            //DequeueAsyncResult返回当异步操作完成时,由线程池线程传给AsyncEnumerator对象的IAsyncResult对象

            pipe.EndWaitForConnection(ae.DequeueAsyncResult());

 

            // Asynchronously read a request from the client

            Byte[] data = new Byte[1000];

            pipe.BeginRead(data, 0, data.Length, ae.End(), null);

            yield return 1;

 

            // The client sent us a request, process it. 

            Int32 bytesRead = pipe.EndRead(ae.DequeueAsyncResult());

 

            // Get the timestamp of this client's request

            DateTime now = DateTime.Now;

 

            // We want to save the timestamp of the most-recent client request. Since multiple

            // clients are running concurrently, this has to be done in a thread-safe way

            s_gate.BeginRegion(SyncGateMode.Exclusive, ae.End()); // Request exclusive access

            yield return 1;   // The iterator resumes when exclusive access is granted

 

            if (s_lastClientRequestTimestamp < now)

                s_lastClientRequestTimestamp = now;

 

            s_gate.EndRegion(ae.DequeueAsyncResult());   // Relinquish exclusive access

 

            // My sample server just changes all the characters to uppercase

            // But, you can replace this code with any compute-bound operation

            data = Encoding.UTF8.GetBytes(

               Encoding.UTF8.GetString(data, 0, bytesRead).ToUpper().ToCharArray());

 

            // Asynchronously send the response back to the client

            pipe.BeginWrite(data, 0, data.Length, ae.End(), null);

            yield return 1;

            // The response was sent to the client, close our side of the connection

            pipe.EndWrite(ae.DequeueAsyncResult());

        } // Close the pipe NamedPipeServerStream对象销毁

    }

 

    private static IEnumerator<Int32> PipeClientAsyncEnumerator(AsyncEnumerator ae, String serverName, String message)

    {

        // Each client object performs asynchronous operations on this pipe

        using (var pipe = new NamedPipeClientStream(serverName, "Echo",

              PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough))

        {

            pipe.Connect(); // Must Connect before setting ReadMode

            pipe.ReadMode = PipeTransmissionMode.Message;

 

            // Asynchronously send data to the server

            Byte[] output = Encoding.UTF8.GetBytes(message);

            pipe.BeginWrite(output, 0, output.Length, ae.End(), null);

            yield return 1;

 

            // The data was sent to the server

            pipe.EndWrite(ae.DequeueAsyncResult());

 

            // Asynchronously read the server's response

            Byte[] data = new Byte[1000];

            pipe.BeginRead(data, 0, data.Length, ae.End(), data);

            yield return 1;

 

            // The server responded, display the response and close out connection

            Int32 bytesRead = pipe.EndRead(ae.DequeueAsyncResult());

 

             Console.WriteLine("Server response: " + Encoding.UTF8.GetString(data, 0, bytesRead));

        }  // Close();      

}

IAsyncResult转为Task

异步的通用接口IAsyncResult,使用Task的形式可以加入其它的控制(返回值处理等),如:

 

普通的异步处理方法

        // Instead of this:

        WebRequest webRequest = WebRequest.Create("http://Wintellect.com/");

        webRequest.BeginGetResponse(result =>

        {

            WebResponse webResponse = null;

            try

            {

                webResponse = webRequest.EndGetResponse(result);

                Console.WriteLine("Content length: " + webResponse.ContentLength);

            }

            catch (WebException we)

            {

                Console.WriteLine("Failed: " + we.GetBaseException().Message);

            }

            finally

            {

                if (webResponse != null) webResponse.Close();

            }

        }, null);

 

        Console.ReadLine();  // for testing purposes

 

使用Task的形式进行请求

        // Make a Task from an async operation that FromAsync starts

        /*WebRequest*/

        webRequest = WebRequest.Create("http://Wintellect.com/");

        var t1 = Task.Factory.FromAsync<WebResponse>(webRequest.BeginGetResponse, webRequest.EndGetResponse, nullTaskCreationOptions.None);

        var t2 = t1.ContinueWith(task =>

        {

            WebResponse webResponse = null;

            try

            {

                webResponse = task.Result;

                Console.WriteLine("Content length: " + webResponse.ContentLength);

            }

            catch (AggregateException ae)

            {

                if (ae.GetBaseException() is WebException)

                    Console.WriteLine("Failed: " + ae.GetBaseException().Message);

                else throw;

            }

            finally { if (webResponse != null) webResponse.Close(); }

        });

 

        try

        {

            t2.Wait();  // for testing purposes only

        }

        catch (AggregateException) { }

 

应用程序及其线程处理模型

每个应用程序都可能引入自己的线程处理模型

控制台、Windows服务、Asp.netWeb Service:没有引入任何种类的线程处理模型,任何线程都可以在任何时候做它爱做的任何事情

GUI程序(Windows Form WPF Silverlight):引入了一个线程处理模型,在这个模型中,创建窗口的线程是唯一能对那个窗口进行更新的线程

 

Private static AsyncCallback SyncContextCallback(AsyncCallback callback)

    {

// Capture the calling thread's SynchronizationContext-derived object

SynchronizationContextsc = SynchronizationContext.Current;

 

// If there is no SC, just return what was passed in

if (sc == null) return callback;

 

// Return a delegate that, when invoked, posts to the captured SC a method that

// calls the original AsyncCallback passing it the IAsyncResult argument

returnasyncResult =>sc.Post(result => callback((IAsyncResult)result), asyncResult);

    }

 

如下的形式在UI程序中使用以上的处理就不用再处理手动的线程切换问题了

            var webRequest = WebRequest.Create("http://Wintellect.com/");

            webRequest.BeginGetResponse(SyncContextCallback(ProcessWebResponse), webRequest);

            base.OnMouseClick(e);

 

        private void ProcessWebResponse(IAsyncResult result)

        {

            // If we get here, this must be the GUI thread, it's OK to update the UI

            var webRequest = (WebRequest)result.AsyncState;

            using (var webResponse = webRequest.EndGetResponse(result))

            {

                Text = "Content length: " + webResponse.ContentLength;

            }

        }

 

详细参考:

Clr Via C#

http://transbot.blog.163.com

http://ys-f.ys168.com/?CLR_via_CSharp_3rd_Edition_Code_by_Jeffrey_Richter.zip_55bism1e0e7bkisjthit2bso0cm5bs4bs1b5bktnql0c0bu22f05f12z

相关文章
|
5月前
|
存储 网络协议 Linux
2.10 高性能异步IO机制:io_uring
2.10 高性能异步IO机制:io_uring
313 0
|
1月前
|
JavaScript Unix Linux
IO多路复用:提高网络应用性能的利器
IO多路复用:提高网络应用性能的利器
|
3月前
|
存储 消息中间件 缓存
高效并发处理之必备利器:线程池
高效并发处理之必备利器:线程池
|
4月前
|
监控 分布式数据库 流计算
Flink 异步IO优化任务
Flink 异步IO优化任务
38 0
|
4月前
|
网络协议 Linux
与epoll媲美的异步io机制io_uring
与epoll媲美的异步io机制io_uring
64 0
|
5月前
|
监控 关系型数据库 调度
盘点5个.Net开发的服务器进程监控、性能监控、任务调度的开源项目
盘点5个.Net开发的服务器进程监控、性能监控、任务调度的开源项目
125 0
|
7月前
|
设计模式 缓存 JavaScript
学完大佬的分布式系统核心:进程间的通信,事件驱动后,我顿悟了
在GUI编程中,事件是非常常见的。比如,用户在界面单击了按钮,就会发送一个“点击”事件,相应地,会有一个处理“点击”事件的事件处理器来处理该事件。因此,所谓事件驱动,简单地说就是你点什么按钮(即产生什么事件),计算机就执行什么操作(即调用什么函数)。当然事件也不仅限于用户的操作。事件驱动的核心自然是事件。
|
8月前
|
C# 开发者
C# 开发者技术:进程间数据共享之管道(Pipes)-异步通信版
主要类 1.NamedPipeClientStream 2.NamedPipeServerStream 解释:命名管道是一种进程间通信的方式,它允许不同进程之间在同一台机器上进行通信
447 2
C# 开发者技术:进程间数据共享之管道(Pipes)-异步通信版
|
存储 Java 容器
网络编程实战之高级篇, 彻底解决面试C10k问题, 高并发服务器, IO多路复用, 同时监视多个IO事件
网络编程实战之高级篇, 彻底解决面试C10k问题, 高并发服务器, IO多路复用, 同时监视多个IO事件
网络编程实战之高级篇, 彻底解决面试C10k问题, 高并发服务器, IO多路复用, 同时监视多个IO事件
|
消息中间件 API
HarmonyOS系统内核中消息队列的实现
大家好,今天主要来聊一聊,如何使用HarmonyOS开发实现消息队列。
200 0
HarmonyOS系统内核中消息队列的实现