• 下午一口气把EC的后面几章读完聊,有点晕!
  • Jerry大牛的那几篇文章还是要好好读下的,Jerry大牛
  • 使用P/Invoke来实现对完成端口的C#编程,熟悉下.Net 3.5下Marshal类的新特性,复习非托管类型封送的关键点。[代码工程]

1。工具以及应用资料:P/Invoke的值以及引用封送CLR的封送处理;建议使用P/Invoke的兄弟去看看这个工具,P/Invoke Interop Assistant 1.0[含代码],MS的P/Invoke买一送一工具!

2。线程池搭配完成端口的实现[和Jerry写的那个基本上一个模子倒出来的]

  • 花了点时间把Jerry的版本[核心5第10章,IO完成端口的例子]用P/Invoke转为C#的版本,主要再熟悉下P/Invoke在3.5环境中的用法和改进。[比2003强大多了]犯了几个错误,还有一些心得,都整理下吧,计划把Jerry的例子都转为C#放在自己的函数库里面,这样快速开发的时候应该有很多可用的素材。毕竟。NET的界面开发还是要方便的多。
  • 正题之前感概一下,唉!线程池,完成端口这些重武器被我拿来写异步拷贝文件,大才小用了!

1.一个误用关于PreserveSig = false

P/Invoke原型如下:

[DllImportAttribute("kernel32.dll", EntryPoint = "CreateFileW", SetLastError = true /*PreserveSig = false*/)]
        public static extern IntPtr CreateFileW([InAttribute()] [MarshalAsAttribute(UnmanagedType.LPWStr)] string lpFileName, FileAccessRight dwDesiredAccess, FileSharedMode dwShareMode,
            [InAttribute()] IntPtr lpSecurityAttributes, CreatePosition dwCreationDisposition, FileFlagAndAttribute dwFlagsAndAttributes, [InAttribute()] IntPtr hTemplateFile);
先看下<.NET本质论>的描述:“DllImport在OLE32Wrapper.CoSomeAPI32的情况下,以HRESULT返回的方法为short值,对于用底层的[out,retval]参数”。再看下MSDN的描述:

简单的说,这个参数是将HRESULT返回忽略,代之以“[out,retval]”的参数[多out参数呢?],然后用COMexception来处理返回异常。OK,而此处封装的CreateFileW并不满足这个条件,如果让引擎忽略了返回句柄,那等于白干了!

2.封送类型[Marshal Type],没什么说的,Don的书上说的很清楚。这里再次崇拜MS的强大,那个工具里面可变类型和不可变类型能够轻松识别,当然这一切的基础在于正规化的Win32变量命名。[再次说明了开发要正规化的远见卓识]

3.P/Invoke的基石:值类型的默认Marshal处理[这个文章是P/Invoke必读!]由于CLR要与非托管模式交互,其类型布局和访问权限就必须要在类型定义中显式指明,这也就是很多人遇到了“无法预知长度”异常的原因了!注意怎么指定Union结构体的布局,本质论上有,也可以参见文章!

4.P/Invoke的麻烦:本地代码和CLR环境的转化,这就是P/Invoke被称为怪物的原因了!转化起来很麻烦,而且有的时候不得不面临一些资源泄露[虽然我们可以说GC在,没问题,但有的地方GC却无法插手!而那个时候,就该你挽起袖子干活]

所以,第一,对于为IntPtr用Marshal显式分配的内存,必须在托管环境下显式回收!第二,当你需要处理指向指针的指针,并且存在类派生时[很多是指向值类型的指针],要在托管环境中显式处理。第三,IntPtr这个怪物,是在托管环境中的非托管指针,所以请你千万要给它以及它的孪生兄弟指向的托管类型一个内存,在用Marshal.StructureToPtr之前!动手编程之前最好看下小巧但威力强大的Marshal,比原来的版本强了不止一个数量级,呵呵!可以直接从非托管环境里面读取值类型,包括上面提到的StructurToPtr,和逆过程

  • 下面是IORequest的代码,提醒自己InPtr的回收和指向指针的指针处理[还好这里是值类型,可以用ref省去一些力气]

