lith 4 tahun lalu
induk
melakukan
fa55df3294
23 mengubah file dengan 1132 tambahan dan 65 penghapusan
  1. 253 0
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Fast/DeliveryClient.cs
  2. 242 0
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Fast/DeliveryConnection.cs
  3. 389 0
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Fast/DeliveryServer.cs
  4. 1 1
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Simple/DeliveryClient.cs
  5. 1 1
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Simple/DeliveryConnection.cs
  6. 1 1
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Simple/DeliveryServer.cs
  7. 34 9
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/OrganizeClientBuilder.cs
  8. 36 7
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/OrganizeServerBuilder.cs
  9. 5 1
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Sers.CL.Socket.Iocp.csproj
  10. 9 3
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.ThreadWait/DeliveryConnection.cs
  11. 2 1
      dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/CmClient/appsettings.json
  12. 1 0
      dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/CmServer/appsettings.json
  13. 4 0
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryClient/DeliveryClient.csproj
  14. 4 0
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/DeliveryServer.csproj
  15. 6 4
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/Program.cs
  16. 4 2
      dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Env/UsageReporter.cs
  17. 2 6
      dotnet/Library/Vit/Vit.Core/Test/Vit.Core.MsTest/Util/Threading/LongTaskHelp_Test.cs
  18. 38 0
      dotnet/Library/Vit/Vit.Core/Test/Vit.Core.MsTest/Util/Threading/SersTimer_SingleThread_Test.cs
  19. 38 0
      dotnet/Library/Vit/Vit.Core/Test/Vit.Core.MsTest/Util/Threading/SersTimer_Test.cs
  20. 8 2
      dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/ByteDataExtensions.cs
  21. 2 21
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/DataPool.cs
  22. 7 6
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Threading/SersTimer.cs
  23. 45 0
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Threading/SersTimer_SingleThead.cs

+ 253 - 0
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Fast/DeliveryClient.cs

