`

利用迭代器让异步操作更加“人性化”-山寨版的AsyncEnumerator

阅读更多
在msdn上读到 使用 AsyncEnumerator 简化 APM 一文,深感启发,但是找寻power threading库的源代码未果,遂山寨之,简陋不周之处多多包涵。

.NET下通过AsyncCallback回调来实现异步访问IO,比如FileStream,NetworkStream,之类的都有BeginXXX,和EndXXX等成对出现的方法,将实现AsyncCallback回调的方法作为参数传递给BeginXXX方法,在AsyncCallback回调的方法中调用EndXXX方法来结束一次异步回调。这是异步操作的一般模式。

先看一段同步的读取文件的方法:
Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1             byte[] buffer = new byte[SegmentLength];
 2             int readcount = 0;
 3             int readtotal = 0;
 4             while (true)
 5             {
 6                 readcount = IOStream.Read(buffer, 0, SegmentLength);
 7                 readtotal += readcount;
 8                 yield return buffer;
 9                 if (readtotal >= Length)
10                 {
11                     break;
12                 }
13             }
如果改成异步的方式,这个方法就要被拆分成两个方法,由于一次read只能读取一段,那么还需要额外的同步读取进度的对象,比如int rc,long total,byte[] buffer之类的具备变量就需要升格成为类变量。而读取的逻辑也由于方法被拆分而变得很难理解,代码如下:

Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1         int rc;
 2         long total;
 3         byte[] buffer = new byte[1024];
 4         FileStream fs;
 5         long length;
 6         MemoryStream ms = new MemoryStream();
 7         public void ReadFile(string FileName)
 8         {
 9             fs = new FileStream(FileName, FileMode.Open, FileAccess.Read);
10             length = fs.Length;
11             fs.BeginRead(buffer, 01024, EndRead, null);
12         }
13         public void EndRead(IAsyncResult Ar)
14         {
15             rc = fs.EndRead(Ar);
16             total += rc;
17             ms.Write(buffer, 0, rc);
18             if (total < length)
19             {
20                 fs.BeginRead(buffer, 01024, EndRead, null);
21             }
22         }
我们可以看到这样子的代码很晦涩,且IO类的资源回收成了一个问题,在同步的模式下在一个方法里只需要使用using结构就不用担心IO的资源回收,这里我们就需要外部的类实现IDisposable接口来保证对IO资源的回收。
为了使异步模式更加容易理解,我们需要把分开地方法合并起来。前文利用迭代器在.NET中实现“超轻量级线程” 中我提到了迭代器的效果是把一个方法拆分成了多个方法,那么这里我们反过来,把多个方法合并成一个方法,其实也可以使用迭代器。但是由于异步模式是在多线程下工作的,所以我们不能使用foreach来驱动迭代器,所以就需要一个AsyncEnumerator的类,由于找不到Power Threading的源码,所以我们自己来山寨一个出来。

我们先来看使用了AsyncEnumerator来驱动的异步读取文件的代码:
Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1 IEnumerable ReadFile(AsyncEnumerator Ae, MemoryStream Data)
 2         {
 3             int rc = 0;
 4             long total = 0;
 5             FileInfo fi = new FileInfo(@"a file path XXXXXXX mask");
 6             byte[] bf = new byte[1024];
 7             using (FileStream fs = new FileStream(fi.FullName, FileMode.Open, FileAccess.Read))
 8             {
 9                 fs.BeginRead(bf, 01024, Ae.EndAction, null);
10                 yield return 1;
11                 while (true)
12                 {
13                     rc = fs.EndRead(Ae.CurrentAr);
14                     total += rc;
15                     Data.Write(bf, 0, rc);
16                     if (total >= fi.Length)
17                     {
18                         fs.Close();
19                         break;
20                     }
21                     else
22                     {
23                         fs.BeginRead(bf, 01024, Ae.EndAction, null);
24                         yield return 1;
25                     }
26                 }
27                 yield break;
28             }
29         }

这段代码看起来就和同步读取时的逻辑差不多了,在一个方法内部,我们也可以使用using结构来确保回收资源。我们看到在beginread的时候使用了AsyncEnumerator的EndAction方法作为回调的方法,这个方法内部其实就是调用Enumerator对象的MoveNext方法,那么之后就会继续执行yield return 下边的代码。这里由于我们做了简化,一个AsyncEnumerator只能handle一个stream的一步操作,所以这里yield return 返回任意值都无所谓,由于EndXXX方法需要当前一步操作的结果对象一个IAsyncResult的对象作为参数,所以在AsyncEnumerator中有一个当前结果的属性,而在每一次EndAction的时候则用当前的结果对象更新之。遂得到AsyncEnumerator的内容为:
Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1 class AsyncEnumerator
 2     {
 3         AsyncCallback end;
 4 
 5         public IAsyncResult CurrentAr
 6         {
 7             get;
 8             set;
 9         }
10 
11         public void EndAction(IAsyncResult Ar)
12         {
13             CurrentAr = Ar;
14             if (!enumerator.MoveNext())
15             {
16               
17             }
18         }
19         
20 }
但是这个时候我们无法开始整个迭代的过程,所以我们需要给AsyncEnumerator增加一个开始迭代的方法,同时,完成后也需要一个通知,所以开始迭代的方法里要增加一个回调方法的委托作为参数,由于在这个方法里驱动Enumerator的迭代所以也需要把IEnumerator的对象也传进去。最后得到完整的AsyncEnumerator的代码为:
Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1    class AsyncEnumerator
 2     {
 3         IEnumerator enumerator;
 4         AsyncCallback end;
 5 
 6         public IAsyncResult CurrentAr
 7         {
 8             get;
 9             set;
10         }
11 
12         public void EndAction(IAsyncResult Ar)
13         {
14             CurrentAr = Ar;
15             if (!enumerator.MoveNext())
16             {
17                 end(Ar);
18             }
19         }
20         public void Start(IEnumerator Enumerator, AsyncCallback End)
21         {
22             enumerator = Enumerator;
23             if (enumerator.MoveNext())
24             {
25                 end = End;
26             }
27         }
28 
29     }

最后我们用他来读取一个文件:
Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1     class Program
 2     {
 3         static MemoryStream data = new MemoryStream();
 4 
 5         static void Main(string[] args)
 6         {
 7             AsyncEnumerator Ae = new AsyncEnumerator();
 8             IEnumerator ie = ReadFile(Ae, data).GetEnumerator();
 9             Ae.Start(ie, End);
10             Console.Read();
11         }
12 
13         static void End(IAsyncResult Ar)
14         {
15             string txt = Encoding.UTF8.GetString(data.ToArray());
16             Console.WriteLine(txt);
17         }
18 
19         static IEnumerable ReadFile(AsyncEnumerator Ae, MemoryStream Data)
20         {
21             int rc = 0;
22             long total = 0;
23             FileInfo fi = new FileInfo(@"SOME FILE PATH XXXXXX");
24             byte[] bf = new byte[1024];
25             using (FileStream fs = new FileStream(fi.FullName, FileMode.Open, FileAccess.Read))
26             {
27                 fs.BeginRead(bf, 01024, Ae.EndAction, null);
28                 yield return 1;
29                 while (true)
30                 {
31                     rc = fs.EndRead(Ae.CurrentAr);
32                     total += rc;
33                     Data.Write(bf, 0, rc);
34                     if (total >= fi.Length)
35                     {
36                         fs.Close();
37                         break;
38                     }
39                     else
40                     {
41                         fs.BeginRead(bf, 01024, Ae.EndAction, null);
42                         yield return 1;
43                     }
44                 }
45                 yield break;
46             }
47         }
48 
49     }





分享到:
评论
2 楼 老八牛 2016-04-29  
   
1 楼 老八牛 2016-02-20  
为什么看不到代码?

相关推荐

Global site tag (gtag.js) - Google Analytics