[StructLayout(LayoutKind.Sequential)]
    public class IORequest : OVERLAPPED, IDisposable
    {
        //private field
        public static int BUFFSIZE = 64 * 1024;
        private uint m_nBuffSize;
        private IntPtr m_pvData;

        /// <summary>
        /// Initializes a new instance of the <see cref="IORequest"/> class.
        /// </summary>
        public IORequest()
        {
            this.Internal = this.InternalHigh = 0;
            this.Union1.Struct1.Offset = this.Union1.Struct1.OffsetHigh = 0;
            hEvent = IntPtr.Zero;
            m_nBuffSize = 0; m_pvData = IntPtr.Zero;
        }

        /// <summary>
        /// Thises to int PTR.OVERLAPPED method!
        /// </summary>
        /// <returns></returns>
        private IntPtr ThisToIntPtr()
        {
            OVERLAPPED cur = (OVERLAPPED)this;
            IntPtr pnt = Marshal.AllocHGlobal(Marshal.SizeOf(cur));//显式类型转化,已经非托管指针转化
            Marshal.StructureToPtr(cur, pnt, false);

            return pnt;
        }

        /// <summary>
        /// Allocs the buffer.
        /// </summary>
        /// <param name="nBuffSize">Size of the n buff.</param>
        /// <returns></returns>
        public bool AllocBuffer(uint nBuffSize)
        {
            m_nBuffSize = nBuffSize;
            m_pvData = Marshal.AllocHGlobal((int)nBuffSize);
            return (m_pvData != IntPtr.Zero);
        }

        /// <summary>
        /// Reads the specified h device.
        /// </summary>
        /// <param name="hDevice">The h device.</param>
        /// <param name="pliOffset">The pli offset.</param>
        /// <returns></returns>
        public bool Read(IntPtr hDevice, IntPtr pliOffset)
        {
            if (pliOffset != IntPtr.Zero)
            {
                LARGE_INTEGER liOffset = (LARGE_INTEGER)Marshal.PtrToStructure(pliOffset, typeof(LARGE_INTEGER));
                this.Union1.Struct1.Offset = liOffset.Struct1.LowPart;
                this.Union1.Struct1.OffsetHigh = (uint)liOffset.Struct1.HighPart;
            }
            // Initialize unmanged memory to hold the struct, sign! dummy implementation
           
            return (Win32File.ReadFile(hDevice, m_pvData, m_nBuffSize, IntPtr.Zero, ThisToIntPtr()));
        }

        /// <summary>
        /// Writes the specified h device.
        /// </summary>
        /// <param name="hDevice">The h device.</param>
        /// <param name="pliOffset">The pli offset.</param>
        /// <returns></returns>
        public bool Write(IntPtr hDevice, IntPtr pliOffset)
        {
            if (pliOffset != IntPtr.Zero)
            {
                LARGE_INTEGER liOffset = (LARGE_INTEGER)Marshal.PtrToStructure(pliOffset, typeof(LARGE_INTEGER));
                this.Union1.Struct1.Offset = liOffset.Struct1.LowPart;
                this.Union1.Struct1.OffsetHigh = (uint)liOffset.Struct1.HighPart;
            }
            // Initialize unmanged memory to hold the struct, sign! dummy implementation
            return (Win32File.WriteFile(hDevice, m_pvData, m_nBuffSize, IntPtr.Zero, ThisToIntPtr()));
        }

        #region IDisposable Members
        public void Dispose()
        {
            if (m_pvData != IntPtr.Zero) Marshal.FreeHGlobal(m_pvData);//挽起袖子干的脏活,GC应该干不料,它没那么聪明!
        }
        #endregion

    }

  • 完成端口配合线程池的实现[这个线程的入口是一个委托的异步调用,至于为什么异步是个线程,本质论的异步方法调用P193有描述,这个模型的后续产物就是,我们在 private void Completed(IAsyncResult call) 里面需要UI线程判断以及另一个委托来更新界面,具体的实现可以参见Chris Sells的《Windows Form程序设计》第14章 多线程环境下的用户界面 或者MSDN里面搜索InvokeRequired或者UIDispatch]。这个括号打太大了,见笑,下面是将Jerry的代码转化为托管之后的实现,当然代码有点C++的风格,呵呵!Jerry的实现是用UI线程来处理的,所以要死一段时间。这里没有这个问题,不过,我写的很粗糙,设计模式完全没学好!

        /// <summary>
        /// Threads the proc.
        /// </summary>
        public void ThreadProc()
        {
            //文件初始化处理

            //following logic: to create IOCompletionPort then start Four [Herusitic Method] ThreadPool Task to I/O handling!
            //however, how about file lock? please dwell on it!
            this.ioCP.AssociateDevice(this.hFileSrc, CK_READ);
            this.ioCP.AssociateDevice(this.hFileDist, CK_WRITE);

            //shared parameters intialization
            this.nReadsInProgress = 0;
            this.nWritesInProgress = 4;
            this.liNextReadOffset = new LARGE_INTEGER();
            ioTaskDone.Reset();//Events
            //ThreadPool for Task item
            ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadPoolWorkItem));

            //Then IOrequest item
            for (int i = 0; i < 4; i++)
            {
                Debug.Assert(ioRequest[i].AllocBuffer(BUFFSIZE));
                IntPtr ior_ptr = Marshal.AllocHGlobal(Marshal.SizeOf(ioRequest[i]));
                Marshal.StructureToPtr(ioRequest[i], ior_ptr, false);
                this.ioCP.PostStatus(CK_WRITE, 0, ior_ptr);//Post to IO Completion Port
            }

            ioTaskDone.WaitOne();
            //shrink 文件结束处理
        }

        //ThreadPool WorkItem
        private void ThreadPoolWorkItem(object state)
        {
            //ignore the loop
            while (this.nReadsInProgress > 0 || this.nWritesInProgress > 0)
            {
                //Get IO Completion Key from caller
                uint CompletionKey = 0, dwNumBytes = 0;
                IORequest curRequest = new IORequest();
                IntPtr pior = Marshal.AllocHGlobal(Marshal.SizeOf(curRequest));
                this.ioCP.GetStatus(ref CompletionKey, ref dwNumBytes, ref pior, Win32Helper.INFINITE);
                //convert pior back to curRequest
                Marshal.PtrToStructure(pior, curRequest);

                //judge by Completion Key, back c
                if (CompletionKey == CK_READ)
                {
                    Interlocked.Decrement(ref this.nReadsInProgress);
                    if (!curRequest.Write(this.hFileDist, IntPtr.Zero))
                    {
                        //Debug.WriteLine(Marshal.GetLastWin32Error());
                    }                   
                    Interlocked.Increment(ref this.nWritesInProgress);
                }
                else if (CompletionKey == CK_WRITE)
                {
                    Interlocked.Decrement(ref this.nWritesInProgress);
                    if (this.liNextReadOffset.QuadPart < this.liFileSizeDist.QuadPart)
                    {
                        //not end
                        IntPtr nextReads = Marshal.AllocHGlobal(Marshal.SizeOf(this.liNextReadOffset));
                        Marshal.StructureToPtr(this.liNextReadOffset, nextReads, true);
                        if (!curRequest.Read(this.hFileSrc, nextReads))
                        {
                            //Debug.WriteLine(Marshal.GetLastWin32Error());
                        }
                        Interlocked.Increment(ref this.nReadsInProgress);
                        Interlocked.Add(ref this.liNextReadOffset.QuadPart, BUFFSIZE);
                    }
                }
            }
            this.ioTaskDone.Set();//release resource
        }

  • APM from Jerry Richter. [if you have read Jerry’s previous books, or see his speech online, yeah….funny!]