@@ -0,0 +1,253 @@
+// https://freshflower.iteye.com/blog/2285286
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using Sers.Core.CL.MessageDelivery;
+using Vit.Core.Module.Log;
+using Vit.Core.Util.Net;
+using Vit.Core.Util.Pool;
+using Vit.Core.Util.Threading;
+
+namespace Sers.CL.Socket.Iocp.Mode.Fast
+{
+    public class DeliveryClient : IDeliveryClient
+    {
+
+
+        DeliveryConnection _conn = new DeliveryConnection();
+        public IDeliveryConnection conn => _conn;
+
+
+        public Action<IDeliveryConnection, ArraySegment<byte>> Conn_OnGetFrame { set { _conn.OnGetFrame = value; } }
+
+
+        public Action<IDeliveryConnection> Conn_OnDisconnected { set => _conn.Conn_OnDisconnected = value; }
+
+
+
+
+
+        /// <summary>
+        ///  服务端 host地址(默认 "127.0.0.1" )。例如: "127.0.0.1"、"sers.cloud"。
+        /// </summary>
+        public string host = "127.0.0.1";
+        /// <summary>
+        /// 服务端 监听端口号(默认4501)。例如: 4501。
+        /// </summary>
+        public int port = 4501;
+
+
+        /// <summary>
+        /// 接收缓存区大小
+        /// </summary>
+        public int receiveBufferSize = 8 * 1024;
+
+
+        public DeliveryClient()
+        {
+
+            _conn.receiveEventArgs = receiveEventArgs = new SocketAsyncEventArgs();
+
+            receiveEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
+
+        }
+
+
+
+
+
+
+        #region Connect Close
+
+
+
+        public bool Connect()
+        {
+            try
+            {
+                Logger.Info("[CL.DeliveryClient] Socket.Iocp,connecting... host:" + host + " port:" + port);
+
+                //(x.1) Instantiates the endpoint and socket.
+                var hostEndPoint = new IPEndPoint(NetHelp.ParseToIPAddress(host), port);
+                socket = new global::System.Net.Sockets.Socket(hostEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+
+                _conn.Init(socket);
+
+                var buff = DataPool.BytesGet(receiveBufferSize);
+                _conn.receiveEventArgs.SetBuffer(buff, 0, buff.Length);
+
+
+                //(x.2)
+                SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
+                connectArgs.RemoteEndPoint = hostEndPoint;
+                connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnect);
+
+                autoResetEvent_OnConnected.Reset();
+                socket.ConnectAsync(connectArgs);
+
+
+                //(x.3) 阻塞. 让程序在这里等待,直到连接响应后再返回连接结果
+                if (!autoResetEvent_OnConnected.WaitOne(10000))
+                    return false;
+
+                if (connectArgs.SocketError != SocketError.Success)
+                {
+                    return false;
+                }
+
+
+
+                //(x.4)                
+                Send_timer.timerCallback = Send_Flush;
+                Send_timer.Start();
+
+
+
+
+                Logger.Info("[CL.DeliveryClient] Socket.Iocp,connected.");
+                return true;
+
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+            return false;
+        }
+
+
+        public void Close()
+        {
+            Send_timer.Stop();
+
+            if (null == _conn) return;
+            var conn = _conn;
+            _conn = null;
+            conn.Close();
+        }
+        #endregion
+
+
+
+        #region Iocp
+
+
+        readonly SocketAsyncEventArgs receiveEventArgs;
+
+
+        private global::System.Net.Sockets.Socket socket = null;
+
+
+        // Signals a connection.
+        private AutoResetEvent autoResetEvent_OnConnected = new AutoResetEvent(false);
+
+
+        // Calback for connect operation
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void OnConnect(object sender, SocketAsyncEventArgs e)
+        {
+            // Signals the end of connection.
+            autoResetEvent_OnConnected.Set(); //释放阻塞.
+
+            //如果连接成功,则初始化socketAsyncEventArgs
+            if (e.SocketError == SocketError.Success)
+            {
+                //启动接收,不管有没有,一定得启动.否则有数据来了也不知道.
+                if (!socket.ReceiveAsync(receiveEventArgs))
+                    ProcessReceive(receiveEventArgs);
+            }
+            else
+            {
+                Close();
+            }
+
+        }
+
+
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        void IO_Completed(object sender, SocketAsyncEventArgs e)
+        {
+            // determine which type of operation just completed and call the associated handler
+            switch (e.LastOperation)
+            {
+                case SocketAsyncOperation.Receive:
+                    ProcessReceive(e);
+                    break;
+                case SocketAsyncOperation.Send:
+                    Logger.Info("[Iocp]IO_Completed Send");
+                    return;
+
+                //ProcessSend(e);
+                //break;
+                default:
+                    Logger.Info("[Iocp]IO_Completed default");
+                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
+            }
+        }
+
+        // This method is invoked when an asynchronous receive operation completes. 
+        // If the remote host closed the connection, then the socket is closed.  
+        // If data was received then the data is echoed back to the client.
+        //
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void ProcessReceive(SocketAsyncEventArgs e)
+        {
+            try
+            {
+                // check if the remote host closed the connection 
+                if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
+                {
+                    //读取数据                  
+                    _conn.AppendData(new ArraySegment<byte>(e.Buffer, e.Offset, e.BytesTransferred));
+
+                    byte[] buffData = DataPool.BytesGet(receiveBufferSize);
+                    e.SetBuffer(buffData, 0, buffData.Length);
+
+                    // start loop
+                    if (!socket.ReceiveAsync(e))
+                        ProcessReceive(e);
+                    return;
+                }
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+            Close();
+        }
+
+        #endregion
+
+
+        #region Send
+
+        SersTimer_SingleThread Send_timer = new SersTimer_SingleThread { intervalMs = 1 };
+
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        void Send_Flush(object state)
+        {
+            try
+            {
+                _conn.Flush();
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+        }
+
+        #endregion
+
+
+
+
+
+
+    }
+}

+ 242 - 0
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Fast/DeliveryConnection.cs

@@ -0,0 +1,242 @@
+using System;
+using System.Collections.Concurrent;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+using Sers.Core.CL.MessageDelivery;
+using Vit.Core.Module.Log;
+using Vit.Core.Util.Pipelines;
+using Vit.Extensions;
+
+namespace Sers.CL.Socket.Iocp.Mode.Fast
+{
+    public class DeliveryConnection : IDeliveryConnection
+    {
+
+        public SocketAsyncEventArgs receiveEventArgs;
+
+
+        Sers.Core.Util.StreamSecurity.SecurityManager _securityManager;
+        public Sers.Core.Util.StreamSecurity.SecurityManager securityManager { set => _securityManager = value; }
+      
+
+        /// <summary>
+        /// 连接状态(0:waitForCertify; 2:certified; 4:waitForClose; 8:closed;)
+        /// </summary>
+        public byte state { get; set; } = DeliveryConnState.waitForCertify;
+
+
+        /// <summary>
+        /// 通信SOCKET
+        /// </summary>
+        public global::System.Net.Sockets.Socket socket { get; private set; }
+
+        /// <summary>
+        /// 连接时间
+        /// </summary>
+        private DateTime connectTime { get; set; }
+
+
+ 
+
+
+
+        /// <summary>
+        /// 请勿处理耗时操作,需立即返回。接收到客户端的数据事件
+        /// </summary>
+        public Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { private get; set; }
+
+
+        public Action<IDeliveryConnection> Conn_OnDisconnected { get; set; }
+
+
+        public void Init(global::System.Net.Sockets.Socket socket)
+        {
+            this.socket = socket;
+            connectTime = DateTime.Now;
+        }
+
+        public void Close()
+        {
+            if (socket == null) return;
+
+
+            state = DeliveryConnState.closed;
+
+            var socket_ = socket;
+            socket = null;
+
+          
+
+            try
+            {
+                socket_.Close();
+                socket_.Dispose();
+
+                //socket_.Shutdown(SocketShutdown.Both);
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+
+            try
+            {
+                Conn_OnDisconnected?.Invoke(this);
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+
+        }
+
+
+     
+
+
+
+        #region AppendData        
+
+        PipeFrame pipe = new PipeFrame() { OnDequeueData = ArraySegmentByteExtensions.ReturnToPool };
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void AppendData(ArraySegment<byte> data)
+        {            
+            pipe.Write(data);
+
+            while (pipe.TryRead_SersFile(out var msgFrame))
+            {
+                _securityManager?.Decryption(msgFrame);
+                OnGetFrame.Invoke(this, msgFrame);
+            }
+        }
+        #endregion
+
+
+
+        #region Send
+
+        ConcurrentQueue<ByteData> queue = new ConcurrentQueue<ByteData>();
+
+
+        const int buffLength = 1000;
+        ByteData[] list = new ByteData[buffLength];
+        int[] count = new int[buffLength];
+
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
+        {
+            queue.Enqueue(data);
+        }
+
+
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void Flush()
+        {
+            int curIndex;
+             
+            try
+            {
+                while (true)
+                {
+                    curIndex = 0;
+                    while (true)
+                    {
+                        if (queue.TryDequeue(out var item))
+                        {
+                            list[curIndex++] = item;
+
+                            if (curIndex == buffLength)
+                            {
+                                break;
+                            }
+                        }
+                        else
+                        {
+                            if (curIndex == 0) return;
+                            break;
+                        }
+                    }
+                    var bytes = ByteDataArrayToBytes(list, curIndex);
+                    try
+                    {
+                        socket.SendAsync(bytes.BytesToArraySegmentByte(), SocketFlags.None);
+                        //socket.SendAsync(data, SocketFlags.None);
+                    }
+                    catch (Exception ex)
+                    {
+                        Logger.Error(ex);
+                        Close();
+                    }
+
+                    if (curIndex < buffLength)
+                    {
+                        return;
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+        }
+
+
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        unsafe byte[] ByteDataArrayToBytes(ByteData[] byteDataArray, int arrayCount)
+        {
+            //(x.1)get count
+            int sumCount = 0;
+
+            int curCount;
+            int arrayIndex;
+            for (arrayIndex = 0; arrayIndex < arrayCount; arrayIndex++)
+            {
+                var byteData = byteDataArray[arrayIndex];
+                curCount = 0;
+                foreach (var item in byteData.byteArrayList)
+                {
+                    curCount += item.Count;
+                }
+                count[arrayIndex] = curCount;
+                sumCount += curCount;
+            }
+
+
+            //(x.2)copy data
+            var bytes = new byte[sumCount + arrayCount * 4];
+            arrayIndex = 0;
+            curCount = 0;
+
+            int curLength;
+
+            fixed (byte* pTarget = bytes)
+            {
+                for (arrayIndex = 0; arrayIndex < arrayCount; arrayIndex++)
+                {
+                    var byteData = byteDataArray[arrayIndex];
+                    ((int*)(pTarget + curCount))[0] = curLength = count[arrayIndex];
+                    curCount += 4;
+
+                    foreach (var item in byteData.byteArrayList)
+                    {
+                        if (null == item.Array || item.Count == 0) continue;
+                        fixed (byte* pSource = item.Array)
+                        {
+                            Buffer.MemoryCopy(pSource + item.Offset, pTarget + curCount, item.Count, item.Count);
+                        }
+                    }
+                    _securityManager?.Encryption(new ArraySegment<byte>(bytes, curCount, curLength));
+                    curCount += curLength;
+                }
+            }
+            return bytes;
+        }
+
+        #endregion
+
+    }
+}

+ 389 - 0
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Fast/DeliveryServer.cs

@@ -0,0 +1,389 @@
+//  https://freshflower.iteye.com/blog/2285272 
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using Sers.Core.CL.MessageDelivery;
+using Vit.Core.Module.Log;
+using Vit.Core.Util.Net;
+using Vit.Core.Util.Pool;
+using Vit.Core.Util.Threading;
+
+namespace Sers.CL.Socket.Iocp.Mode.Fast
+{
+    public class DeliveryServer: IDeliveryServer
+    {
+
+
+        public Sers.Core.Util.StreamSecurity.SecurityManager securityManager;
+
+        /// <summary>
+        /// 服务端 监听地址。若不指定则监听所有网卡。例如: "127.0.0.1"、"sers.cloud"。
+        /// </summary>
+        public string host = null;
+
+        /// <summary>
+        /// 服务端 监听端口号(默认4501)。例如: 4501。
+        /// </summary>
+        public int port = 4501;
+
+
+
+        /// <summary>
+        /// 接收缓存区大小
+        /// </summary>
+        public int receiveBufferSize = 8 * 1024; 
+
+
+        public Action<IDeliveryConnection> Conn_OnDisconnected { private get; set; }
+        public Action<IDeliveryConnection> Conn_OnConnected { private get; set; }
+      
+
+
+        /// <summary>
+        /// 最大连接数
+        /// </summary>
+        private int maxConnectCount;
+
+
+        public int MaxConnCount { get { return maxConnectCount; }
+            set {
+                maxConnectCount = value;
+                m_maxNumberAcceptedClients = new Semaphore(maxConnectCount, maxConnectCount);
+                //pool_ReceiveEventArgs.Capacity = maxConnectCount;
+            }
+        }
+
+
+
+        /// <summary>
+        ///  connHashCode -> DeliveryConnection
+        /// </summary>
+        readonly ConcurrentDictionary<int, DeliveryConnection> connMap = new ConcurrentDictionary<int, DeliveryConnection>();
+
+        public IEnumerable<IDeliveryConnection> ConnectedList => connMap.Values.Select(conn=>((IDeliveryConnection)conn));
+
+
+        public DeliveryServer()
+        {
+            MaxConnCount = 20000;
+        }
+
+
+
+
+        #region Start Stop
+
+
+        public bool Start()
+        {
+            Stop();
+
+            try
+            {
+                Logger.Info("[CL.DeliveryServer] Socket.Iocp,starting... host:" + host + " port:" + port);
+
+                //(x.1)
+                connMap.Clear();
+
+                //(x.2)               
+                Send_timer.timerCallback = Send_Flush;
+                Send_timer.Start();
+
+                //(x.3)
+                IPEndPoint localEndPoint = new IPEndPoint(String.IsNullOrEmpty(host) ? IPAddress.Any : NetHelp.ParseToIPAddress(host), port);
+                listenSocket = new global::System.Net.Sockets.Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+                listenSocket.Bind(localEndPoint);
+
+
+                // start the server with a listen backlog of 100 connections
+                listenSocket.Listen(maxConnectCount);
+                // post accepts on the listening socket
+                StartAccept(null);
+
+                Logger.Info("[CL.DeliveryServer] Socket.Iocp,started.");
+                return true;
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+            return false;
+        }
+
+
+        /// <summary>
+        /// 停止服务
+        /// </summary>
+        public void Stop()
+        {
+
+            Send_timer.Stop();
+
+
+            if (listenSocket == null) return;
+
+            var listenSocket_ = listenSocket;
+            listenSocket = null;
+
+            //(x.1) stop conn
+            ConnectedList.ToList().ForEach(Delivery_OnDisconnected);
+            connMap.Clear();
+
+            //(x.2) close Socket
+            try
+            {
+                listenSocket_.Close();
+                listenSocket_.Dispose();
+                //listenSocket_.Shutdown(SocketShutdown.Both);
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+
+        }
+        #endregion
+
+
+        #region Iocp
+
+
+        global::System.Net.Sockets.Socket listenSocket;
+
+        Semaphore m_maxNumberAcceptedClients;
+
+
+        #region ReceiveEventArgs
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        SocketAsyncEventArgs ReceiveEventArgs_Create(global::System.Net.Sockets.Socket socket)
+        {
+            var conn = Delivery_OnConnected(socket);           
+
+            SocketAsyncEventArgs receiveEventArgs = new SocketAsyncEventArgs();
+            receiveEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
+
+
+            var buff = DataPool.BytesGet(receiveBufferSize);
+            receiveEventArgs.SetBuffer(buff, 0, buff.Length);
+ 
+            receiveEventArgs.UserToken = conn;
+            conn.receiveEventArgs = receiveEventArgs;
+
+            return receiveEventArgs;
+        }
+
+        //ObjectPool<SocketAsyncEventArgs> pool_ReceiveEventArgs = new ObjectPool<SocketAsyncEventArgs>();
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        void ReceiveEventArgs_Release(SocketAsyncEventArgs receiveEventArgs)
+        {
+            receiveEventArgs.UserToken = null;
+            //pool_ReceiveEventArgs.Push(receiveEventArgs);
+        }
+        #endregion
+
+
+
+      
+        // Begins an operation to accept a connection request from the client 
+        //
+        // <param name="acceptEventArg">The context object to use when issuing 
+        // the accept operation on the server's listening socket</param>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void StartAccept(SocketAsyncEventArgs acceptEventArgs)
+        {
+            if (acceptEventArgs == null)
+            {
+                acceptEventArgs = new SocketAsyncEventArgs();
+                acceptEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
+            }
+            else
+            {
+                // socket must be cleared since the context object is being reused
+                acceptEventArgs.AcceptSocket = null;
+            }
+
+            m_maxNumberAcceptedClients.WaitOne();
+            if (!listenSocket.AcceptAsync(acceptEventArgs))
+            {
+                AcceptEventArg_Completed(null,acceptEventArgs);
+            }
+        }
+
+        // This method is the callback method associated with Socket.AcceptAsync 
+        // operations and is invoked when an accept operation is complete
+        //
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
+        {
+            try
+            {          
+                // Get the socket for the accepted client connection and put it into the 
+                //ReadEventArg object user token
+                SocketAsyncEventArgs receiveEventArgs = ReceiveEventArgs_Create(acceptEventArgs.AcceptSocket);               
+
+                if (!acceptEventArgs.AcceptSocket.ReceiveAsync(receiveEventArgs))
+                {
+                    ProcessReceive(receiveEventArgs);
+                }
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+
+            // Accept the next connection request
+            if (acceptEventArgs.SocketError == SocketError.OperationAborted) return;
+            StartAccept(acceptEventArgs);
+        }
+
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void IO_Completed(object sender, SocketAsyncEventArgs e)
+        {
+            // determine which type of operation just completed and call the associated handler
+            switch (e.LastOperation)
+            {
+                case SocketAsyncOperation.Receive:
+                    ProcessReceive(e);
+                    break;
+                case SocketAsyncOperation.Send:
+                    Logger.Info("[Iocp]IO_Completed Send");
+                    return;
+                //    ProcessSend(e);
+                //    break;
+                default:
+                    Logger.Info("[Iocp]IO_Completed default");
+                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
+            }
+
+        }
+
+
+        // This method is invoked when an asynchronous receive operation completes. 
+        // If the remote host closed the connection, then the socket is closed.  
+        // If data was received then the data is echoed back to the client.
+        //
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void ProcessReceive(SocketAsyncEventArgs e)
+        {
+            //读取数据
+            DeliveryConnection conn = e.UserToken as DeliveryConnection;
+            if (conn == null) return;
+
+
+            try
+            {            
+
+                // check if the remote host closed the connection               
+                if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
+                {
+                    //读取数据
+                    conn.AppendData(new ArraySegment<byte>(e.Buffer, e.Offset, e.BytesTransferred));
+
+                    byte[] buffData = DataPool.BytesGet(receiveBufferSize);
+                    e.SetBuffer(buffData, 0, buffData.Length);
+
+                    // start loop
+                    //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明
+                    if (!conn.socket.ReceiveAsync(e))
+                        ProcessReceive(e);
+                    return;
+                } 
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+            conn.Close();
+        }
+
+
+
+        #endregion
+
+
+        #region Send
+
+        SersTimer_SingleThread Send_timer = new SersTimer_SingleThread { intervalMs = 1 };
+
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        void Send_Flush(object state)
+        {
+            try
+            {
+                foreach (var conn in connMap.Values)
+                {
+                    conn.Flush();
+                }
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+        }
+
+        #endregion
+
+
+        #region Delivery_Event
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private DeliveryConnection Delivery_OnConnected(global::System.Net.Sockets.Socket socket)
+        {
+            var conn = new DeliveryConnection();
+            conn.securityManager = securityManager;
+            conn.Init(socket);           
+
+            conn.Conn_OnDisconnected = Delivery_OnDisconnected;
+
+            connMap[conn.GetHashCode()] = conn;
+            try
+            {
+                Conn_OnConnected?.Invoke(conn);
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+
+
+            return conn;
+        }
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void Delivery_OnDisconnected(IDeliveryConnection _conn)
+        {
+            // decrement the counter keeping track of the total number of clients connected to the server
+            m_maxNumberAcceptedClients.Release();
+
+
+            var conn = (DeliveryConnection)_conn;
+
+            ReceiveEventArgs_Release(conn.receiveEventArgs);
+
+            connMap.TryRemove(_conn.GetHashCode(),out _);
+
+            try
+            {
+                Conn_OnDisconnected?.Invoke(_conn);
+            }
+            catch (Exception ex)
+            {
+                Logger.Error(ex);
+            }
+
+        }
+        #endregion
+
+
+    }
+}

+ 1 - 1
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/DeliveryClient.cs → dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Simple/DeliveryClient.cs

@@ -11,7 +11,7 @@ using Vit.Core.Util.Net;
 using Vit.Core.Util.Pool;
 using Vit.Extensions;
 
-namespace Sers.CL.Socket.Iocp
+namespace Sers.CL.Socket.Iocp.Mode.Simple
 {
     public class DeliveryClient: IDeliveryClient
     {

+ 1 - 1
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/DeliveryConnection.cs → dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Simple/DeliveryConnection.cs

@@ -7,7 +7,7 @@ using Vit.Core.Module.Log;
 using Vit.Core.Util.Pipelines;
 using Vit.Extensions;
 
-namespace Sers.CL.Socket.Iocp
+namespace Sers.CL.Socket.Iocp.Mode.Simple
 {
     public class DeliveryConnection : IDeliveryConnection
     {

+ 1 - 1
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/DeliveryServer.cs → dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Simple/DeliveryServer.cs

@@ -14,7 +14,7 @@ using Vit.Core.Util.Net;
 using Vit.Core.Util.Pool;
 using Vit.Extensions;
 
-namespace Sers.CL.Socket.Iocp
+namespace Sers.CL.Socket.Iocp.Mode.Simple
 {
     public class DeliveryServer: IDeliveryServer
     {

+ 34 - 9
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/OrganizeClientBuilder.cs

@@ -10,21 +10,46 @@ namespace Sers.CL.Socket.Iocp
     {
         public void Build(List<IOrganizeClient> organizeList, JObject config)
         {
-            var delivery = new DeliveryClient();
-
-            #region security        
+            #region security     
+            Sers.Core.Util.StreamSecurity.SecurityManager securityManager = null;
             if (config["security"] is JArray securityConfigs)
             {
-                var securityManager = Sers.Core.Util.StreamSecurity.SecurityManager.BuildSecurityManager(securityConfigs);
-                ((DeliveryConnection)delivery.conn).securityManager = securityManager;
+                securityManager = Sers.Core.Util.StreamSecurity.SecurityManager.BuildSecurityManager(securityConfigs);
             }
             #endregion
 
-            delivery.host = config["host"].ConvertToString();
-            delivery.port = config["port"].Convert<int>();
-                
 
-            organizeList.Add(new OrganizeClient(delivery, config));
+            string mode = config["mode"]?.ToString();
+
+            switch (mode)
+            {
+                case "Simple":
+                    {
+                        var delivery = new Mode.Simple.DeliveryClient();
+
+                        delivery.host = config["host"].ConvertToString();
+                        delivery.port = config["port"].Convert<int>();
+
+                        ((Mode.Simple.DeliveryConnection)delivery.conn).securityManager = securityManager;
+
+                        organizeList.Add(new OrganizeClient(delivery, config));
+                    }
+                    break;
+
+                //case "Fast":
+                default:
+                    {
+                        var delivery = new Mode.Fast.DeliveryClient();
+
+                        delivery.host = config["host"].ConvertToString();
+                        delivery.port = config["port"].Convert<int>();
+
+                        ((Mode.Fast.DeliveryConnection)delivery.conn).securityManager = securityManager;
+
+                        organizeList.Add(new OrganizeClient(delivery, config));
+                    }
+                    break;
+            }
         }
     }
 }

+ 36 - 7
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/OrganizeServerBuilder.cs

@@ -10,20 +10,49 @@ namespace Sers.CL.Socket.Iocp
     {
         public void Build(List<IOrganizeServer> organizeList, JObject config)
         {
-            var delivery = new DeliveryServer();
 
-            #region security        
+            #region security     
+            Sers.Core.Util.StreamSecurity.SecurityManager securityManager = null;
             if (config["security"] is JArray securityConfigs)
             {
-                var securityManager = Sers.Core.Util.StreamSecurity.SecurityManager.BuildSecurityManager(securityConfigs);
-                delivery.securityManager = securityManager;
+                securityManager = Sers.Core.Util.StreamSecurity.SecurityManager.BuildSecurityManager(securityConfigs);
             }
             #endregion
 
-            delivery.host = config["host"].ConvertToString();
-            delivery.port = config["port"].Convert<int>();
 
-            organizeList.Add(new OrganizeServer(delivery, config));
+            string mode = config["mode"]?.ToString();
+
+            switch (mode)
+            {
+                case "Simple":
+                    {
+                        var delivery = new Mode.Simple.DeliveryServer();
+
+                        delivery.securityManager = securityManager;
+
+                        delivery.host = config["host"].ConvertToString();
+                        delivery.port = config["port"].Convert<int>();
+
+                        organizeList.Add(new OrganizeServer(delivery, config));
+                    }
+                    break;
+
+                //case "Fast":
+                default:
+                    {
+                        var delivery = new Mode.Fast.DeliveryServer();
+
+                        delivery.securityManager = securityManager;
+
+                        delivery.host = config["host"].ConvertToString();
+                        delivery.port = config["port"].Convert<int>();
+
+                        organizeList.Add(new OrganizeServer(delivery, config));
+                    }
+                    break;
+            }
+
+
         }
        
     }

+ 5 - 1
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Sers.CL.Socket.Iocp.csproj

@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
     <TargetFramework>netstandard2.0</TargetFramework>
@@ -16,6 +16,10 @@
     <DocumentationFile>bin\Debug\netstandard2.0\Sers.CL.Socket.Iocp.xml</DocumentationFile>
   </PropertyGroup>
 
+  <PropertyGroup>
+    <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
+  </PropertyGroup>
+
   <ItemGroup>
     <ProjectReference Include="..\..\..\Sers.Core\Sers.Core\Sers.Core.csproj" />
   </ItemGroup>

+ 9 - 3
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.ThreadWait/DeliveryConnection.cs

@@ -1,10 +1,13 @@
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Net.Sockets;
+using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
 using Sers.Core.CL.MessageDelivery;
 using Vit.Core.Module.Log;
+using Vit.Core.Util.Pipelines;
 using Vit.Core.Util.Pool;
 using Vit.Core.Util.Threading;
 using Vit.Extensions;
@@ -36,6 +39,9 @@ namespace Sers.CL.Socket.ThreadWait
         public Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { private get; set; }
 
 
+        BlockingCollection<ByteData> queue = new BlockingCollection<ByteData>();
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
             if (data == null || socket == null) return;
@@ -171,7 +177,7 @@ namespace Sers.CL.Socket.ThreadWait
 
 
 
-     
+
 
         #region (x.x) socket层 封装 ReadMsg 
         //线程不安全
@@ -184,6 +190,7 @@ namespace Sers.CL.Socket.ThreadWait
         */
 
 
+        ArraySegment<byte> bLen = new byte[4].BytesToArraySegmentByte();
         internal ArraySegment<byte> ReadMsg()
         {
             #region Method Receive
@@ -214,8 +221,7 @@ namespace Sers.CL.Socket.ThreadWait
             try
             {
 
-                var bLen = DataPool.ArraySegmentByteGet(4);
-
+                
                 //(x.1)获取 第一部分(len)  
                 Receive(bLen);
                 Int32 len = bLen.ArraySegmentByteToInt32();

+ 2 - 1
dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/CmClient/appsettings.json

@@ -48,6 +48,7 @@
           /* the class of builder in assemblyFile  */
           "className": "Sers.CL.Socket.Iocp.OrganizeClientBuilder",
 
+          //"mode": "Simple",
 
           /* (x.2) config */
           /* 服务端 host地址。例如: "127.0.0.1"、"sers.cloud" */
@@ -136,7 +137,7 @@
 
   "PressureTest": {
     "clientCount": 1,
-    "requestThreadCount": 10240,
+    "requestThreadCount": 1,
     "messageThreadCount": 0,
     "msgLen": 1
   }

+ 1 - 0
dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/CmServer/appsettings.json

@@ -45,6 +45,7 @@
           /* the class of Builder in assemblyFile  */
           "className": "Sers.CL.Socket.Iocp.OrganizeServerBuilder",
 
+          //"mode": "Simple",
 
           /* (x.2) config */
           /* 服务端 监听地址。若不指定则监听所有网卡。例如: "127.0.0.1"、"sers.cloud"。*/

+ 4 - 0
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryClient/DeliveryClient.csproj

@@ -5,6 +5,10 @@
     <TargetFramework>netcoreapp2.1</TargetFramework>
   </PropertyGroup>
 
+  <PropertyGroup>
+    <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
+  </PropertyGroup>
+
   <ItemGroup>
     <ProjectReference Include="..\..\..\Ipc\Sers.CL.Ipc.NamedPipe\Sers.CL.Ipc.NamedPipe.csproj" />
     <ProjectReference Include="..\..\..\Ipc\Sers.CL.Ipc.SharedMemory\Sers.CL.Ipc.SharedMemory.csproj" />

+ 4 - 0
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/DeliveryServer.csproj

@@ -5,6 +5,10 @@
     <TargetFramework>netcoreapp2.1</TargetFramework>
   </PropertyGroup>
 
+  <PropertyGroup>
+    <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
+  </PropertyGroup>
+
   <ItemGroup>
     <ProjectReference Include="..\..\..\Ipc\Sers.CL.Ipc.NamedPipe\Sers.CL.Ipc.NamedPipe.csproj" />
     <ProjectReference Include="..\..\..\Ipc\Sers.CL.Ipc.SharedMemory\Sers.CL.Ipc.SharedMemory.csproj" />

+ 6 - 4
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/Program.cs

@@ -43,13 +43,15 @@ namespace DeliveryTest
 
         static void StartServer()
         {
-            var server = new Sers.CL.Socket.Iocp.DeliveryServer();
-            // var server = new Sers.CL.Socket.ThreadWait.DeliveryServer();
-            server.port = port;
+            var server = new Sers.CL.Socket.Iocp.Mode.Fast.DeliveryServer();
+            //var server = new Sers.CL.Socket.ThreadWait.DeliveryServer();
+            // server.port = port;
 
             //var server = new Sers.CL.WebSocket.DeliveryServer();
             //var server = new Sers.CL.ClrZmq.ThreadWait.DeliveryServer();
-            //var server = new Sers.CL.Ipc.SharedMemory.DeliveryServer();
+            //var server = new Sers.CL.Ipc.SharedMemory.DeliveryServer();        
+
+
             //var server = new Sers.CL.Zmq.FullDuplex.DeliveryServer();
             //var server = new Sers.CL.Ipc.NamedPipe.DeliveryServer();
 

+ 4 - 2
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Env/UsageReporter.cs

@@ -37,8 +37,10 @@ namespace Sers.Core.Module.Env
         public static void StartReportTask(double intervalSecond)
         {
             if (null != timer) return;
-            timer = new SersTimer { intervalMs = intervalSecond*1000, timerCallback =
-                (object obj)=> 
+            timer = new SersTimer { 
+                intervalMs = (int)(intervalSecond * 1000),
+                timerCallback =
+                (object obj) =>
                 {
                     Publish();
                 }

+ 2 - 6
dotnet/Library/Vit/Vit.Core/Test/Vit.Core.MsTest/Util/Threading/LongTaskHelpTest.cs → dotnet/Library/Vit/Vit.Core/Test/Vit.Core.MsTest/Util/Threading/LongTaskHelp_Test.cs

@@ -5,7 +5,7 @@ using System.Threading.Tasks;
 
 namespace Vit.Core.MsTest.Util.Threading
 {
-    public class LongTaskHelpTest
+    public class LongTaskHelp_Test
     {
         public static void Test()
         {
@@ -33,11 +33,7 @@ namespace Vit.Core.MsTest.Util.Threading
             });
 
 
-            while (true)
-            {
-                Console.WriteLine("run in Main");
-                Thread.Sleep(1000);
-            }
+            Thread.Sleep(10000);
         }
     }
 }

+ 38 - 0
dotnet/Library/Vit/Vit.Core/Test/Vit.Core.MsTest/Util/Threading/SersTimer_SingleThread_Test.cs

@@ -0,0 +1,38 @@
+using Vit.Core.Util.Threading;
+using System;
+using System.Threading;
+
+namespace Vit.Core.MsTest.Util.Threading
+{
+    public class SersTimer_SingleThread_Test
+    {
+        public static void Test()
+        {
+
+            var timer = new SersTimer_SingleThread();
+
+
+            timer.intervalMs = 1;
+
+
+            int count = 1;
+
+
+            timer.timerCallback = (state) => {
+                var curCount = Interlocked.Increment(ref count);
+
+
+                Console.Out.WriteLine($"[{curCount}]start ");
+                Thread.Sleep(500);
+
+                Console.Out.WriteLine($"[{curCount}]stop ");
+
+            };
+
+
+            timer.Start();
+
+            Thread.Sleep(10000);
+        }
+    }
+}

+ 38 - 0
dotnet/Library/Vit/Vit.Core/Test/Vit.Core.MsTest/Util/Threading/SersTimer_Test.cs

@@ -0,0 +1,38 @@
+using Vit.Core.Util.Threading;
+using System;
+using System.Threading;
+
+namespace Vit.Core.MsTest.Util.Threading
+{
+    public class SersTimer_Test
+    {
+        public static void Test()
+        {
+
+            var timer = new SersTimer();
+
+
+            timer.intervalMs = 1000;
+
+
+            int count = 1;
+
+     
+            timer.timerCallback = (state) => {
+                var curCount = count++;
+
+ 
+                Console.Out.WriteLine($"[{curCount}]start ");
+                Thread.Sleep(1500);
+
+                Console.Out.WriteLine($"[{curCount}]stop ");
+ 
+            };
+
+
+            timer.Start();
+
+            Thread.Sleep(10000);
+        }
+    }
+}

+ 8 - 2
dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/ByteDataExtensions.cs

@@ -12,12 +12,15 @@ namespace Vit.Extensions
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static byte[] ByteDataToBytes(this List<ArraySegment<byte>> byteData)
         {
+            //(x.1)get length
             int count = 0;
             foreach (var item in byteData)
             {
                 count += item.Count;
             }
 
+
+            //(x.2) copy data
             var bytes = new byte[count];
 
             int curIndex = 0;
@@ -33,10 +36,13 @@ namespace Vit.Extensions
         }
 
 
-        
+
         #endregion
 
-       
+
+
+
+        
 
     }
 }

+ 2 - 21
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/DataPool.cs

@@ -13,14 +13,14 @@ namespace Vit.Core.Util.Pool
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static byte[] BytesGet(int minimumLength)
         {
-            // return new byte[minimumLength];
+            return new byte[minimumLength];
             return ArrayPool<byte>.Shared.Rent(minimumLength);
         }
 
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static void BytesReturn(byte[] data)
         {
-             ArrayPool<byte>.Shared.Return(data);
+             //ArrayPool<byte>.Shared.Return(data);
         }
         #endregion
 
@@ -39,24 +39,5 @@ namespace Vit.Core.Util.Pool
         #endregion
 
 
-
-        #region ByteData
-
-        [MethodImpl(MethodImplOptions.AggressiveInlining)]
-        public static List<System.ArraySegment<byte>> ByteDataGet()
-        {
-            //return new List<System.ArraySegment<byte>>();
-            return ObjectPool<List<System.ArraySegment<byte>>>.Shared.Pop();
-        }
-
-        [MethodImpl(MethodImplOptions.AggressiveInlining)]
-        internal static void ByteDataReturn(List<System.ArraySegment<byte>> data)
-        {
-            //return;
-            ObjectPool<List<System.ArraySegment<byte>>>.Shared.Push(data);
-        }
-        #endregion
-
-
     }
 }

+ 7 - 6
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Threading/SersTimer.cs

@@ -5,13 +5,14 @@ namespace Vit.Core.Util.Threading
 {
     public class SersTimer :  IDisposable
     {
-        private Timer _timer=null;
+        protected Timer _timer=null;
         /// <summary>
         /// 定时器间隔,单位:ms
         /// </summary>
-        public double intervalMs;
-        public TimerCallback timerCallback;
-        public void Start()
+        public virtual int intervalMs { get; set; }
+
+        public virtual TimerCallback timerCallback { get; set; }
+        public virtual void Start()
         {
             Stop();
 
@@ -21,7 +22,7 @@ namespace Vit.Core.Util.Threading
 
    
 
-        public void Stop()
+        public virtual void Stop()
         {
             if (_timer != null)
             {
@@ -31,7 +32,7 @@ namespace Vit.Core.Util.Threading
             }
         }
 
-        public void Dispose()
+        public virtual void Dispose()
         {
             Stop();
         }

+ 45 - 0
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Threading/SersTimer_SingleThead.cs

@@ -0,0 +1,45 @@
+using System;
+using System.Threading;
+
+namespace Vit.Core.Util.Threading
+{
+    /// <summary>
+    /// 若前序timer回调没有执行结束,则后续回调不会被调用
+    /// </summary>
+    public class SersTimer_SingleThread : SersTimer
+    {
+        public SersTimer_SingleThread() 
+        {
+            _timerCallback = TimerCallback;
+        }
+
+        protected int locked = 0;
+        protected void TimerCallback(object state)
+        {       
+            if (0 != Interlocked.CompareExchange(ref locked, 1, 0))
+                return;
+
+            try
+            {
+                _oriCallback(state);
+            }
+            finally
+            {
+                locked = 0;
+            }     
+        }
+
+        protected TimerCallback _oriCallback;
+        protected readonly TimerCallback _timerCallback;
+
+        public override TimerCallback timerCallback 
+        {
+            get => _timerCallback;
+            set 
+            {
+                _oriCallback = value;
+            }
+        }       
+
+    }
+}