1。you need to avoid having state data reside on the thread’s stack because data on the thread’s stack cannot migrate to another thread.[Lost virtual pointer, Common Sense].
2。you need to split up your code into callback methods, sometimes called continuations.[Yeah,in delegation, we also get familiar with that, however, someone will forget on which thread they’re handling with!]
3。Third, there are many useful programming constructs—such as try/catch/finally statements, the lock statement, the using statement, and even loops [lock,Mointor.enter(),try()….,OK,don’t forget the basic unit of synblock under CLR is object!]
4。Finally, trying to offer features such as coordinating the result of multiple overlapping operations, cancellation, timeout, and marshaling UI control modifications to the GUI thread in a Windows® Forms or Windows Presentation Foundation (WPF) application adds even more complexity to using the APM.[OK,the master seems to say that take care of single UI principle, yeah, it’s important and sometims always be ingored by some guys]
 

Orlando try to do a simple exercise,howeve,just a minute,a glass of water.Let’s enjoy coding time now! Found it’s a little bit easy for me to complete, yeah, let’s turn to implementation, however, just take care of coding from byte to char.

using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Text;
using System.Diagnostics;
using System.Threading;

namespace APMpro
{
    /// <summary>
    /// APM programming learnt from Jerry richter
    /// </summary>
    class Program
    {
        static string file = "..\\..\\1.mht";
        static int len = 8192;

        static void Main(string[] args)
        {
            SynMethod();
            ApmPatternWithMultipleMethods();
            ApmLamdba();
            Console.Read();
        }

        //dummy implemenation for process data
        static void processData(byte[] data)
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("***************************************************");
            //UnicodeEncoding _curCoding = new UnicodeEncoding();
           
            //Console.WriteLine(new String(_curCoding.GetChars(data)));
            char[] csofFile=new char[Encoding.ASCII.GetCharCount(data, 0, data.Length)];
            Encoding.ASCII.GetChars(data, 0, data.Length, csofFile, 0/*csofFile.Length*/);
            Console.WriteLine(csofFile);
            Console.WriteLine("***************************************************");
        }

        #region synmethod
        private static void SynMethod()
        {
            using (FileStream inFile = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, len, FileOptions.None))
            {
                Byte[] data = new Byte[inFile.Length];
                inFile.Read(data, 0, len);
                processData(data);
                inFile.Close();
            }
        }
        #endregion

        #region APMwithAsyn
        private sealed class ApmData
        {
            public FileStream fs;
            public byte[] m_data;
        }

        private static void ApmPatternWithMultipleMethods()
        {
            FileStream inFile = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, len, FileOptions.Asynchronous);//don’t use using
            ApmData _data = new ApmData();
            _data.m_data = new byte[len];
            _data.fs = inFile;
            inFile.BeginRead(_data.m_data, 0, len, ReadCompleted, _data);
            //after read completed
        }

        private static void ReadCompleted(IAsyncResult result)
        {
            ApmData _curdata = (ApmData)result.AsyncState;
            //read bytes
            Int32 readBytes = _curdata.fs.EndRead(result);
            processData(_curdata.m_data);
            Debug.WriteLine("Read Completed : " + readBytes);
            _curdata.fs.Close();
        }

        #endregion

        #region APMwithLamdba Expression
        private static void ApmLamdba()
        {
            FileStream inFile = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, len, FileOptions.Asynchronous);//don’t use using
            byte[] data = new byte[len];
            inFile.BeginRead(data, 0, data.Length, result=>
            {
                Int32 bytesRead = inFile.EndRead(result);
                processData(data);
                inFile.Close(); // No ‘using’
            }, null);
        }
        #endregion
    }
}

1。一个是要加锁,Dictionary是无同步锁的集合,但这里还好是对集合的对象处理!本来是想对集合加锁的,但想了下好像没必要,这样的代码会长一些!

2。不过可以用MethodImpl来进行方法同步,细细想下,八杆子打不着!所以,直接对操作对象加锁就可以了!

3。ThreadPool里面用dummy来避免传递参数![试验了下和SilverLight一样线程池对象在Asp。Net里面都支持了,想当年做Asp。Net的时候,还没有这些东西,连脚本都是非常恶心的方式搞定的,时代在进步呀!忽然想到了一个东西如果那个时候有LINQ就好了,不用搞得那么头大了,做个中间层写了爆多的东西!PS:都是垃圾]

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Text;

namespace SynCache
{
    /// <summary>
    /// Sample for cache updating with ThreadPool implementation
    /// </summary>
    class Program
    {/// <summary>
        /// Medium : items, which are synchronized with Dictionary in the p/s cache.
        /// </summary>
        public class Medium
        {
            /// <summary>
            /// Gets or sets the content of Medium.
            /// </summary>
            /// <value>The store.</value>
            public object Store { get; set; }

            /// <summary>
            /// Gets or sets a value indicating whether this instance is updating.
            /// </summary>
            /// <value>
            ///  <c>true</c> if this instance is updating; otherwise, <c>false</c>.
            /// </value>
            public bool IsUpdating { get; set; }
        }

        /// <summary>
        /// Task items
        /// </summary>
        public class Task
        {
            private Func<object> func;
            private Medium target;
            public Task(Medium _t, Func<object> _func)
            {
                func = _func; target = _t;
            }
            public void ThreadProc(object dummy)
            {
                lock (target)
                {
                    target.IsUpdating = true;
                    Console.WriteLine("begin write");
                    target.Store = func.Invoke();
                    Console.WriteLine("end write");
                    target.IsUpdating = false;
                }
            }
        }

        public class DictionaryCache : System.Collections.IEnumerable
        {
            /// <summary>
            /// cache for dictionary
            /// </summary>
            private Dictionary<int, Medium> cache;

            //opertator [] for Dictionary
            public object this[int key]
            {
                get
                {//only way to get synchronization operation
                    if (!cache.ContainsKey(key)) return null;
                    return cache[key].Store;
                }
            }

            public void Add(int key, Func<object> func)
            {
                if (cache.ContainsKey(key))
                {//having key, update with ThreadPool
                    var elem = cache[key];
                    if (elem.IsUpdating) return;
                    Task workItem = new Task(elem, func);
                    try
                    {
                        ThreadPool.QueueUserWorkItem(new WaitCallback(workItem.ThreadProc), workItem);
                    }
                    catch
                    {
                    }
                }
                else
                {//for the current thread
                    Console.WriteLine("Begin first write");
                    cache.Add(key, new Medium { Store = func() });
                    Console.WriteLine("End first write");
                }
            }

            public DictionaryCache()
            {
                this.cache = new Dictionary<int, Medium>();
            }

            #region IEnumerable Members
            public System.Collections.IEnumerator GetEnumerator()
            {
                return cache.GetEnumerator();
            }
            #endregion
        }
        static void Main(string[] args)
        {
            var cache = new DictionaryCache();
            while (true)
            {
                cache.Add(1, GetValue);
                Thread.Sleep(100);
                Console.WriteLine(cache[1]);
            }
        }

        static object GetValue()
        {
            Thread.Sleep(400);//simulate database access
            return DateTime.Now;
        }
    }
}

Advertisements