lith hace 4 años
padre
commit
1aeea3df37
Se han modificado 73 ficheros con 2417 adiciones y 392 borrados
  1. 3 3
      dotnet/Library/Sers/Sers.CL/Ipc/Sers.CL.Ipc.NamedPipe/DeliveryConnection.cs
  2. 2 2
      dotnet/Library/Sers/Sers.CL/Ipc/Sers.CL.Ipc.SharedMemory/DeliveryConnection.cs
  3. 3 3
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/DeliveryConnection.cs
  4. 4 4
      dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.ThreadWait/DeliveryConnection.cs
  5. 30 14
      dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Client/ProgramQps.cs
  6. 11 0
      dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Client/README.md
  7. 5 4
      dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Client/appsettings.json
  8. 3 3
      dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Server/Program.cs
  9. 1 1
      dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Server/appsettings.json
  10. 16 3
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryClient/Program.cs
  11. 2 1
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryClient/StartConsole.bat
  12. 4 3
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/Program.cs
  13. 2 0
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/StartConsole.bat
  14. 7 8
      dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryTest/Program.cs
  15. 3 3
      dotnet/Library/Sers/Sers.CL/WebSocket/Sers.CL.WebSocket/DeliveryClient_Connection.cs
  16. 3 3
      dotnet/Library/Sers/Sers.CL/WebSocket/Sers.CL.WebSocket/DeliveryServer_Connection.cs
  17. 2 2
      dotnet/Library/Sers/Sers.CL/Zmq/FullDuplex/Sers.CL.Zmq.FullDuplex/DeliveryConnection.cs
  18. 2 2
      dotnet/Library/Sers/Sers.CL/Zmq/ThreadWait/Sers.CL.ClrZmq.ThreadWait/DeliveryConnection.cs
  19. 2 2
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/CommunicationManage/CommunicationManageClient.cs
  20. 1 1
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/CommunicationManage/CommunicationManageServer.cs
  21. 1 1
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageDelivery/IDeliveryConnection.cs
  22. 1 1
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/OrganizeClient.cs
  23. 3 3
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/OrganizeConnection.cs
  24. 7 7
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/OrganizeServer.cs
  25. 41 58
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/RequestAdaptor.cs
  26. 1 1
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/IOrganizeClient.cs
  27. 3 3
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/IOrganizeConnection.cs
  28. 1 1
      dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/IOrganizeServer.cs
  29. 3 3
      dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Api/ApiClient.cs
  30. 2 2
      dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Message/ApiMessage.cs
  31. 10 10
      dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Message/SersFile.cs
  32. 3 3
      dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/MessageCenterService.cs
  33. 4 4
      dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/MessageClient.cs
  34. 1 0
      dotnet/Library/Sers/Sers.Core/Sers.Core/Sers.Core.csproj
  35. 71 0
      dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_BlockingCollection.cs
  36. 101 0
      dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_Disruptor.cs
  37. 136 0
      dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_WorkerPool.cs
  38. 73 0
      dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_WorkerPoolCache.cs
  39. 130 0
      dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_WorkerPoolCascade.cs
  40. 24 0
      dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/IConsumer.cs
  41. 139 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/ProgramQps.cs
  42. 15 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Properties/PublishProfiles/FolderProfile.pubxml
  43. 109 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/ConcurrentLinkedQueue.cs
  44. 152 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/Program_ConcurrentLinkedQueue.cs
  45. 365 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/Program_QueueQps.cs
  46. 48 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/Queue_Channel.cs
  47. 58 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/RingBuffer.cs
  48. 15 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/README.md
  49. 27 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Sers.Core.Util.Consumer.Test.csproj
  50. 2 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/StartConsole.bat
  51. 112 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Statistics/StatisticsQpsInfo.cs
  52. 247 0
      dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Test/WorkerPoolTest.cs
  53. 3 3
      dotnet/Library/Sers/Sers.ServiceStation/Sers.ServiceStation/ServiceStation.cs
  54. 21 12
      dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/ArraySegmentByteExtensions.cs
  55. 3 0
      dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/Base64StringExtensions.cs
  56. 0 173
      dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/ByteDataExtensions.cs
  57. 10 1
      dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/BytesExtensions.cs
  58. 30 0
      dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/DataCopyExtensions.cs
  59. 214 0
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pipelines/ByteData.cs
  60. 6 7
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/DataPool.cs
  61. 6 6
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/ObjectPool.cs
  62. 3 6
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/ObjectPoolByGenerator.cs
  63. 58 0
      dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/PoolCache.cs
  64. 5 1
      dotnet/Library/Vit/Vit.Core/Vit.Core/Vit.Core.csproj
  65. 11 0
      dotnet/Sers.sln
  66. 3 3
      dotnet/ServiceCenter/Sers.ServiceCenter/Apm/Sers.Gover.Apm.Zipkin/AppEvent.cs
  67. 4 4
      dotnet/ServiceCenter/Sers.ServiceCenter/Sers.Gover/Base/GoverManage.cs
  68. 4 4
      dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/ApiCenter/ApiCenterService.cs
  69. 1 1
      dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/Entity/ApiNode.cs
  70. 1 1
      dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/Entity/ServiceStation.cs
  71. 10 10
      dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/ServiceCenter.cs
  72. 1 1
      dotnet/ServiceStation/Demo/SersLoader/Did.SersLoader.Demo/Controllers/Demo/SampleController.cs
  73. 12 0
      dotnet/todo.txt

+ 3 - 3
dotnet/Library/Sers/Sers.CL/Ipc/Sers.CL.Ipc.NamedPipe/DeliveryConnection.cs

@@ -39,7 +39,7 @@ namespace Sers.CL.Ipc.NamedPipe
         public Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { private get; set; }
 
 
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
             if (data == null || stream == null) return;
 
@@ -50,10 +50,10 @@ namespace Sers.CL.Ipc.NamedPipe
             }
             try
             {
-                Int32 len = data.ByteDataCount();
+                Int32 len = data.Count();
                 data.Insert(0, len.Int32ToArraySegmentByte());               
 
-                var bytes = data.ByteDataToBytes();
+                var bytes = data.ToBytes();
                 _securityManager?.Encryption(new ArraySegment<byte>(bytes,4, bytes.Length-4));
 
                 stream.WriteAsync(bytes, 0, bytes.Length);

+ 2 - 2
dotnet/Library/Sers/Sers.CL/Ipc/Sers.CL.Ipc.SharedMemory/DeliveryConnection.cs

@@ -116,9 +116,9 @@ namespace Sers.CL.Ipc.SharedMemory
             }
         }
 
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
-            var bytes = data.ByteDataToBytes();
+            var bytes = data.ToBytes();
             _securityManager?.Encryption(bytes.BytesToArraySegmentByte());
 
             writeStream.SendMessageAsync(bytes);

+ 3 - 3
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/DeliveryConnection.cs

@@ -32,15 +32,15 @@ namespace Sers.CL.Socket.Iocp
 
         public Action<IDeliveryConnection> Conn_OnDisconnected { get; set; }
 
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
             if (data == null || socket == null) return;
             try
             {
-                Int32 len = data.ByteDataCount();
+                Int32 len = data.Count();
                 data.Insert(0, len.Int32ToArraySegmentByte());     
 
-                var bytes = data.ByteDataToBytes();
+                var bytes = data.ToBytes();
                 _securityManager?.Encryption(new ArraySegment<byte>(bytes,4,bytes.Length-4));
                 socket.SendAsync(bytes.BytesToArraySegmentByte(), SocketFlags.None);
                 //socket.SendAsync(data, SocketFlags.None);

+ 4 - 4
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.ThreadWait/DeliveryConnection.cs

@@ -36,15 +36,15 @@ namespace Sers.CL.Socket.ThreadWait
         public Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { private get; set; }
 
 
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
             if (data == null || socket == null) return;
             try
-            {          
-                Int32 len = data.ByteDataCount();
+            {
+                Int32 len = data.Count();
                 data.Insert(0, len.Int32ToArraySegmentByte());
 
-                var bytes = data.ByteDataToBytes();
+                var bytes = data.ToBytes();
 
                 _securityManager?.Encryption(new ArraySegment<byte>(bytes, 4, bytes.Length - 4));       
 

+ 30 - 14
dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Client/ProgramQps.cs

@@ -56,13 +56,13 @@ namespace CLClient
             cm.conn_OnGetRequest = (conn,sender,request,callback)=> 
             {
                 qpsInfo.IncrementRequest();
-                callback(sender,new List<ArraySegment<byte>> { request });
+                callback(sender,new Vit.Core.Util.Pipelines.ByteData { request });
             };
 
             cm.conn_OnGetMessage = (conn,msg) => 
             {
                 qpsInfo.IncrementRequest();
-                cm.SendMessageAsync(new List<ArraySegment<byte>> { msg });
+                cm.SendMessageAsync(new Vit.Core.Util.Pipelines.ByteData { msg });
             };
 
 
@@ -84,19 +84,35 @@ namespace CLClient
 
             if (theadCount >= 0)
             {
-                LongTaskHelp task = new LongTaskHelp();
-                task.threadName = "PressureTest.SendRequest";
-                task.threadCount = theadCount;
-                task.action = ()=> {
-
-                    var conn = cm.organizeList[0].conn;
-                    for (; ; )
-                    {
-                        if (conn.SendRequest(buff.BytesToByteData(), out _))
-                            qpsInfo.IncrementRequest();
-                    }
+                var conn = cm.organizeList[0].conn;
+                Action<object, Vit.Core.Util.Pipelines.ByteData> callback = null;
+
+                callback = (sender, data) =>
+                {
+                    qpsInfo.IncrementRequest();
+                    conn.SendRequestAsync(null, data, callback);
                 };
-                task.Start();
+
+
+                for (int t= theadCount;t>0 ;t-- )
+                {
+                    callback(null, buff.BytesToByteData());
+                }
+
+
+                //LongTaskHelp task = new LongTaskHelp();
+                //task.threadName = "PressureTest.SendRequest";
+                //task.threadCount = theadCount;
+                //task.action = ()=> {
+
+                //    var conn = cm.organizeList[0].conn;
+                //    for (; ; )
+                //    {
+                //        if (conn.SendRequest(buff.BytesToByteData(), out _))
+                //            qpsInfo.IncrementRequest();
+                //    }
+                //};
+                //task.Start();
             }
 
 

+ 11 - 0
dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Client/README.md

@@ -0,0 +1,11 @@
+
+#--------------------------------
+
+dotnet CLClient.dll > console.log 2>&1 &
+
+启动10个进程的qps为 10万
+
+启动1个进程的qps是4万
+
+
+ 

+ 5 - 4
dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Client/appsettings.json

@@ -9,7 +9,7 @@
         "requestTimeoutMs": 60000,
 
         /* 后台处理消息的线程个数(单位个,默认2) */
-        "workThreadCount": 4,
+        "workThreadCount": 16,
 
         //HeartBeat
         /* 心跳检测超时时间(单位ms,默认30000) */
@@ -46,11 +46,12 @@
           /* 在此Assembly中查找builder */
           "assemblyFile": "Sers.CL.Socket.Iocp.dll",
           /* the class of builder in assemblyFile  */
-          //"className": "Sers.CL.Socket.Iocp.OrganizeClientBuilder",
+          // "className": "Sers.CL.Socket.Iocp.OrganizeClientBuilder",
 
 
           /* (x.2) config */
           /* 服务端 host地址。例如: "127.0.0.1"、"sers.com" */
+          //"host": "192.168.10.11",
           "host": "127.0.0.1",
           /* 服务端 监听端口号。例如: 4501 */
           "port": 4501
@@ -135,8 +136,8 @@
 
   "PressureTest": {
     "clientCount": 1,
-    "requestThreadCount": 1,
-    "messageThreadCount": 0,
+    "requestThreadCount":0,
+    "messageThreadCount": 12800,
     "msgLen": 1
   }
 

+ 3 - 3
dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Server/Program.cs

@@ -26,14 +26,14 @@ namespace CLServer
             cm.conn_OnGetMessage = (conn, msg) =>
             {
                 qpsInfo.IncrementRequest();
-                conn.SendMessageAsync(new List<ArraySegment<byte>> { msg });               
+                conn.SendMessageAsync(new Vit.Core.Util.Pipelines.ByteData { msg });               
             };
 
 
-            cm.conn_OnGetRequest = (IOrganizeConnection  conn, Object sender, ArraySegment<byte> requestData, Action<object, List<ArraySegment<byte>>> callback) =>
+            cm.conn_OnGetRequest = (IOrganizeConnection  conn, Object sender, ArraySegment<byte> requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback) =>
             {
                 qpsInfo.IncrementRequest();
-                callback(sender,new List<ArraySegment<byte>> { requestData });
+                callback(sender,new Vit.Core.Util.Pipelines.ByteData { requestData });
             };
 
             cm.Conn_OnConnected = (conn) =>

+ 1 - 1
dotnet/Library/Sers/Sers.CL/Test/CommunicationManage/Server/appsettings.json

@@ -9,7 +9,7 @@
         "requestTimeoutMs": 60000,
 
         /* 后台处理消息的线程个数(单位个,默认2) */
-        "workThreadCount": 8,
+        "workThreadCount": 16,
 
         //HeartBeat
         /* 心跳检测超时时间(单位ms,默认30000) */

+ 16 - 3
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryClient/Program.cs

@@ -2,6 +2,7 @@
 using System;
 using System.Collections.Generic;
 using System.Threading;
+using System.Threading.Tasks;
 using Vit.Core.Module.Log;
 using Vit.Core.Util.Threading;
 
@@ -95,8 +96,8 @@ namespace DeliveryTest
 
                 //data[0]++;
 
-                data[1] = 10;
-                var byteData = new List<ArraySegment<byte>>() { data };
+                //data[1] = 10;
+                var byteData = new Vit.Core.Util.Pipelines.ByteData() { data };
                 conn.SendFrameAsync(byteData);
            
             };
@@ -110,7 +111,19 @@ namespace DeliveryTest
 
             for (var t = 0; t < thread; t++)
             {
-                client.conn.SendFrameAsync(new List<ArraySegment<byte>>() { new ArraySegment<byte>(buff)});
+                //client.conn.SendFrameAsync(new Vit.Core.Util.Pipelines.ByteData() { new ArraySegment<byte>(buff)});
+                Task.Run(()=> {
+
+                    while (true)
+                    {
+                        for (int t1 = 0; t1 < 1000; t1++)
+                        {
+                            client.conn.SendFrameAsync(new Vit.Core.Util.Pipelines.ByteData() { new ArraySegment<byte>(buff) });
+                        }
+                        //Thread.Sleep(1);
+                    }
+                
+                });
             }
 
         }

+ 2 - 1
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryClient/StartConsole.bat

@@ -2,8 +2,9 @@
 :: dotnet DeliveryClient.dll host port thread msgLen
 :: dotnet DeliveryClient.dll 127.0.0.1 4501 200 1024
 
+:: dotnet DeliveryClient/DeliveryClient.dll 127.0.0.1 4501 200 1024
 
-dotnet DeliveryClient.dll
+dotnet DeliveryClient.dll 127.0.0.1 4501 200 1
 pause
 
 

+ 4 - 3
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/Program.cs

@@ -59,12 +59,13 @@ namespace DeliveryTest
                 conn.OnGetFrame = (conn_, data) =>
                 {
                     qpsInfo.IncrementRequest();
+        
 
                     //data[0]++;
-                    data[1] = 5;
-                    var byteData = new List<ArraySegment<byte>>() { data };
+                    //data[1] = 5;
+                    //var byteData = new Vit.Core.Util.Pipelines.ByteData() { data };
 
-                    conn_.SendFrameAsync(byteData);
+                    //conn_.SendFrameAsync(byteData);
                 };
             };
 

+ 2 - 0
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryServer/StartConsole.bat

@@ -2,6 +2,8 @@
 :: dotnet DeliveryServer.dll port
 :: dotnet DeliveryServer.dll 4501
 
+:: dotnet DeliveryServer/DeliveryServer.dll 4501
+
 dotnet DeliveryServer.dll
 pause
 

+ 7 - 8
dotnet/Library/Sers/Sers.CL/Test/MessageDelivery/DeliveryTest/Program.cs

@@ -40,9 +40,9 @@ namespace DeliveryTest
             {
                 conn.OnGetFrame = (conn_, data) =>
                 {
-                    data[0]++;
-                    data[1] = 5;
-                    var byteData = new List<ArraySegment<byte>>() { data };
+                    //data[0]++;
+                    //data[1] = 5;
+                    var byteData = new Vit.Core.Util.Pipelines.ByteData() { data };
 
                     conn_.SendFrameAsync(byteData);
                 };
@@ -53,7 +53,7 @@ namespace DeliveryTest
 
         }
 
-        static List<ArraySegment<byte>> staticByteData => new List<ArraySegment<byte>>() { (new ArraySegment<byte>(new byte[] { 0, 1, 2, 3 })) };
+        static Vit.Core.Util.Pipelines.ByteData staticByteData => new Vit.Core.Util.Pipelines.ByteData() { (new ArraySegment<byte>(new byte[] { 0, 1, 2, 3 })) };
 
  
         static void StartClient()
@@ -66,12 +66,11 @@ namespace DeliveryTest
 
             client.Conn_OnGetFrame = (conn, data) =>
             {
-                data[0]++;
+                //data[0]++;
 
-                data[1]=10;
-                var byteData = new List<ArraySegment<byte>>() { data };
+                //data[1]=10;
+                var byteData = new Vit.Core.Util.Pipelines.ByteData() { data };
                 conn.SendFrameAsync(byteData);
-                //Console.WriteLine("ss");
             };
 
             var connected = client.Connect() ;

+ 3 - 3
dotnet/Library/Sers/Sers.CL/WebSocket/Sers.CL.WebSocket/DeliveryClient_Connection.cs

@@ -30,16 +30,16 @@ namespace Sers.CL.WebSocket
 
         public Action<IDeliveryConnection> Conn_OnDisconnected { get; set; }
 
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
             if (data == null || socket == null) return;
             try
             {
 
-                Int32 len = data.ByteDataCount();
+                Int32 len = data.Count();
                 data.Insert(0, len.Int32ToArraySegmentByte());
 
-                var bytes = data.ByteDataToBytes();
+                var bytes = data.ToBytes();
 
                 _securityManager?.Encryption(new ArraySegment<byte>(bytes, 4, bytes.Length - 4));
 

+ 3 - 3
dotnet/Library/Sers/Sers.CL/WebSocket/Sers.CL.WebSocket/DeliveryServer_Connection.cs

@@ -28,16 +28,16 @@ namespace Sers.CL.WebSocket
 
         public Action<IDeliveryConnection> Conn_OnDisconnected { get; set; }
 
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
             if (data == null || socket == null) return;
 
             try
             {
-                Int32 len = data.ByteDataCount();
+                Int32 len = data.Count();
                 data.Insert(0, len.Int32ToArraySegmentByte());
 
-                var bytes = data.ByteDataToBytes();
+                var bytes = data.ToBytes();
 
                 _securityManager?.Encryption(new ArraySegment<byte>(bytes, 4, bytes.Length - 4));
 

+ 2 - 2
dotnet/Library/Sers/Sers.CL/Zmq/FullDuplex/Sers.CL.Zmq.FullDuplex/DeliveryConnection.cs

@@ -51,9 +51,9 @@ namespace Sers.CL.Zmq.FullDuplex
         }
 
         public Action<DeliveryConnection, byte[]> OnSendFrameAsync { private get; set; }
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
-            var bytes = data.ByteDataToBytes();
+            var bytes = data.ToBytes();
 
             _securityManager?.Encryption(bytes.BytesToArraySegmentByte()); 
 

+ 2 - 2
dotnet/Library/Sers/Sers.CL/Zmq/ThreadWait/Sers.CL.ClrZmq.ThreadWait/DeliveryConnection.cs

@@ -44,9 +44,9 @@ namespace Sers.CL.ClrZmq.ThreadWait
 
 
         public Action<DeliveryConnection, byte[]> OnSendFrameAsync { private get; set; }
-        public void SendFrameAsync(List<ArraySegment<byte>> data)
+        public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
-            var bytes = data.ByteDataToBytes();
+            var bytes = data.ToBytes();
 
             _securityManager?.Encryption(bytes.BytesToArraySegmentByte());
          

+ 2 - 2
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/CommunicationManage/CommunicationManageClient.cs

@@ -38,13 +38,13 @@ namespace Sers.Core.CL.CommunicationManage
         /// <summary>
         /// 会在内部线程中被调用 
         /// </summary>
-        public Action<IOrganizeConnection,object, ArraySegment<byte>, Action<object, List<ArraySegment<byte>>>> conn_OnGetRequest{   get;set;    }
+        public Action<IOrganizeConnection,object, ArraySegment<byte>, Action<object, Vit.Core.Util.Pipelines.ByteData>> conn_OnGetRequest{   get;set;    }
 
         public Action<IOrganizeConnection,ArraySegment<byte>> conn_OnGetMessage{   get; set;   }
         #endregion
 
         #region SendMessageAsync      
-        public void SendMessageAsync(List<ArraySegment<byte>> message)
+        public void SendMessageAsync(Vit.Core.Util.Pipelines.ByteData message)
         {
             foreach (var conn in connList)
             {

+ 1 - 1
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/CommunicationManage/CommunicationManageServer.cs

@@ -50,7 +50,7 @@ namespace Sers.Core.CL.CommunicationManage
         /// 会在内部线程中被调用 
         /// (conn,sender,requestData, callback)
         /// </summary>
-        public Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, List<ArraySegment<byte>>>> conn_OnGetRequest { get; set; }
+        public Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, Vit.Core.Util.Pipelines.ByteData>> conn_OnGetRequest { get; set; }
 
         public Action<IOrganizeConnection, ArraySegment<byte>> conn_OnGetMessage { get; set; }
 

+ 1 - 1
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageDelivery/IDeliveryConnection.cs

@@ -11,7 +11,7 @@ namespace Sers.Core.CL.MessageDelivery
         /// </summary>
         byte state { get; set; }
 
-        void SendFrameAsync(List<ArraySegment<byte>> data);
+        void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data);
         Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { set; }
         void Close();
 

+ 1 - 1
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/OrganizeClient.cs

@@ -54,7 +54,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         /// 会在内部线程中被调用 
         /// (conn,sender,requestData,callback)
         /// </summary>
-        public Action<IOrganizeConnection,object, ArraySegment<byte>, Action<object, List<ArraySegment<byte>>>> conn_OnGetRequest
+        public Action<IOrganizeConnection,object, ArraySegment<byte>, Action<object, Vit.Core.Util.Pipelines.ByteData>> conn_OnGetRequest
         {
             set
             {

+ 3 - 3
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/OrganizeConnection.cs

@@ -18,17 +18,17 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         }
 
 
-        public void SendMessageAsync(List<ArraySegment<byte>> message)
+        public void SendMessageAsync(Vit.Core.Util.Pipelines.ByteData message)
         {
             requestAdaptor.SendMessageAsync(this, message);
         }
 
 
-        public void SendRequestAsync(Object sender, List<ArraySegment<byte>> requestData, Action<object, List<ArraySegment<byte>>> callback)
+        public void SendRequestAsync(Object sender, Vit.Core.Util.Pipelines.ByteData requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
         {
             requestAdaptor.SendRequestAsync(this, sender, requestData, callback);
         }
-        public bool SendRequest(List<ArraySegment<byte>> requestData, out List<ArraySegment<byte>> replyData)
+        public bool SendRequest(Vit.Core.Util.Pipelines.ByteData requestData, out Vit.Core.Util.Pipelines.ByteData replyData)
         {
             return requestAdaptor.SendRequest(this, requestData, out replyData);
         }

+ 7 - 7
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/OrganizeServer.cs

@@ -115,11 +115,11 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         /// <summary>
         /// 会在内部线程中被调用 
           /// </summary>
-        public Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, List<ArraySegment<byte>>>> conn_OnGetRequest
+        public Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, Vit.Core.Util.Pipelines.ByteData>> conn_OnGetRequest
         {
             set
             {              
-                requestAdaptor.event_OnGetRequest = (IOrganizeConnection organizeConn,Object sender, ArraySegment<byte> requestData, Action<object, List<ArraySegment<byte>>> callback) =>
+                requestAdaptor.event_OnGetRequest = (IOrganizeConnection organizeConn,Object sender, ArraySegment<byte> requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback) =>
                 {
                     var deliveryConn = organizeConn.GetDeliveryConn();
                     if (deliveryConn.state == DeliveryConnState.certified)
@@ -146,13 +146,13 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         }
 
 
-        public void Station_SendMessageAsync(IOrganizeConnection conn, List<ArraySegment<byte>> message)
+        public void Station_SendMessageAsync(IOrganizeConnection conn, Vit.Core.Util.Pipelines.ByteData message)
         {
             requestAdaptor.SendMessageAsync(conn, message);
         }
 
 
-        public void Station_SendRequestAsync(IOrganizeConnection conn, Object sender, List<ArraySegment<byte>> requestData, Action<object, List<ArraySegment<byte>>> callback)
+        public void Station_SendRequestAsync(IOrganizeConnection conn, Object sender, Vit.Core.Util.Pipelines.ByteData requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
         {
             requestAdaptor.SendRequestAsync(conn, sender, requestData, callback);
         }
@@ -161,7 +161,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
 
         #region ConnCheckSecretKey
 
-        private void ConnCheckSecretKey(IOrganizeConnection organizeConn,IDeliveryConnection deliveryConn, Object sender, ArraySegment<byte> requestData, Action<object, List<ArraySegment<byte>>> callback)
+        private void ConnCheckSecretKey(IOrganizeConnection organizeConn,IDeliveryConnection deliveryConn, Object sender, ArraySegment<byte> requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
         {
             // 身份验证
             try
@@ -176,7 +176,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
                     //验证通过
                     replyData = "true".SerializeToArraySegmentByte();
 
-                    callback?.Invoke(sender, new List<ArraySegment<byte>> { replyData });
+                    callback?.Invoke(sender, new Vit.Core.Util.Pipelines.ByteData { replyData });
 
                     #region 新连接 事件
                     connMap[deliveryConn] = organizeConn;
@@ -190,7 +190,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
                     deliveryConn.state = DeliveryConnState.waitForClose;
                     Logger.Info("[CL.OrganizeServer] Authentication - failed!(" + reqSecretKey + ")");
                     replyData = "false".SerializeToArraySegmentByte();
-                    callback?.Invoke(sender, new List<ArraySegment<byte>> { replyData });
+                    callback?.Invoke(sender, new Vit.Core.Util.Pipelines.ByteData { replyData });
                 }
             }
             catch (Exception ex)

+ 41 - 58
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/DefaultOrganize/RequestAdaptor.cs

@@ -2,9 +2,9 @@
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Threading;
-using System.Threading.Tasks;
 using Newtonsoft.Json.Linq;
 using Sers.Core.CL.MessageDelivery;
+using Sers.Core.Util.Consumer;
 using Vit.Core.Module.Log;
 using Vit.Core.Util.Common;
 using Vit.Core.Util.Pool;
@@ -42,7 +42,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         /// 会在内部线程中被调用 
         /// (conn,sender,requestData,callback)
         /// </summary>
-        public Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, List<ArraySegment<byte>>>> event_OnGetRequest { private get; set; }
+        public Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, Vit.Core.Util.Pipelines.ByteData>> event_OnGetRequest { private get; set; }
 
         /// <summary>
         /// deliveryToOrganize_OnGetMessage
@@ -59,14 +59,14 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         #region (x.x.2)SendMessage SendRequest
 
 
-        public void SendMessageAsync(IOrganizeConnection conn, List<ArraySegment<byte>> message)
+        public void SendMessageAsync(IOrganizeConnection conn, Vit.Core.Util.Pipelines.ByteData message)
         {
             Delivery_SendFrameAsync(conn, (byte)EFrameType.message, 0, message);
         }
 
 
 
-        public long SendRequestAsync(IOrganizeConnection conn, Object sender, List<ArraySegment<byte>> requestData, Action<object, List<ArraySegment<byte>>> callback, ERequestType requestType = ERequestType.app)
+        public long SendRequestAsync(IOrganizeConnection conn, Object sender, Vit.Core.Util.Pipelines.ByteData requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback, ERequestType requestType = ERequestType.app)
         {
             //no need guid,just make sure reqKey is unique in current connection client
             //long reqKey = CommonHelp.NewGuidLong();
@@ -86,9 +86,9 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         }
 
 
-        public bool SendRequest(IOrganizeConnection conn, List<ArraySegment<byte>> requestData, out List<ArraySegment<byte>> replyData)
+        public bool SendRequest(IOrganizeConnection conn, Vit.Core.Util.Pipelines.ByteData requestData, out Vit.Core.Util.Pipelines.ByteData replyData)
         {
-            List<ArraySegment<byte>> _replyData = null;
+            Vit.Core.Util.Pipelines.ByteData _replyData = null;
 
             AutoResetEvent mEvent = pool_AutoResetEvent.Pop();
             mEvent.Reset();
@@ -133,12 +133,11 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         public void Start()
         {
             //(x.1) task_DeliveryToOrganize_Processor
-            task_DeliveryToOrganize_Processor.Stop();
-
-            task_DeliveryToOrganize_Processor.threadName = "CL-RequestAdaptor-dealer";
-            task_DeliveryToOrganize_Processor.threadCount = workThreadCount;
-            task_DeliveryToOrganize_Processor.action = DeliveryToOrganize_Processor;
-            task_DeliveryToOrganize_Processor.Start();
+            //task_DeliveryToOrganize_Processor.Stop();
+            task_DeliveryToOrganize_Processor.processor = DeliveryToOrganize_ProcessFrame;
+            task_DeliveryToOrganize_Processor.workThreadCount = workThreadCount;
+            task_DeliveryToOrganize_Processor.name = "CL-RequestAdaptor-dealer";
+            task_DeliveryToOrganize_Processor.Start(); 
 
             //(x.2) heartBeat thread
             heartBeat_Timer.timerCallback = (state) => { HeartBeat_Loop(); };
@@ -194,7 +193,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
 
         ObjectPoolGenerator<AutoResetEvent> pool_AutoResetEvent = new ObjectPoolGenerator<AutoResetEvent>(() => new AutoResetEvent(false));
         long reqKeyIndex = CommonHelp.NewGuidLong();
-        LongTaskHelp task_DeliveryToOrganize_Processor = new LongTaskHelp();
+   
         #endregion
 
 
@@ -237,7 +236,13 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
 
         #region deliveryToOrganize_MessageFrameQueue  
 
-        readonly BlockingCollection<DeliveryToOrganize_MessageFrame> deliveryToOrganize_MessageFrameQueue = new BlockingCollection<DeliveryToOrganize_MessageFrame>();
+
+        IConsumer<DeliveryToOrganize_MessageFrame> task_DeliveryToOrganize_Processor = new Consumer_BlockingCollection<DeliveryToOrganize_MessageFrame>();
+        //IConsumer<DeliveryToOrganize_MessageFrame> task_DeliveryToOrganize_Processor = new Consumer_Disruptor<DeliveryToOrganize_MessageFrame>();
+        //IConsumer<DeliveryToOrganize_MessageFrame> task_DeliveryToOrganize_Processor = new Consumer_WorkerPool<DeliveryToOrganize_MessageFrame>();
+        //IConsumer<DeliveryToOrganize_MessageFrame> task_DeliveryToOrganize_Processor = new Consumer_WorkerPoolCache<DeliveryToOrganize_MessageFrame>();
+        //IConsumer<DeliveryToOrganize_MessageFrame> task_DeliveryToOrganize_Processor = new Consumer_WorkerPoolCascade<DeliveryToOrganize_MessageFrame>();
+
 
         void DeliveryToOrganize_OnGetMessageFrame(IOrganizeConnection conn, ArraySegment<byte> messageFrame)
         {
@@ -245,7 +250,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
             msg.conn = conn;
             msg.messageFrame = messageFrame;
 
-            deliveryToOrganize_MessageFrameQueue.Add(msg);
+            task_DeliveryToOrganize_Processor.Publish(msg);    
         }
 
 
@@ -297,44 +302,21 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         #endregion
 
         #endregion
+        
 
 
-
-        #region DeliveryToOrganize_Processor
-        private void DeliveryToOrganize_Processor()
+        #region DeliveryToOrganize_ProcessFrame
+        void DeliveryToOrganize_ProcessFrame(DeliveryToOrganize_MessageFrame msgFrame)
         {
-            while (true)
-            {
-                try
-                {
-                    #region Process                        
-                    while (true)
-                    {
-                        var msgFrame = deliveryToOrganize_MessageFrameQueue.Take();
-                        try
-                        {
-                            if (msgFrame.messageFrame != null)
-                                DeliveryToOrganize_ProcessFrame(msgFrame.conn, msgFrame.messageFrame.Value);
-                        }
-                        finally
-                        {
-                            msgFrame.Push();
-                        }
-                    }
-                    #endregion
-                }
-                catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException))
-                {
-                    Logger.Error(ex);
-                }
-            }
-        }
-        #endregion
+            IOrganizeConnection conn = msgFrame.conn;
+            var messageFrame = msgFrame.messageFrame;
 
+            msgFrame.Push();
+
+            if (messageFrame == null) return;
+
+            var data = messageFrame.Value;
 
-        #region DeliveryToOrganize_ProcessFrame
-        void DeliveryToOrganize_ProcessFrame(IOrganizeConnection conn, ArraySegment<byte> data)
-        {
             if (data.Count <= 2) return;
 
             EFrameType msgType = (EFrameType)data.Array[data.Offset];
@@ -348,7 +330,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
 
                         if (OrganizeToDelivery_RequestMap_TryRemove(reqKey, out var requestInfo))
                         {
-                            requestInfo.callback(requestInfo.sender, new List<ArraySegment<byte>> { replyData });
+                            requestInfo.callback(requestInfo.sender, new Vit.Core.Util.Pipelines.ByteData { replyData });
                             requestInfo.Push();
                         }
                         return;
@@ -397,12 +379,12 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
                         if (version == organizeVersion)
                         {
                             // send reply
-                            DeliveryToOrganize_SendReply(reqInfo, new List<ArraySegment<byte>> { requestData });
+                            DeliveryToOrganize_SendReply(reqInfo, new Vit.Core.Util.Pipelines.ByteData { requestData });
                         }
                         else
                         {
                             // send reply
-                            DeliveryToOrganize_SendReply(reqInfo, new List<ArraySegment<byte>> { "error".SerializeToArraySegmentByte() });
+                            DeliveryToOrganize_SendReply(reqInfo, new Vit.Core.Util.Pipelines.ByteData { "error".SerializeToArraySegmentByte() });
                         }
                         return;
                     }
@@ -412,7 +394,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
 
                
         #region DeliveryToOrganize_SendReply
-        private void DeliveryToOrganize_SendReply(object sender, List<ArraySegment<byte>> replyData)
+        private void DeliveryToOrganize_SendReply(object sender, Vit.Core.Util.Pipelines.ByteData replyData)
         {
             DeliveryToOrganize_RequestInfo reqInfo = sender as DeliveryToOrganize_RequestInfo;
             var conn = reqInfo.conn;
@@ -491,7 +473,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         class OrganizeToDelivery_RequestInfo
         {
             public object sender;
-            public Action<object, List<ArraySegment<byte>>> callback;
+            public Action<object, Vit.Core.Util.Pipelines.ByteData> callback;
 
             public static OrganizeToDelivery_RequestInfo Pop()
             {
@@ -517,9 +499,10 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
 
 
         #region (x.5)Delivery_SendFrameAsync        
-        void Delivery_SendFrameAsync(IOrganizeConnection conn, byte msgType, byte requestType, List<ArraySegment<byte>> data)
+        void Delivery_SendFrameAsync(IOrganizeConnection conn, byte msgType, byte requestType, Vit.Core.Util.Pipelines.ByteData data)
         {
-            var item = DataPool.BytesGet(2);
+            //var item = DataPool.BytesGet(2);
+            var item = new byte[2];
             item[0] = msgType;
             item[1] = requestType;
             data.Insert(0, new ArraySegment<byte>(item, 0, 2));
@@ -564,7 +547,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
 
 
         static readonly byte[] organizeVersion_ba = organizeVersion.SerializeToBytes();
-        static List<ArraySegment<byte>> HeartBeat_Data => organizeVersion_ba.BytesToByteData();
+        static Vit.Core.Util.Pipelines.ByteData HeartBeat_Data => organizeVersion_ba.BytesToByteData();
 
         class HeartBeatInfo
         {
@@ -660,7 +643,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
             return p;
         }
 
-        void HeartBeat_callback(object sender,List<ArraySegment<byte>> replyData)
+        void HeartBeat_callback(object sender,Vit.Core.Util.Pipelines.ByteData replyData)
         {
             HeartBeatPackage package = sender as HeartBeatPackage;
 
@@ -703,7 +686,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
         /// <param name="reqKey"></param>
         /// <param name="oriMsg"></param>
         /// <param name="reqRepFrame"></param>
-        static void PackageReqRepFrame(long reqKey, List<ArraySegment<byte>> oriMsg, out List<ArraySegment<byte>> reqRepFrame)
+        static void PackageReqRepFrame(long reqKey, Vit.Core.Util.Pipelines.ByteData oriMsg, out Vit.Core.Util.Pipelines.ByteData reqRepFrame)
         {
             //*
             reqRepFrame = DataPool.ByteDataGet();

+ 1 - 1
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/IOrganizeClient.cs

@@ -18,7 +18,7 @@ namespace Sers.Core.CL.MessageOrganize
         /// 会在内部线程中被调用 
         /// (conn,sender,requestData,callback)
         /// </summary>
-        Action<IOrganizeConnection,object, ArraySegment<byte>, Action<object, List<ArraySegment<byte>>>> conn_OnGetRequest { set; }
+        Action<IOrganizeConnection,object, ArraySegment<byte>, Action<object, Vit.Core.Util.Pipelines.ByteData>> conn_OnGetRequest { set; }
 
         Action<IOrganizeConnection,ArraySegment<byte>> conn_OnGetMessage { set; }
 

+ 3 - 3
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/IOrganizeConnection.cs

@@ -7,11 +7,11 @@ namespace Sers.Core.CL.MessageOrganize
     {
         string connTag { get; set; }
 
-        void SendMessageAsync(List<ArraySegment<byte>> message);
+        void SendMessageAsync(Vit.Core.Util.Pipelines.ByteData message);
 
 
-        void SendRequestAsync(Object sender, List<ArraySegment<byte>> requestData, Action<object, List<ArraySegment<byte>>> callback);
-        bool SendRequest(List<ArraySegment<byte>> requestData, out List<ArraySegment<byte>> replyData);
+        void SendRequestAsync(Object sender, Vit.Core.Util.Pipelines.ByteData requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback);
+        bool SendRequest(Vit.Core.Util.Pipelines.ByteData requestData, out Vit.Core.Util.Pipelines.ByteData replyData);
 
 
         void Close();

+ 1 - 1
dotnet/Library/Sers/Sers.Core/Sers.Core/CL/MessageOrganize/IOrganizeServer.cs

@@ -18,7 +18,7 @@ namespace Sers.Core.CL.MessageOrganize
         /// 会在内部线程中被调用 
         /// (conn,sender,requestData, callback)
         /// </summary>
-        Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, List<ArraySegment<byte>>>> conn_OnGetRequest { set; }
+        Action<IOrganizeConnection, object, ArraySegment<byte>, Action<object, Vit.Core.Util.Pipelines.ByteData>> conn_OnGetRequest { set; }
 
         Action<IOrganizeConnection, ArraySegment<byte>> conn_OnGetMessage { set; }
     }

+ 3 - 3
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Api/ApiClient.cs

@@ -21,7 +21,7 @@ namespace Sers.Core.Module.Api
         /// callbacks长度必须大于1
         /// </summary>
         /// <param name="callbacks"></param>
-        public static void SetOnSendRequest(Func<List<ArraySegment<byte>>, ArraySegment<byte>>[] callbacks)
+        public static void SetOnSendRequest(Func<Vit.Core.Util.Pipelines.ByteData, ArraySegment<byte>>[] callbacks)
         {
             Instances = new ApiClient[callbacks.Length];
 
@@ -41,7 +41,7 @@ namespace Sers.Core.Module.Api
 
         #region CallApi
 
-        private Func<List<ArraySegment<byte>>, ArraySegment<byte>> OnSendRequest { get; set; }
+        private Func<Vit.Core.Util.Pipelines.ByteData, ArraySegment<byte>> OnSendRequest { get; set; }
 
 
         #region CallApi 原始
@@ -51,7 +51,7 @@ namespace Sers.Core.Module.Api
         /// </summary>
         /// <param name="reqOri"></param>
         /// <returns></returns>
-        private ArraySegment<byte> CallApi(List<ArraySegment<byte>> reqOri)
+        private ArraySegment<byte> CallApi(Vit.Core.Util.Pipelines.ByteData reqOri)
         {
             return OnSendRequest(reqOri);
         }

+ 2 - 2
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Message/ApiMessage.cs

@@ -13,14 +13,14 @@ namespace Sers.Core.Module.Message
         {
         }
 
-        public override List<ArraySegment<byte>> Files
+        public override Vit.Core.Util.Pipelines.ByteData Files
         {
             get
             {
                 var files = base.Files;
                 if (files == null)
                 {
-                    base.Files= files = new List<ArraySegment<byte>>(2) { ArraySegmentByteExtensions.Null, ArraySegmentByteExtensions.Null };
+                    base.Files= files = new Vit.Core.Util.Pipelines.ByteData(2).Add(ArraySegmentByteExtensions.Null).Add(ArraySegmentByteExtensions.Null);
                 }
                 return files;
             }

+ 10 - 10
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/Message/SersFile.cs

@@ -15,7 +15,7 @@ namespace Sers.Core.Module.Message
         }
 
 
-        public virtual List<ArraySegment<byte>> Files { get; protected set; }
+        public virtual Vit.Core.Util.Pipelines.ByteData Files { get; protected set; }
 
 
 
@@ -28,7 +28,7 @@ namespace Sers.Core.Module.Message
 
 
 
-        public List<ArraySegment<byte>> Package()
+        public Vit.Core.Util.Pipelines.ByteData Package()
         {
             return PackageArraySegmentByte(Files);
         }
@@ -37,7 +37,7 @@ namespace Sers.Core.Module.Message
 
 
         #region 文件读写
-        public int FileCount => Files.Count;
+        public int FileCount => Files.Count();
 
 
         public ArraySegment<byte> GetFile(int FileIndex)
@@ -53,7 +53,7 @@ namespace Sers.Core.Module.Message
             Files.Add(file);
         }
 
-        public SersFile SetFiles(List<ArraySegment<byte>> files)
+        public SersFile SetFiles(Vit.Core.Util.Pipelines.ByteData files)
         {
             Files = files;
             return this;
@@ -80,9 +80,9 @@ namespace Sers.Core.Module.Message
         /// </summary>
         /// <param name="files"></param>
         /// <returns></returns>
-        static List<ArraySegment<byte>> PackageArraySegmentByte(List<ArraySegment<byte>> files)
+        static Vit.Core.Util.Pipelines.ByteData PackageArraySegmentByte(Vit.Core.Util.Pipelines.ByteData files)
         {
-            var oriData = new List<ArraySegment<byte>>();
+            var oriData = new Vit.Core.Util.Pipelines.ByteData();
 
             foreach (var file in files)
             {
@@ -97,9 +97,9 @@ namespace Sers.Core.Module.Message
         ///// </summary>
         ///// <param name="files"></param>
         ///// <returns></returns>
-        //static List<ArraySegment<byte>> PackageByteData(params List<ArraySegment<byte>>[] files)
+        //static Vit.Core.Util.Pipelines.ByteData PackageByteData(params Vit.Core.Util.Pipelines.ByteData[] files)
         //{
-        //    var byteData = new List<ArraySegment<byte>>();
+        //    var byteData = new Vit.Core.Util.Pipelines.ByteData();
 
         //    foreach (var file in files)
         //    {
@@ -111,9 +111,9 @@ namespace Sers.Core.Module.Message
         //}
 
 
-        static List<ArraySegment<byte>> UnpackOriData(ArraySegment<byte> oriData)
+        static Vit.Core.Util.Pipelines.ByteData UnpackOriData(ArraySegment<byte> oriData)
         {
-            List<ArraySegment<byte>> files = new List<ArraySegment<byte>>();
+            Vit.Core.Util.Pipelines.ByteData files = new Vit.Core.Util.Pipelines.ByteData();
             int index = 0;
             int fileLen;
            

+ 3 - 3
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/MessageCenterService.cs

@@ -71,14 +71,14 @@ namespace Sers.Core.Module.PubSub
 
             //message,msgTitle,msgData
             var frame = new SersFile().SetFiles(
-                new[] { (byte)EFrameType.message }.BytesToArraySegmentByte(),
+                (new[] { (byte)EFrameType.message }).BytesToArraySegmentByte(),
                  msgTitle.SerializeToArraySegmentByte(),
                  msgData
-                ).Package().ByteDataToBytes();
+                ).Package().ToBytes();
         
             foreach (var conn in connList.Values)
             {
-                conn.SendMessageAsync(new List<ArraySegment<byte>> { frame.BytesToArraySegmentByte() });                 
+                conn.SendMessageAsync(new Vit.Core.Util.Pipelines.ByteData { frame.BytesToArraySegmentByte() });                 
             }
         }
 

+ 4 - 4
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/MessageClient.cs

@@ -40,10 +40,10 @@ namespace Sers.Core.Module.PubSub
             }
         }
 
-        Action<List<ArraySegment<byte>>> _OnSendMessage;
+        Action<Vit.Core.Util.Pipelines.ByteData> _OnSendMessage;
 
         //消息延迟发送机制
-        public Action<List<ArraySegment<byte>>> OnSendMessage
+        public Action<Vit.Core.Util.Pipelines.ByteData> OnSendMessage
         {
             //get => _OnSendMessage;
             set
@@ -71,9 +71,9 @@ namespace Sers.Core.Module.PubSub
  
 
         #region 延迟发送队列
-        readonly ConcurrentQueue<List<ArraySegment<byte>>> msgFrameToSend = new ConcurrentQueue<List<ArraySegment<byte>>>();       
+        readonly ConcurrentQueue<Vit.Core.Util.Pipelines.ByteData> msgFrameToSend = new ConcurrentQueue<Vit.Core.Util.Pipelines.ByteData>();       
        
-        void SendFrame(List<ArraySegment<byte>> frame)
+        void SendFrame(Vit.Core.Util.Pipelines.ByteData frame)
         {
             if (null == _OnSendMessage)
             {

+ 1 - 0
dotnet/Library/Sers/Sers.Core/Sers.Core/Sers.Core.csproj

@@ -26,6 +26,7 @@
   </ItemGroup>
 
   <ItemGroup>
+    <PackageReference Include="Disruptor" Version="3.6.2" />
     <PackageReference Include="Microsoft.Extensions.DependencyModel" Version="2.0.0" />
   </ItemGroup>  
  

+ 71 - 0
dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_BlockingCollection.cs

@@ -0,0 +1,71 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using Vit.Core.Module.Log;
+using Vit.Core.Util.Threading;
+
+namespace Sers.Core.Util.Consumer
+{
+    /// <summary>
+    /// qps : 50万   producer:16    consumer:16
+    /// qps : 70万   producer:2    consumer:2
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    public class Consumer_BlockingCollection<T>: IConsumer<T>
+    {
+
+        public int workThreadCount { get; set; } = 16;
+
+        public string name { get; set; }
+
+        public Action<T> processor { get; set; }
+
+
+        BlockingCollection<T> queue = new BlockingCollection<T>();
+        LongTaskHelp task = new LongTaskHelp();
+
+        public void Publish(T t) 
+        {
+            queue.Add(t);
+        }
+
+
+        public void Start() 
+        { 
+            task.Stop();
+
+            task.threadName = name;
+            task.threadCount = workThreadCount;
+            task.action = Processor;
+            task.Start();
+        }
+
+        public void Stop() 
+        {
+            task.Stop();
+        }
+
+
+        private void Processor()
+        {
+            while (true)
+            {
+                try
+                {
+                    #region Process                        
+                    while (true)
+                    {
+                        var msgFrame = queue.Take();
+                        processor(msgFrame);
+                    }
+                    #endregion
+                }
+                catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException))
+                {
+                    Logger.Error(ex);
+                }
+            }
+        }
+
+    }
+}

+ 101 - 0
dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_Disruptor.cs

@@ -0,0 +1,101 @@
+using Disruptor;
+using Disruptor.Dsl;
+using System;
+using System.Threading.Tasks;
+
+namespace Sers.Core.Util.Consumer
+{
+
+
+    // https://www.jianshu.com/p/6232d81581ff
+
+    // https://www.cnblogs.com/duanxz/archive/2013/01/23/2872513.html
+
+    // WaitStrategy消费者等待策略
+    // https://www.imooc.com/article/259253
+
+
+    /// <summary>
+    /// qps 基本同 Consumer_WorkerPool
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    public class Consumer_Disruptor<T> : IConsumer<T>
+        where T: class,new()
+    {
+        public int workThreadCount { get; set; } = 16;
+
+        public string name { get; set; }
+
+        public Action<T> processor { get; set; }
+
+
+        /// <summary>
+        /// the size of the ring buffer, must be power of 2
+        /// </summary>
+        public int ringBufferSize { get; set; } = 2 << 22;
+
+
+
+        public void Publish(T data)
+        {
+            long index = _ringBuffer.Next();
+            var entity = _ringBuffer[index];
+            entity.data = data;
+            _ringBuffer.Publish(index);  
+        }
+
+        class Entry
+        {
+            public T data;
+        }
+        class WorkHandler : IWorkHandler<Entry>
+        {
+             Action<T> processor;
+            public WorkHandler(Action<T> processor) 
+            {
+                this.processor = processor;
+            }
+
+            public void OnEvent(Entry entry)
+            {
+                processor(entry.data);       
+            }
+        }
+
+        // static IWaitStrategy waitStrategy => new BlockingWaitStrategy();      //qps 1线程: 380万 2线程: 420万
+        // static IWaitStrategy waitStrategy => new SleepingWaitStrategy();      //qps 1线程:1100万 2线程:1200万
+        //static IWaitStrategy waitStrategy => new YieldingWaitStrategy();      //qps 1线程:1100万 2线程:1100万
+        static IWaitStrategy waitStrategy => new SpinWaitWaitStrategy();        //qps 1线程:1500万 2线程:1200万
+
+
+
+        public void Start()
+        {           
+            disruptor = new Disruptor<Entry>(() => new Entry(), ringBufferSize, TaskScheduler.Default, ProducerType.Multi, waitStrategy);
+
+            IWorkHandler<Entry>[] workerPool = new IWorkHandler<Entry>[workThreadCount];
+            for (var t = 0; t < workThreadCount; t++)
+            {
+                workerPool[t]=new WorkHandler(processor);
+            } 
+            disruptor.HandleEventsWithWorkerPool(workerPool);
+
+            _ringBuffer = disruptor.Start();
+        }
+
+        Disruptor<Entry> disruptor;
+        RingBuffer<Entry> _ringBuffer;
+
+        public void Stop()
+        {
+            disruptor?.Shutdown();
+            disruptor = null;
+            _ringBuffer = null;
+        }
+
+
+
+
+
+    }
+}

+ 136 - 0
dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_WorkerPool.cs

@@ -0,0 +1,136 @@
+using System;
+using Disruptor;
+using System.Threading.Tasks;
+using System.Linq;
+
+namespace Sers.Core.Util.Consumer
+{
+    /// <summary>
+    /// qps : 400万   producer:16    consumer:16
+    /// 
+    /// qps 1200万 - 2000万   1个对象,producer:2    consumer:2
+    /// qps 8000万 - 10000万 16个对象,producer:2    consumer:2
+    /// 参考 https://www.cnblogs.com/hda37210/p/5242185.html
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    public class Consumer_WorkerPool<T> : IConsumer<T>
+    {
+
+        /// <summary>
+        /// the size of the ring buffer, must be power of 2
+        /// </summary>
+        public static int defaultBufferSize = 2 << 20;
+
+        public string name { get; set; }
+
+
+        public int workThreadCount { get; set; } = 16;
+
+        public Action<T> processor { get; set; }
+
+
+
+        public Action<T>[] processorList { get; set; }
+
+
+        /// <summary>
+        /// the size of the ring buffer, must be power of 2
+        /// </summary>
+        public int ringBufferSize { get; set; } = defaultBufferSize;
+
+
+
+        /// <summary>
+        /// 产品
+        /// </summary>
+        public class Entry
+        {
+            public T data;
+        }
+
+        /// <summary>
+        /// 消费处理对象
+        /// </summary>
+        public class WorkHandler : IWorkHandler<Entry>
+        {
+            Action<T> processor;
+            public WorkHandler(Action<T> processor)
+            {
+                this.processor = processor;
+            }
+
+            public void OnEvent(Entry entry)
+            {
+                processor(entry.data);
+            }
+        }
+
+
+
+        private RingBuffer<Entry> _ringBuffer;
+
+        private Disruptor.WorkerPool<Entry> _workerPool;
+
+        // static IWaitStrategy waitStrategy => new YieldingWaitStrategy();  //qps 1线程:2100万 2线程:1200万 4线程:500万
+        static IWaitStrategy waitStrategy => new SpinWaitWaitStrategy();    //qps 1线程:2400万 2线程:1200万 4线程:500万
+
+
+        public void Start()
+        {
+            IWorkHandler<Entry>[] handers;
+
+            if (processorList != null)
+            {
+                handers = processorList.Select(processor => new WorkHandler(processor)).ToArray();
+            }
+            else 
+            {
+                handers = System.Linq.Enumerable.Range(0, workThreadCount).Select(i => new WorkHandler(processor)).ToArray();                
+            }
+
+            
+
+
+            if (handers.Length == 1)
+            {
+                _ringBuffer = RingBuffer<Entry>.CreateSingleProducer(() => new Entry(), ringBufferSize, waitStrategy);
+            }
+            else
+            {
+                _ringBuffer = RingBuffer<Entry>.CreateMultiProducer(() => new Entry(), ringBufferSize, waitStrategy);
+            }
+
+            _workerPool = new WorkerPool<Entry>(_ringBuffer
+                , _ringBuffer.NewBarrier()
+                , new FatalExceptionHandler()
+                , handers);
+
+            _ringBuffer.AddGatingSequences(_workerPool.GetWorkerSequences());
+
+
+            _workerPool.Start(new Disruptor.Dsl.BasicExecutor(TaskScheduler.Default));
+        }
+
+
+        public void Stop()
+        {
+            _workerPool.DrainAndHalt();
+        }
+
+        public void Publish(T data)
+        {
+            long sequence = _ringBuffer.Next();
+            var product = _ringBuffer[sequence];
+
+            product.data = data;
+
+            _ringBuffer.Publish(sequence);
+        }
+
+
+
+
+
+
+    }
+}

+ 73 - 0
dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_WorkerPoolCache.cs

@@ -0,0 +1,73 @@
+using System;
+using System.Linq;
+using System.Threading;
+
+namespace Sers.Core.Util.Consumer
+{
+    /// <summary>
+    /// qps : 1200万   producer:32    consumer:32
+    /// qps : 1000万   producer:16    consumer:16
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    public class Consumer_WorkerPoolCache<T> : IConsumer<T>
+    {
+
+        public int workThreadCount { get; set; } = 2;
+
+        public string name { get; set; }
+
+        public Action<T> processor { get; set; }
+
+
+        int curRootIndex;
+
+        Consumer_WorkerPool<T>[] rootWorkerList;         
+         
+
+ 
+
+        public void Start()
+        {
+            Stop();
+
+           
+            rootWorkerList = Enumerable.Range(0, workThreadCount).Select(m =>
+            {
+                var worker = new Consumer_WorkerPool<T>();
+                worker.processor = processor;
+                worker.workThreadCount = 1;
+                //worker.Start();
+                return worker;
+            }).ToArray();
+
+            curRootIndex = 0;
+
+            rootWorkerList.ToList().ForEach(m => m.Start());
+        }
+
+
+        public void Stop()
+        {
+            rootWorkerList?.ToList().ForEach(m => m.Stop());
+            rootWorkerList = null;
+        }
+
+
+
+
+        public void Publish(T data)
+        { 
+            var index = Interlocked.Increment(ref curRootIndex);
+
+            index = Math.Abs(index) % rootWorkerList.Length;
+
+            rootWorkerList[index]?.Publish(data);
+        }
+
+
+
+
+
+
+    }
+}

+ 130 - 0
dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/Consumer_WorkerPoolCascade.cs

@@ -0,0 +1,130 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+
+namespace Sers.Core.Util.Consumer
+{
+    /// <summary>
+    /// qps : 1400万   producer:32    consumer:32
+    /// qps : 1200万   producer:16    consumer:16
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    public class Consumer_WorkerPoolCascade<T> : IConsumer<T>
+    {
+        public int[] workCountArray  = { 16,1 };
+
+        public int workThreadCount { get => workCountArray[0]; set => workCountArray = new[] { value, 1 }; }
+
+        public string name { get; set; }
+
+        public Action<T> processor { get; set; }
+
+ 
+
+
+        List<Consumer_WorkerPool<T>> workerList=new List<Consumer_WorkerPool<T>>();
+
+
+        int curRootIndex;
+        Consumer_WorkerPool<T>[] rootWorkerList;
+
+
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="level">从1开始</param>
+        /// <returns></returns>
+        Consumer_WorkerPool<T> BuildLevel(int level) 
+        {
+       
+            int workerCount = workCountArray[level - 1];
+
+            if (level == workCountArray.Length)
+            {
+                //最后一层
+
+                var worker = new Consumer_WorkerPool<T>();
+                worker.processor = processor;
+                worker.workThreadCount = workerCount;
+
+                workerList.Add(worker);
+
+         
+                return worker;
+
+            }
+            else 
+            {
+                var processorList = Enumerable.Range(0, workerCount).Select(m =>
+                {
+                    var child = BuildLevel(level + 1);
+                    Action<T> processor = t => child.Publish(t);
+                    return processor;
+                }).ToArray();
+
+                var worker = new Consumer_WorkerPool<T>();
+                worker.processorList = processorList;
+                workerList.Add(worker);
+
+                return worker;
+
+            }
+
+
+            
+        }
+
+    
+
+        public void Start()
+        {
+            Stop();
+
+
+
+            rootWorkerList = Enumerable.Range(0, workCountArray[0]).Select(m =>
+            {             
+                return BuildLevel(2);
+            }).ToArray();
+
+            curRootIndex = 0;
+
+            workerList.ForEach(m => m.Start());
+
+
+        }
+
+
+        public void Stop()
+        {
+            lock (this)
+            {
+
+                workerList.ForEach(m => m.Stop());
+                workerList.Clear();
+
+                rootWorkerList = null;
+            }
+
+        }
+
+
+       
+
+        public void Publish(T data)
+        {  
+            var index = Interlocked.Increment(ref curRootIndex);
+
+            index = Math.Abs(index) % rootWorkerList.Length;
+
+            rootWorkerList[index]?.Publish(data);
+        }
+
+
+
+
+
+
+    }
+}

+ 24 - 0
dotnet/Library/Sers/Sers.Core/Sers.Core/Util/Consumer/IConsumer.cs

@@ -0,0 +1,24 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Sers.Core.Util.Consumer
+{
+    public interface IConsumer<T>
+    {
+        int workThreadCount { get; set; }
+
+        string name { get; set; }
+
+        Action<T> processor { get; set; }
+
+
+
+        void Publish(T t);
+
+
+        void Start();
+
+        void Stop();
+    }
+}

+ 139 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/ProgramQps.cs

@@ -0,0 +1,139 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using CLClient.Statistics;
+using Sers.Core.Util.Consumer;
+using Vit.Core.Module.Log;
+
+namespace CLClient
+{
+    class ProgramQps
+    {
+
+        static StatisticsQpsInfo qpsPub = new StatisticsQpsInfo();
+
+        static StatisticsQpsInfo qpsSub = new StatisticsQpsInfo();
+
+        static void Main(string[] args)
+        {             
+
+            Logger.OnLog = (level, msg) => { Console.Write("[" + level + "]" + msg); };
+
+            qpsPub.Start(" Pub");
+            //qpsSub.Start(" Sub");
+
+            for (var t = 0; t < 1; t++)
+            {
+                new ProgramQps() { name = "" + t }.Start();
+            }
+
+            while (true)
+            {
+                Thread.Sleep(5000);
+            }
+        }
+
+
+
+        class Product 
+        {
+            public static Product Pop()
+            {
+                return new Product();
+                //return ObjectPool<Product>.Shared.Pop();
+            }
+
+            /// <summary>
+            /// 使用结束请手动调用
+            /// </summary>
+            public void Push()
+            {
+                //ObjectPool<Product>.Shared.Push(this);
+            }
+
+            public int ms = 0;
+            public int IncrementCount = 0;
+        }
+
+
+        IConsumer<Product> consumer;
+        string name;
+        public void Start() 
+        {
+            //consumer = new Consumer_BlockingCollection<Product>();
+            //consumer = new Consumer_Disruptor<Product>();
+
+            //consumer = new Consumer_WorkerPool<Product>();
+
+            // consumer = new Consumer_WorkerPoolCache<Product>();
+            consumer = new Consumer_WorkerPoolCascade<Product> ();
+ 
+
+
+            consumer.processor = Processor;
+            consumer.workThreadCount = consumerThreadCount;
+            consumer.Start();
+
+
+            for (int i = producerThreadCount; i > 0; i--)
+            {
+                StartThreadPublish();
+            }
+        }
+
+
+        static int producerThreadCount = 16;
+        static int consumerThreadCount = 16;
+
+        /// <summary>
+        ///
+        /// </summary>
+        /// <param name="obj"></param>
+        void Processor(Product obj) 
+        {
+            //if (obj.ms > 0)
+            //    Thread.Sleep(obj.ms);
+
+       
+            //if (obj.IncrementCount > 0)
+            //    qpsSub.IncrementRequest(obj.IncrementCount);
+
+            //obj.Push();
+        }
+
+
+        /// <summary>
+        ///  
+        /// </summary>
+        void StartThreadPublish()
+        {
+            Task.Run(() =>
+            {
+                Product product=new Product();
+
+                for (int i = 0; i < int.MaxValue; i++)
+                {
+                    for (var t = 1; t <10000; t++)
+                    {
+                        //new Product { ms = t == 0 ? 1 : 0 };
+                        //if (t % 100 == 0) qpsPub.IncrementRequest(100);
+
+                        //product = Product.Pop();
+                        //product.ms = 0;
+                        //product.IncrementCount = 0;
+                        consumer.Publish(product);
+
+                    }
+                    //product = Product.Pop();
+                    //product.ms = 1;
+                    //product.IncrementCount = 10000;
+                    consumer.Publish(product);         
+                    qpsPub.IncrementRequest(10000);
+                   
+                    //Thread.Sleep(1);
+
+                }
+            });
+        }
+    }
+}

+ 15 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Properties/PublishProfiles/FolderProfile.pubxml

@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+https://go.microsoft.com/fwlink/?LinkID=208121. 
+-->
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <PublishProtocol>FileSystem</PublishProtocol>
+    <Configuration>Release</Configuration>
+    <Platform>Any CPU</Platform>
+    <TargetFramework>netcoreapp2.1</TargetFramework>
+    <PublishDir>bin\Sers.Core.Util.Consumer.Test</PublishDir>
+    <SelfContained>false</SelfContained>
+    <_IsPortable>true</_IsPortable>
+  </PropertyGroup>
+</Project>

+ 109 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/ConcurrentLinkedQueue.cs

@@ -0,0 +1,109 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+
+namespace Sers.Core.Util.PubSub.Test.Queue
+{
+    // 基于无锁的C#并发队列实现
+    // https://www.cnblogs.com/liaofan/archive/2008/11/20/1337888.html
+
+    public class ConcurrentLinkedQueue<T>
+ {
+     private class Node<K>
+     {
+         internal K Item;
+         internal Node<K> Next;
+ 
+         public Node(K item, Node<K> next)
+         {
+             this.Item = item;
+             this.Next = next;
+         }
+     }
+ 
+     private Node<T> _head;
+     private Node<T> _tail;
+ 
+     public ConcurrentLinkedQueue()
+     {
+         _head = new Node<T>(default(T), null);
+         _tail = _head;
+     }
+ 
+     public bool IsEmpty
+     {
+         get { return (_head.Next == null); }
+     }
+ 
+     public void Enqueue(T item)
+     {
+         Node<T> newNode = new Node<T>(item, null);
+         while (true)
+         {
+             Node<T> curTail = _tail;
+             Node<T> residue = curTail.Next;
+ 
+             //判断_tail是否被其他process改变
+             if (curTail == _tail)
+             {
+                 //A 有其他process执行C成功,_tail应该指向新的节点
+                 if (residue == null) 
+                 {
+                     //C 如果其他process改变了tail.next节点,需要重新取新的tail节点
+                     if (Interlocked.CompareExchange<Node<T>>(
+                         ref curTail.Next, newNode, residue) == residue) 
+                     {
+                         //D 尝试修改tail
+                         Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail); 
+                         return;
+                     }
+                 }
+                 else
+                 {
+                     //B 帮助其他线程完成D操作
+                     Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail); 
+                 }
+             }
+         }
+     }
+ 
+     public bool TryDequeue(out T result)
+     {
+         Node<T> curHead;
+         Node<T> curTail;
+         Node<T> next;
+         do
+         {
+             curHead = _head;
+             curTail = _tail;
+             next = curHead.Next;
+             if (curHead == _head)
+             {
+                 if (next == null)  //Queue为空
+                 {
+                     result = default(T);
+                     return false;
+                 }
+                 if (curHead == curTail) //Queue处于Enqueue第一个node的过程中
+                 {
+                     //尝试帮助其他Process完成操作
+                     Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail); 
+                 }
+                 else
+                 {
+                     //取next.Item必须放到CAS之前
+                     result = next.Item; 
+                     //如果_head没有发生改变,则将_head指向next并退出
+                     if (Interlocked.CompareExchange<Node<T>>(ref _head,
+                         next, curHead) == curHead)
+                         break;
+                 }
+             }
+         }
+         while (true);
+         return true;
+     }
+ }
+ 
+}

+ 152 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/Program_ConcurrentLinkedQueue.cs

@@ -0,0 +1,152 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using CLClient.Statistics;
+using Sers.Core.Util.PubSub;
+using Sers.Core.Util.PubSub.Test.Queue;
+using Vit.Core.Module.Log;
+using Vit.Core.Util.Pool;
+
+namespace CLClient1
+{
+    // qps   1,4  600万
+    class ConcurrentLinkedQueue
+    {
+
+        static StatisticsQpsInfo qpsPub = new StatisticsQpsInfo();
+
+        static StatisticsQpsInfo qpsSub = new StatisticsQpsInfo();
+
+        static void Main11(string[] args)
+        {
+             
+
+            Logger.OnLog = (level, msg) => { Console.Write("[" + level + "]" + msg); };
+
+            qpsPub.Start(" Pub");
+            //qpsSub.Start(" Sub");
+
+            for (var t = 0; t < 1; t++)
+            {
+                new ConcurrentLinkedQueue() { name = "" + t }.Start();
+            }
+
+            while (true)
+            {
+                Thread.Sleep(5000);
+            }
+        }
+
+
+
+        class Product 
+        {
+            public static Product Pop()
+            {
+                return new Product();
+                //return ObjectPool<Product>.Shared.Pop();
+            }
+
+            /// <summary>
+            /// 使用结束请手动调用
+            /// </summary>
+            public void Push()
+            {
+                //ObjectPool<Product>.Shared.Push(this);
+            }
+
+            public int ms = 0;
+            public int IncrementCount = 0;
+        }
+
+
+        ConcurrentLinkedQueue<Product> worker;
+        string name;
+        public void Start() 
+        {
+    
+           
+            worker = new ConcurrentLinkedQueue<Product>();
+ 
+
+ 
+
+
+            for (int i = pubThreadCount; i > 0; i--)
+            {
+                StartThreadPublish();
+            }
+
+
+            for (int i = subThreadCount; i > 0; i--)
+            {
+                StartThreadSubscribe();
+            }
+        }
+
+
+        static int pubThreadCount =4;
+        static int subThreadCount = 4;
+      
+        void StartThreadPublish()
+        {
+            Task.Run(() =>
+            {
+                Product product=new Product();
+
+                for (int i = 0; i < int.MaxValue; i++)
+                {
+                    for (var t = 1; t <10000; t++)
+                    {
+
+
+                        //product = Product.Pop();
+                        //product.ms = 0;
+                        //product.IncrementCount = 0;
+                        //worker.Publish(product);
+                        worker.Enqueue(product);
+
+                    }
+                    //product = Product.Pop();
+                    //product.ms = 1;
+                    //product.IncrementCount = 10000;
+                    //worker.Publish(product);         
+                    worker.Enqueue(product);
+                    qpsPub.IncrementRequest(10000);
+                   
+                    //Thread.Sleep(1);
+
+                }
+            });
+        }
+
+
+        void StartThreadSubscribe()
+        {
+            Task.Run(() =>
+            {
+
+                SpinWait spin = new SpinWait();
+                for (int i = 0; i < int.MaxValue; i++)
+                {
+                    worker.TryDequeue(out var product);
+     
+
+                    if (product == null) 
+                    {
+                        //spin.SpinOnce();
+                    }
+
+                    //if (obj.ms > 0)
+                    //    Thread.Sleep(obj.ms);
+
+
+                    //if (obj.IncrementCount > 0)
+                    //    qpsSub.IncrementRequest(obj.IncrementCount);
+
+                    //obj.Push();
+                }
+            });
+        }
+    }
+}

+ 365 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/Program_QueueQps.cs

@@ -0,0 +1,365 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using CLClient.Statistics;
+using Sers.Core.Util.PubSub;
+using Sers.Core.Util.PubSub.Test.Queue;
+using Vit.Core.Module.Log;
+using Vit.Core.Util.Pool;
+
+namespace CLClient1
+{
+
+    // qps SersQueue    1 thread  1千万
+    //                  4 thread  6百万
+
+    // qps  RingBuffer  1 thread  2   千万
+    //                  4 thread  2.5 千万
+    //                 16 thread  4.7 千万
+
+
+    //ConcurrentQueue
+    //  thead      qps
+    //   1,1     5 千万
+    //   4,6     7 千万
+
+
+    //ConcurrentBag
+    //  thead      qps(万)
+    //   1,1     1700
+    //   2,2     180 
+    //   8,8     180 
+
+    class Program_QueueQps
+    {
+
+        static int pubThreadCount = 8;
+        static int subThreadCount = 16;
+
+        static SpinWait spin = new SpinWait();
+
+        static StatisticsQpsInfo qpsPub = new StatisticsQpsInfo();
+
+        static StatisticsQpsInfo qpsSub = new StatisticsQpsInfo();
+
+        static void Mainwww(string[] args)
+        {
+             
+
+            Logger.OnLog = (level, msg) => { Console.Write("[" + level + "]" + msg); };
+
+            qpsPub.Start(" Pub");
+            qpsSub.Start(" Sub");
+
+            for (var t = 0; t < 1; t++)
+            {
+               Start();
+            }
+
+            while (true)
+            {
+                Thread.Sleep(5000);
+            }
+        }
+
+
+
+        class Product 
+        {
+            public static Product Pop()
+            {
+                return new Product();
+                //return ObjectPool<Product>.Shared.Pop();
+            }
+
+            /// <summary>
+            /// 使用结束请手动调用
+            /// </summary>
+            public void Push()
+            {
+                //ObjectPool<Product>.Shared.Push(this);
+            }
+
+            public int ms = 0;
+            public int IncrementCount = 0;
+        }
+
+
+        #region SersQueue      
+        static void Start_SersQueue()
+        {
+
+            var worker = new Queue_Channel<Product>();
+ 
+            int pubThreadCount = 4;
+            int subThreadCount = 6;
+
+
+            for (int i = pubThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {
+                    Product product = new Product();
+
+                    for (; ; )
+                    {
+                        for (var t = 1; t < 10000; t++)
+                        {
+
+
+                            //product = Product.Pop();
+                            //product.ms = 0;
+                            //product.IncrementCount = 0;
+                             worker.Publish(product);
+                   
+
+                        }
+                        //product = Product.Pop();
+                        //product.ms = 1;
+                        //product.IncrementCount = 10000;
+                         worker.Publish(product);
+               
+                        qpsPub.IncrementRequest(10000);
+
+                        //Thread.Sleep(1);
+
+                    }
+                });
+            }
+
+
+            for (int i = subThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {
+
+                    SpinWait spin = new SpinWait();
+                    for (; ; )
+                    {
+
+                        var p = worker.Take();
+                        if (p == null)
+                        {
+                            spin.SpinOnce();
+                        }
+
+                        //if (obj.ms > 0)
+                        //    Thread.Sleep(obj.ms);
+
+
+                        //if (obj.IncrementCount > 0)
+                        //    qpsSub.IncrementRequest(obj.IncrementCount);
+
+                        //obj.Push();
+                    }
+                });
+            }
+        }
+
+        #endregion
+
+        #region RingBuffer      
+        static void Start_RingBuffer()
+        { 
+            var worker = new RingBuffer<Product>();
+    
+
+            int pubThreadCount = 4;
+            int subThreadCount = 6;
+
+
+
+
+            for (int i = pubThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {
+                    Product product = new Product();
+
+                    for (; ; )
+                    {
+                        for (var t = 1; t < 10000; t++)
+                        {
+                            //product = Product.Pop();
+                            //product.ms = 0;
+                            //product.IncrementCount = 0;
+                            worker.Publish(product);                  
+
+                        }
+                        //product = Product.Pop();
+                        //product.ms = 1;
+                        //product.IncrementCount = 10000;
+                        worker.Publish(product);       
+                        qpsPub.IncrementRequest(10000);
+                        //Thread.Sleep(1);
+                    }
+                });
+            }
+
+
+            for (int i = subThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {
+
+                    SpinWait spin = new SpinWait();
+                    for (; ; )
+                    {
+
+
+
+                        var p = worker.Take();
+                        if (p == null)
+                        {
+                            spin.SpinOnce();
+                        }
+
+                        //if (obj.ms > 0)
+                        //    Thread.Sleep(obj.ms);
+
+
+                        //if (obj.IncrementCount > 0)
+                        //    qpsSub.IncrementRequest(obj.IncrementCount);
+
+                        //obj.Push();
+                    }
+                });
+            }
+        }
+
+        #endregion
+
+
+
+        #region ConcurrentQueue      
+        static void Start2() 
+        { 
+            var worker = new ConcurrentQueue<Product>();
+
+            int pubThreadCount = 4;
+            int subThreadCount = 6;
+
+
+            for (int i = pubThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {
+                    Product product = new Product();
+
+                    for (; ; )
+                    {
+                        for (var t = 1; t < 10000; t++)
+                        {
+
+
+                            //product = Product.Pop();
+                            //product.ms = 0;
+                            //product.IncrementCount = 0;
+                            //worker.Publish(product);
+                            worker.Enqueue(product);
+
+                        }
+                        //product = Product.Pop();
+                        //product.ms = 1;
+                        //product.IncrementCount = 10000;
+                        //worker.Publish(product);
+                        worker.Enqueue(product);
+                        qpsPub.IncrementRequest(10000);
+
+                        //Thread.Sleep(1);
+
+                    }
+                });
+            }
+
+
+            for (int i = subThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {
+
+                    SpinWait spin = new SpinWait();
+                    for (; ; )
+                    {
+                        worker.TryDequeue(out var p);
+
+
+                        //var p = worker.Take();
+                        //if (p == null) 
+                        //{
+                        //    spin.SpinOnce();
+                        //}
+
+                        //if (obj.ms > 0)
+                        //    Thread.Sleep(obj.ms);
+
+
+                        //if (obj.IncrementCount > 0)
+                        //    qpsSub.IncrementRequest(obj.IncrementCount);
+
+                        //obj.Push();
+                    }
+                });
+            }
+        }
+
+        #endregion
+
+
+        #region ConcurrentBag      
+        static void Start()
+        {
+            var worker = new ConcurrentBag<Product>();
+
+
+
+            for (int i = pubThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {
+                    Product product = new Product();
+
+                    for (; ; )
+                    {
+                        for (var t = 1; t < 10000; t++)
+                        { 
+                            worker.Add(product);
+                        }
+             
+                        worker.Add(product);
+                        qpsPub.IncrementRequest(10000);
+
+                        //spin.SpinOnce();
+                        Thread.Sleep(10);
+
+                    }
+                });
+            }
+
+
+            for (int i = subThreadCount; i > 0; i--)
+            {
+                Task.Run(() =>
+                {                   
+                    for (; ; )
+                    {
+
+                        worker.TryTake(out var p);
+
+
+                      
+                        //if (p == null) 
+                        //{
+                        //    spin.SpinOnce();
+                        //}
+ 
+                    }
+                });
+            }
+        }
+
+        #endregion
+
+    }
+}

+ 48 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/Queue_Channel.cs

@@ -0,0 +1,48 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Channels;
+
+namespace Sers.Core.Util.PubSub.Test.Queue
+{
+    public class Queue_Channel<T>
+    {
+
+        Channel<T> channel = Channel.CreateUnbounded<T>();
+
+        public async System.Threading.Tasks.Task PublishAsync(T t)
+        {
+            await channel.Writer.WriteAsync(t);
+        }
+
+        public void Publish(T t)
+        {
+            PublishAsync(t);
+              //channel.Writer.TryWrite(t);
+        }
+
+
+        /// <summary>
+        /// true if an item was read; otherwise, false.
+        /// </summary>
+        /// <param name="item"></param>
+        /// <returns></returns>
+        public bool TryTake(out T item) 
+        {
+            return channel.Reader.TryRead(out item);
+        }
+
+
+        public T Take()
+        {
+            //if (channel.Reader.WaitToReadAsync().Result)
+            //{
+                if (channel.Reader.TryRead(out var item)) return item;
+               
+            //}
+            return default;
+        }
+
+
+    }
+}

+ 58 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Queue/RingBuffer.cs

@@ -0,0 +1,58 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Channels;
+
+namespace Sers.Core.Util.PubSub.Test.Queue
+{
+    public class RingBuffer<T> where T:class
+    {
+
+        T[] buffer;
+        int power;
+        int length;
+        int mouban;
+        public RingBuffer(int power=20) 
+        {
+            this.power = power;
+            this.length = 2 << power;
+            buffer = new T[length];
+
+
+            mouban = 1;
+            while (--power > 0) 
+            {
+                mouban <<= 1;
+                mouban++;
+            }
+
+
+        }
+
+
+        int headIndex = 0;
+        int tailIndex = 0;
+
+        public void Publish(T t)
+        {
+            var index=Interlocked.Increment(ref headIndex);
+
+            buffer[index & mouban] = t;
+        }
+
+ 
+
+
+        public T Take()
+        {
+            var index = Interlocked.Increment(ref tailIndex) & mouban;
+            var t= buffer[index];
+            buffer[index] = null;
+
+            return t;
+        }
+
+
+    }
+}

+ 15 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/README.md

@@ -0,0 +1,15 @@
+
+#--------------------------------
+cd Sers.Core.Util.PubSub.Test
+dotnet Sers.Core.Util.PubSub.Test.dll 
+
+启动20个发布者
+启动22个订阅者
+
+//disruptor = new Disruptor<Entry>(() => new Entry(), 2 << 18, TaskScheduler.Default, ProducerType.Multi, new BlockingWaitStrategy());   //qps 350万
+//disruptor = new Disruptor<Entry>(() => new Entry(), 2 << 18, TaskScheduler.Default, ProducerType.Multi, new SleepingWaitStrategy());   //qps 580万
+disruptor = new Disruptor<Entry>(() => new Entry(), 2 << 18, TaskScheduler.Default, ProducerType.Multi, new YieldingWaitStrategy());     //qps 590万
+ 
+Worker_BlockingCollection  //qps 260万
+
+ 

+ 27 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Sers.Core.Util.Consumer.Test.csproj

@@ -0,0 +1,27 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>netcoreapp2.1</TargetFramework>
+  </PropertyGroup>
+ 
+
+  <ItemGroup>
+    <Content Include="StartConsole.bat">
+      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+    </Content>
+  </ItemGroup>
+ 
+
+  <ItemGroup>
+    <PackageReference Include="System.Threading.Channels" Version="5.0.0" />
+  </ItemGroup>
+ 
+
+  <ItemGroup>
+    <ProjectReference Include="..\..\Sers.Core\Sers.Core.csproj" />
+  </ItemGroup>
+
+ 
+
+</Project>

+ 2 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/StartConsole.bat

@@ -0,0 +1,2 @@
+dotnet Sers.Core.Util.PubSub.Test.dll
+pause

+ 112 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Statistics/StatisticsQpsInfo.cs

@@ -0,0 +1,112 @@
+/// v3
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace CLClient.Statistics
+{
+    public class StatisticsQpsInfo
+    {
+        //public static StatisQpsInfo Instance = new StatisQpsInfo();
+
+
+        string name = "";
+
+
+        DateTime? startTime;
+
+        bool finished = false;
+        public void Start(string name)
+        {
+            this.name += name;
+            finished = false;
+            startTime = DateTime.Now;
+            Console.WriteLine("¿ªÊ¼");
+
+            Task.Run(() => {
+
+                while (!finished)
+                {
+                    Console.WriteLine(ToString());
+                    Thread.Sleep(1000);
+                }
+
+            });
+
+
+        }
+        public void Stop()
+        {
+            finished = true;
+            Console.WriteLine("½áÊø");
+            Console.WriteLine(ToString());
+        }
+
+        public int RequestCount = 0;
+        public void IncrementRequest() => Interlocked.Increment(ref RequestCount);
+        public void IncrementRequest(int count) => Interlocked.Add(ref RequestCount, count);
+
+        public long RequestTicks = 0;
+        public void IncrementRequestTicks(long value) => Interlocked.Add(ref RequestTicks, value);
+
+
+        public int ErrorCount = 0;
+        public void IncrementError() => Interlocked.Increment(ref ErrorCount);
+
+
+        long lastRequestTicks = 0;
+        int lastCount = 0;
+        DateTime lastTime;
+        public override string ToString()
+        {
+            var curCount = RequestCount;
+            var curRequestTicks = RequestTicks;
+
+            var msg = $"[{name}]ReqCount: {curCount}";
+
+            double d;
+
+            if (curCount > 0)
+            {
+                d = ErrorCount * 100.0 / curCount;
+                msg += $",error:{ErrorCount}({d.ToString("0.00")}%)";
+            }
+            if (startTime.HasValue)
+            {
+                if (lastCount == 0)
+                {
+                    lastTime = startTime.Value;
+                }
+                var curTime = DateTime.Now;
+
+                //sum
+                var ms = (curTime - startTime.Value).TotalMilliseconds;
+                d =  curCount / ms * 1000;
+                msg += $",qps:{ d.ToString("0.00") }";
+
+                ms = 1.0 * curRequestTicks / TimeSpan.TicksPerMillisecond;
+                d = (curCount <= 0 ? 0 : ms / curCount);
+                msg += $",ms/req:{ d.ToString("0.00") }";
+
+
+                //cur
+                msg += $",------Cur";                
+                msg += $",ReqCount: {curCount}";
+                ms = (curTime - lastTime).TotalMilliseconds;
+                d = (curCount - lastCount) / ms * 1000;
+                msg += $",qps:{ d.ToString("0.00")  }";
+                ms = 1.0 * (curRequestTicks - lastRequestTicks) / TimeSpan.TicksPerMillisecond;
+                d = (curCount <= lastCount ? 0 : ms / (curCount - lastCount));
+                msg += $",ms/req:{ d.ToString("0.00") }";
+
+
+                lastRequestTicks = curRequestTicks;
+                lastCount = curCount;
+                lastTime = curTime;
+                return msg;
+            }
+            return msg;
+        }
+    }
+}

+ 247 - 0
dotnet/Library/Sers/Sers.Core/Test/Sers.Core.Util.Consumer.Test/Test/WorkerPoolTest.cs

@@ -0,0 +1,247 @@
+using System;
+using System.Collections.Generic;
+using Disruptor;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Linq;
+
+namespace Sers.Core.Util.PubSub.Test.Test
+{
+
+    // 参考 https://www.cnblogs.com/hda37210/p/5242185.html
+
+
+
+    /// <summary>
+    /// 消费者管理器
+    /// </summary>
+    /// <typeparam name="TProduct">产品</typeparam>
+    public class Workers<TProduct> where TProduct : Producer<TProduct>, new()
+    {
+
+       
+
+        private Disruptor.WorkerPool<TProduct> _workerPool;
+
+        public Workers(List<IWorkHandler<TProduct>> handers, IWaitStrategy waitStrategy = null, int bufferSize = /*Test.repeat **/ (2<<22)) //1024 * 64
+        {
+
+            if (handers == null || handers.Count == 0)
+                throw new ArgumentNullException("消费事件处理数组为空!");
+
+            if (handers.Count == 1)
+                _ringBuffer = RingBuffer<TProduct>.CreateSingleProducer(() => new TProduct(), bufferSize,
+                    waitStrategy ?? new SpinWaitWaitStrategy());
+            else
+            {
+                _ringBuffer = RingBuffer<TProduct>.CreateMultiProducer(() => new TProduct(), bufferSize,
+                    waitStrategy ?? new SpinWaitWaitStrategy());
+            }
+
+            _workerPool = new WorkerPool<TProduct>(_ringBuffer
+                , _ringBuffer.NewBarrier()
+                , new FatalExceptionHandler()
+                , handers.ToArray());
+
+            _ringBuffer.AddGatingSequences(_workerPool.GetWorkerSequences());
+        }
+
+        public void Start()
+        {
+            _workerPool.Start(new Disruptor.Dsl.BasicExecutor(TaskScheduler.Default));
+        }
+
+        public Producer<TProduct> CreateOneProducer()
+        {
+            return new Producer<TProduct>(this._ringBuffer);
+        }
+        public void DrainAndHalt()
+        {
+            _workerPool.DrainAndHalt();
+        }
+
+        private readonly RingBuffer<TProduct> _ringBuffer;
+    }
+
+    /// <summary>
+    /// 生产者对象
+    /// </summary>
+    /// <typeparam name="TProduct">产品类型</typeparam>
+    public class Producer<TProduct> where TProduct : Producer<TProduct>
+    {
+
+        long _sequence;
+        private RingBuffer<TProduct> _ringBuffer;
+        public Producer()
+        {
+
+        }
+        public Producer(RingBuffer<TProduct> ringBuffer)
+        {
+            _ringBuffer = ringBuffer;
+        }
+        /// <summary>
+        /// 获取可修改的产品
+        /// </summary>
+        /// <returns></returns>
+        public Producer<TProduct> Enqueue()
+        {
+            long sequence = _ringBuffer.Next();
+            Producer<TProduct> producer = _ringBuffer[sequence];
+            producer._sequence = sequence;
+            if (producer._ringBuffer == null)
+                producer._ringBuffer = _ringBuffer;
+            return producer;
+        }
+        /// <summary>
+        /// 提交产品修改
+        /// </summary>
+        public void Commit()
+        {
+            _ringBuffer.Publish(_sequence);
+        }
+    }
+
+    /// <summary>
+    /// 产品/继承生产者
+    /// </summary>
+    public class Product : Producer<Product>
+    {
+        //产品包含的属下随便定义,无要求,只需要继承自生产者就行了
+        public long Value { get; set; }
+        public string Guid { get; set; }
+    }
+
+
+    /// <summary>
+    /// 消费处理对象
+    /// </summary>
+    public class WorkHandler : IWorkHandler<Product>
+    {
+        public Test test;
+        public void OnEvent(Product @event)
+        {
+
+            test.UpdateCacheByOut(@event.Guid);
+            //收到产品,在这里写处理代码
+
+        }
+
+    }
+
+
+
+    public class Test
+    {
+        public const int repeat = 1;
+
+
+
+        public static  long PrePkgInCount = 0;
+        public static  long PrePkgOutCount = 0;
+        public static  long PkgInCount = 0;
+        public static  long PkgOutCount = 0;
+          ConcurrentDictionary<string, string> InCache = new ConcurrentDictionary<string, string>();
+          ConcurrentDictionary<string, string> OutCache = new ConcurrentDictionary<string, string>();
+        private static long Seconds;
+        static bool qpsStarted = false;
+
+
+        //static int threadCount = 16 * 4;
+        static int threadCount = repeat * 2;
+
+        public void Main1()
+        {
+            //Workers<Product> workers = new Workers<Product>(
+            //new List<IWorkHandler<Product>>() { new WorkHandler() { test=this}, new WorkHandler() { test = this } });
+
+            Workers<Product> workers = new Workers<Product>(
+            Enumerable.Range(1, 2).Select(x =>(IWorkHandler<Product>) new WorkHandler() { test = this }).ToList());
+
+
+            var producerWorkerList = Enumerable.Range(1, threadCount).Select(x => workers.CreateOneProducer()).ToArray(); 
+
+
+            //Producer<Product> producerWorkers = workers.CreateOneProducer();
+            //Producer<Product> producerWorkers1 = workers.CreateOneProducer();
+
+            workers.Start();
+            if (!qpsStarted)
+            {
+                qpsStarted = true;
+                Task.Run(delegate
+                {
+                    while (true)
+                    {
+                        Thread.Sleep(1000);
+                        Seconds++;
+                        long intemp = PkgInCount;
+                        long outemp = PkgOutCount;
+                        Console.WriteLine(
+                            $"In ops={intemp - PrePkgInCount},out ops={outemp - PrePkgOutCount},inCacheCount={InCache.Count},OutCacheCount={OutCache.Count},RunningTime={Seconds}");
+                        PrePkgInCount = intemp;
+                        PrePkgOutCount = outemp;
+                    }
+
+                });
+            }
+
+            producerWorkerList.ToList().ForEach(w=> {
+
+                Task.Run(delegate { Run(w); });
+            }) ;
+
+            //Task.Run(delegate { Run(producerWorkers); });
+            //Task.Run(delegate { Run(producerWorkers); });
+            //Task.Run(delegate { Run(producerWorkers1); });
+            Console.Read();
+
+        }
+
+        public   void Run(Producer<Product> producer)
+        {
+            for (int i = 0; i < int.MaxValue; i++)
+            {
+
+                var obj = producer.Enqueue();
+                CheckRelease(obj as Product);
+                obj.Commit();
+            }
+        }
+
+        public   void CheckRelease(Product publisher)
+        {
+            //Interlocked.Increment(ref PkgInCount);
+            return; //不检查正确性
+            publisher.Guid = Guid.NewGuid().ToString();
+            InCache.TryAdd(publisher.Guid, string.Empty);
+        }
+
+        long count = 0; 
+
+        public   void UpdateCacheByOut(string guid)
+        {
+            if (Interlocked.Increment(ref count) % 10000 == 9999)
+            {
+                Interlocked.Add(ref PkgOutCount, 10000);
+            }
+
+            return;
+            //Interlocked.Increment(ref PkgOutCount);
+    
+            if (guid != null)
+                if (InCache.ContainsKey(guid))
+                {
+                    string str;
+                    InCache.TryRemove(guid, out str);
+                }
+                else
+                {
+                    OutCache.TryAdd(guid, string.Empty);
+                }
+
+        }
+
+    }
+}

+ 3 - 3
dotnet/Library/Sers/Sers.ServiceStation/Sers.ServiceStation/ServiceStation.cs

@@ -229,7 +229,7 @@ namespace Sers.ServiceStation
 
             communicationManage.conn_OnGetMessage = MessageClient.Instance.OnGetMessage;
 
-            communicationManage.conn_OnGetRequest = (IOrganizeConnection conn,Object sender, ArraySegment<byte> requestData, Action<object, List<ArraySegment<byte>>> callback) => 
+            communicationManage.conn_OnGetRequest = (IOrganizeConnection conn,Object sender, ArraySegment<byte> requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback) => 
             {
                 localApiService.CallApiAsync(sender, new ApiMessage(requestData), (object sender1, ApiMessage apiReplyMessage) =>
                 {
@@ -260,10 +260,10 @@ namespace Sers.ServiceStation
 
 
             #region (x.4) 初始化ApiClient
-            ApiClient.SetOnSendRequest(communicationManage.organizeList.Select(organize=> organize.conn).Select<IOrganizeConnection, Func<List<ArraySegment<byte>>, ArraySegment<byte>>>(
+            ApiClient.SetOnSendRequest(communicationManage.organizeList.Select(organize=> organize.conn).Select<IOrganizeConnection, Func<Vit.Core.Util.Pipelines.ByteData, ArraySegment<byte>>>(
                 conn =>
                 {
-                    return (req) => { conn.SendRequest(req, out var reply); return reply.ByteDataToArraySegment(); };
+                    return (req) => { conn.SendRequest(req, out var reply); return reply.ToArraySegment(); };
                 }
                 ).ToArray());
             #endregion

+ 21 - 12
dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/ArraySegmentByteExtensions.cs

@@ -1,5 +1,6 @@
 using Vit.Core.Module.Serialization;
 using System;
+using System.Runtime.CompilerServices;
 
 namespace Vit.Extensions
 {
@@ -12,28 +13,24 @@ namespace Vit.Extensions
         //    return new ArraySegment<T>(new T[0], 0,0);
         //}
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static bool HasData<T>(this ArraySegment<T> seg)
         {
             return seg!=null && seg.Array!=null && seg.Count>0;
         }
 
-
-
-        internal static void CopyTo<T>(this ArraySegment<T> seg,T[] bytes,int curIndex=0)
-        {
-            Array.Copy(seg.Array, seg.Offset, bytes, curIndex, seg.Count); 
-        }
-
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static ArraySegment<T> Slice<T>(this ArraySegment<T> seg,int Offset,int? count=null)
         {
             return new ArraySegment<T>(seg.Array,seg.Offset+ Offset, count?? (seg.Count-Offset) );
         }
-     
+
 
 
 
         #region ArraySegmentByte <--> String
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static string ArraySegmentByteToString(this ArraySegment<byte> data)
         {
             if (null == data || data.Array==null) return null;
@@ -41,6 +38,7 @@ namespace Vit.Extensions
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static ArraySegment<byte> StringToArraySegmentByte(this string data)
         {
             return (null == data) ? Null : Serialization.Instance.encoding.GetBytes(data).BytesToArraySegmentByte();
@@ -49,7 +47,8 @@ namespace Vit.Extensions
 
 
         #region ArraySegmentByte <--> bytes
- 
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static byte[] ArraySegmentByteToBytes(this ArraySegment<byte> data)
         {
             if (null == data) return null;
@@ -57,24 +56,32 @@ namespace Vit.Extensions
 
             var bytes = new byte[data.Count];
             if (data.Count > 0)
-            {
-                data.CopyTo(bytes);
+            {              
+                unsafe
+                {
+                    fixed (byte* pSource = data.Array, pTarget = bytes)
+                    {
+                        Buffer.MemoryCopy(pSource + data.Offset, pTarget, data.Count, data.Count);
+                    }
+                }
             }
             return bytes;
         }
 
- 
+
         #endregion
 
 
         #region ArraySegmentByte <--> Int32
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static Int32 ArraySegmentByteToInt32(this ArraySegment<byte> data,int startIndex=0)
         {             
             return  BitConverter.ToInt32(data.Array, data.Offset+startIndex);
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static ArraySegment<byte> Int32ToArraySegmentByte(this Int32 data)
         {
             return BitConverter.GetBytes(data).BytesToArraySegmentByte();
@@ -84,12 +91,14 @@ namespace Vit.Extensions
 
         #region ArraySegmentByte <--> Int64
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static Int64 ArraySegmentByteToInt64(this ArraySegment<byte> data, int startIndex = 0)
         {
             return BitConverter.ToInt64(data.Array, data.Offset + startIndex);
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static ArraySegment<byte> Int64ToArraySegmentByte(this Int64 data)
         {
             return BitConverter.GetBytes(data).BytesToArraySegmentByte();

+ 3 - 0
dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/Base64StringExtensions.cs

@@ -1,4 +1,5 @@
 using System;
+using System.Runtime.CompilerServices;
 
 namespace Vit.Extensions
 {
@@ -7,12 +8,14 @@ namespace Vit.Extensions
 
         #region bytes <-->  Base64String
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static string BytesToBase64String(this byte[] data)
         {
             return Convert.ToBase64String(data);
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static byte[] Base64StringToBytes(this string data)
         {
             return Convert.FromBase64String(data);

+ 0 - 173
dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/ByteDataExtensions.cs

@@ -1,173 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace Vit.Extensions
-{
-    public static partial class ByteDataExtensions
-    {
-
-        #region ByteDataCount
-
-        
-        public static int ByteDataCount(this List<ArraySegment<byte>> byteData)
-        {
-            if (null == byteData || byteData.Count == 0) return 0;
-            int count = 0;
-
-            foreach (var item in byteData)
-            {
-                if (null != item)
-                {
-                    count += item.Count;
-                }
-            }
-            return count;
-        }
-        #endregion
-
-
-        #region ByteData <--> Bytes
-
-        public static byte[] ByteDataToBytes(this List<ArraySegment<byte>> byteData)
-        {
-            int count = 0;
-            foreach (var item in byteData)
-            {
-                count += item.Count;
-            }
-
-            var bytes = new byte[count];
-
-            int curIndex = 0;
-            foreach (var item in byteData)
-            {
-                if (null == item.Array || item.Count == 0) continue;                
-                     
-                item.CopyTo(bytes, curIndex);
-
-                curIndex += item.Count;
-            }
-            return bytes;
-        }
-
-
-        #endregion
-
-        #region ByteData <--> ArraySegment
-
-        public static ArraySegment<byte> ByteDataToArraySegment(this List<ArraySegment<byte>> byteData)
-        {
-            var bytes = byteData?.ByteDataToBytes();
-            return bytes==null? ArraySegmentByteExtensions.Null:new ArraySegment<byte>(bytes);
-        }
-
-
-        #endregion
-
-
-        #region ByteData <--> String
-
-        public static string ByteDataToString(this List<ArraySegment<byte>> data)
-        {
-            return data.ByteDataToBytes().BytesToString();
-        }
-
-
-        public static List<ArraySegment<byte>> StringToByteData(this string data)
-        {
-            return new List<ArraySegment<byte>> { data.StringToArraySegmentByte() };
-        }
-        #endregion
-
-
-
-        #region ByteDataPopInt32
-
-        /// <summary>
-        /// data的长度 必须大于等于4
-        /// </summary>
-        /// <param name="data"></param>
-        /// <returns></returns>
-        public static int ByteDataPopInt32(this List<ArraySegment<byte>> data)
-        {
-            return data.ByteDataPopBytes(new byte[4]).BytesToInt32();
-        }
-        #endregion
-
-
-
-        #region ByteDataPopByteData
-
-        /// <summary>
-        /// data 的长度 必须大于等于 dataToPop 的长度
-        /// </summary>
-        /// <param name="data"></param>
-        /// <param name="dataToPop"></param>
-        /// <returns></returns>
-        public static byte[] ByteDataPopBytes(this List<ArraySegment<byte>> data, byte[] dataToPop)
-        {
-            int lenToPop = dataToPop.Length;
-          
-
-            int copyedIndex = 0;
-         
-            while (copyedIndex < lenToPop)
-            {
-                int leftCount = lenToPop - copyedIndex;
-
-                var cur = data[0];
-                if (cur.Count <= leftCount)
-                {
-                    Array.Copy(cur.Array, cur.Offset, dataToPop, copyedIndex, cur.Count);
-                    copyedIndex += cur.Count;
-                    data.RemoveAt(0);                 
-                }
-                else
-                {
-                   
-                    Array.Copy(cur.Array, cur.Offset, dataToPop, copyedIndex, leftCount);
-                    copyedIndex += leftCount;
-                    data[0] = cur.Slice(leftCount);
-                }
-            }
-            return dataToPop;
-        }
-        #endregion
-
-
-
-        #region ByteDataPopByteData
-
-        /// <summary>
-        /// data 的长度 必须大于等于lenToPop
-        /// </summary>
-        /// <param name="data"></param>
-        /// <param name="lenToPop"></param>
-        /// <returns></returns>
-        public static List<ArraySegment<byte>> ByteDataPopByteData(this List<ArraySegment<byte>> data, int lenToPop)
-        {
-            List<ArraySegment<byte>> dataToPop = new List<ArraySegment<byte>>();
-
-            int leftCount = lenToPop;
-            while (leftCount > 0)
-            {
-                var cur = data[0];
-                if (cur.Count <= leftCount)
-                {
-                    leftCount -= cur.Count;
-                    data.RemoveAt(0);
-                    dataToPop.Add(cur);
-                }
-                else
-                {
-                    dataToPop.Add(cur.Slice(0, leftCount));
-                    data[0] = cur.Slice(leftCount);
-                    leftCount = 0;
-                }
-            }
-            return dataToPop;
-        }
-        #endregion
-    }
-}

+ 10 - 1
dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/BytesExtensions.cs

@@ -1,5 +1,6 @@
 using System;
 using System.Collections.Generic;
+using System.Runtime.CompilerServices;
 using System.Text;
 using Vit.Core.Module.Serialization;
 
@@ -9,13 +10,15 @@ namespace Vit.Extensions
     {
 
         #region bytes <--> String
-    
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static string BytesToString(this byte[] data, Encoding encoding = null)
         {             
             return Serialization.Instance.BytesToString(data, encoding); 
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static byte[] StringToBytes(this string data, Encoding encoding = null)
         {
             return Serialization.Instance.StringToBytes(data, encoding);
@@ -24,12 +27,14 @@ namespace Vit.Extensions
 
         #region bytes <--> Int32
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static Int32 BytesToInt32(this byte[] data,int startIndex=0)
         {
             return  BitConverter.ToInt32(data, startIndex);
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static byte[] Int32ToBytes(this Int32 data)
         {
             return BitConverter.GetBytes(data);
@@ -38,12 +43,14 @@ namespace Vit.Extensions
 
         #region bytes <--> Int64
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static Int64 BytesToInt64(this byte[] data, int startIndex = 0)
         {
             return BitConverter.ToInt64(data, startIndex);
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static byte[] Int64ToBytes(this Int64 data)
         {
             return BitConverter.GetBytes(data);
@@ -51,12 +58,14 @@ namespace Vit.Extensions
         #endregion
 
         #region bytes -> ArraySegmentByte
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static ArraySegment<byte> BytesToArraySegmentByte(this byte[] bytes)
         {
             return null == bytes? ArraySegmentByteExtensions.Null: new ArraySegment<byte>(bytes, 0,bytes.Length);
         }
 
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static List<ArraySegment<byte>> BytesToByteData(this byte[] bytes)
         {
             return null == bytes?null: new List<ArraySegment<byte>> { bytes.BytesToArraySegmentByte() };

+ 30 - 0
dotnet/Library/Vit/Vit.Core/Vit.Core/Extensions/byte/DataCopyExtensions.cs

@@ -0,0 +1,30 @@
+using Vit.Core.Module.Serialization;
+using System;
+
+namespace Vit.Extensions
+{
+    public static partial class DataCopyExtensions
+    {
+
+
+ 
+        internal static void CopyTo<T>(this ArraySegment<T> seg, T[] bytes, int curIndex = 0)
+        {
+            //Array.Copy(seg.Array, seg.Offset, bytes, curIndex, seg.Count);
+            Buffer.BlockCopy(seg.Array, seg.Offset, bytes, curIndex, seg.Count);
+
+
+            //data.CopyTo(bytes);
+
+            //unsafe
+            //{
+            //    fixed (byte* pSource = seg.Array, pTarget = bytes)
+            //    {
+            //        Buffer.MemoryCopy(pSource + data.Offset, pTarget, data.Count, data.Count);
+            //    }
+            //}
+        }
+
+
+    }
+}

+ 214 - 0
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pipelines/ByteData.cs

@@ -0,0 +1,214 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using Vit.Extensions;
+
+namespace Vit.Core.Util.Pipelines
+{
+    /// <summary>
+    /// 
+    /// .net core中的高效动态内存管理方案
+    /// https://www.cnblogs.com/TianFang/p/10084049.html
+    /// 
+    /// .net core中的System.Buffers名字空间
+    /// https://www.cnblogs.com/TianFang/p/9193881.html
+    /// 
+    /// 
+    /// Span<T> —— .NET Core 高效运行的新基石
+    /// https://blog.csdn.net/wnvalentin/article/details/93485572
+    /// 
+    /// 
+    /// </summary>
+    public class ByteData : IEnumerable<ArraySegment<byte>>
+    {
+
+        public readonly List<ArraySegment<byte>> byteData;
+
+        public ByteData()
+        {
+            byteData = new List<ArraySegment<byte>>();
+        }
+
+        public ByteData(int capacity)
+        {
+            byteData = new List<ArraySegment<byte>>(capacity);
+        }
+
+        public ByteData(List<ArraySegment<byte>> byteData)
+        {
+            this.byteData = byteData;
+        }
+
+
+        #region implicit
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public static implicit operator ByteData(List<ArraySegment<byte>> byteData)
+        {
+            return new ByteData(byteData);
+        }
+        #endregion
+
+
+
+
+
+
+        #region List
+
+
+
+
+        #region Add
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public ByteData Add(ArraySegment<byte> data)
+        {
+            byteData.Add(data);
+            return this;
+        }
+        #endregion
+
+
+
+        #region []    
+
+        public ArraySegment<byte> this[int index]
+        {
+            get
+            {
+                return byteData[index];
+            }
+
+            set
+            {
+                byteData[index] = value;
+            }
+        }
+        #endregion
+
+
+
+        #region AddRange
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void AddRange(IEnumerable<ArraySegment<byte>> collection)
+        {
+            byteData.AddRange(collection);
+        }
+        #endregion
+
+
+
+        #region Insert
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void Insert(int index, ArraySegment<byte> item)
+        {
+            byteData.Insert(index, item);
+        }
+        #endregion
+
+
+
+        #region IEnumerable
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public IEnumerator<ArraySegment<byte>> GetEnumerator()
+        {
+            return byteData.GetEnumerator();
+        }
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        IEnumerator IEnumerable.GetEnumerator()
+        {
+            return byteData.GetEnumerator();
+        }
+        #endregion
+
+
+
+        #endregion
+
+
+               
+
+
+
+        #region Count
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public int Count()
+        {
+            if (byteData.Count == 0) return 0;
+            int count = 0;
+
+            foreach (var item in byteData)
+            {
+                if (null != item)
+                {
+                    count += item.Count;
+                }
+            }
+            return count;
+        }
+        #endregion
+
+
+
+
+        #region ToBytes
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public byte[] ToBytes()
+        {
+            int count = 0;
+            foreach (var item in byteData)
+            {
+                count += item.Count;
+            }
+
+            var bytes = new byte[count];
+
+            int curIndex = 0;
+            foreach (var item in byteData)
+            {
+                if (null == item.Array || item.Count == 0) continue;
+
+                //item.CopyTo(bytes, curIndex);
+                //Buffer.BlockCopy(item.Array, item.Offset, bytes, curIndex, item.Count);
+
+                unsafe
+                {
+                    fixed (byte* pSource = item.Array, pTarget = bytes)
+                    {
+                        Buffer.MemoryCopy(pSource + item.Offset, pTarget + curIndex, item.Count, item.Count);
+                    }
+                }
+
+                curIndex += item.Count;
+            }
+            return bytes;
+        }
+        #endregion
+
+
+
+        #region ToArraySegment
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public ArraySegment<byte> ToArraySegment()
+        {
+            return new ArraySegment<byte>(ToBytes());
+        }
+
+        #endregion
+
+
+        #region ByteDataToString
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public string ByteDataToString()
+        {
+            return ToBytes().BytesToString();
+        }
+        #endregion
+    }
+}

+ 6 - 7
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/DataPool.cs

@@ -1,7 +1,6 @@
 using System;
 using System.Buffers;
-
-using ByteData = System.Collections.Generic.List<System.ArraySegment<byte>>;
+using System.Collections.Generic;
 
 namespace Vit.Core.Util.Pool
 {
@@ -38,16 +37,16 @@ namespace Vit.Core.Util.Pool
 
         #region ByteData
  
-        public static ByteData ByteDataGet()
+        public static List<System.ArraySegment<byte>> ByteDataGet()
         {
-            //return new ByteData();
-            return ObjectPool<ByteData>.Shared.Pop();
+            //return new List<System.ArraySegment<byte>>();
+            return ObjectPool<List<System.ArraySegment<byte>>>.Shared.Pop();
         }
 
-        internal static void ByteDataReturn(ByteData data)
+        internal static void ByteDataReturn(List<System.ArraySegment<byte>> data)
         {
             //return;
-            ObjectPool<ByteData>.Shared.Push(data);
+            ObjectPool<List<System.ArraySegment<byte>>>.Shared.Push(data);
         }
         #endregion
 

+ 6 - 6
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/ObjectPool.cs

@@ -1,15 +1,15 @@
-using System.Collections.Concurrent;
-
+ 
 namespace Vit.Core.Util.Pool
 {
     public class ObjectPool<T>
-        where T:new()
+        where T:class,new()
     {
 
-        public static ObjectPool<T> Shared = new ObjectPool<T>();
+        public static readonly ObjectPool<T> Shared = new ObjectPool<T>();
+
 
 
-        private ConcurrentBag<T> _objects = new ConcurrentBag<T>();
+        private PoolCache<T> _objects = new PoolCache<T>();
 
         /// <summary>
         /// Gets or sets the total number of elements the internal data structure can hold without resizing.(default:100000)
@@ -25,7 +25,7 @@ namespace Vit.Core.Util.Pool
 
         public T PopOrNull()
         {
-            return _objects.TryTake(out var item) ? item : default(T);
+            return _objects.TryTake(out var item) ? item : null;
         }
 
         public void Push(T item)

+ 3 - 6
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/ObjectPoolByGenerator.cs

@@ -1,23 +1,20 @@
 using System;
-using System.Collections.Concurrent;
-
 
 namespace Vit.Core.Util.Pool
 {
     public class ObjectPoolGenerator<T>       
     {
-        public static ObjectPoolGenerator<T> Shared = new ObjectPoolGenerator<T>();
+        public static readonly ObjectPoolGenerator<T> Shared = new ObjectPoolGenerator<T>();
 
 
-        private ConcurrentBag<T> _objects;
+        private PoolCache<T> _objects = new PoolCache<T>(); 
 
         public Func<T> objectGenerator;
 
         public ObjectPoolGenerator(Func<T> objectGenerator=null)
         {
             this.objectGenerator = objectGenerator;
-            //this.objectGenerator = objectGenerator ?? throw new ArgumentNullException("objectGenerator");
-            _objects = new ConcurrentBag<T>();
+            //this.objectGenerator = objectGenerator ?? throw new ArgumentNullException("objectGenerator");          
         }
 
         public T Pop()

+ 58 - 0
dotnet/Library/Vit/Vit.Core/Vit.Core/Util/Pool/PoolCache.cs

@@ -0,0 +1,58 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Vit.Core.Util.Pool
+{
+    public class PoolCache<T>
+    {
+
+        #region 使用队列 ConcurrentQueue       
+        ///*
+        ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
+
+
+        public int Count => queue.Count;
+
+        public bool TryTake(out T result) 
+        {
+            return queue.TryDequeue(out result);
+        }
+
+        public void Add(T item) 
+        {
+            queue.Enqueue(item);
+        }
+        //*/
+        #endregion
+
+
+        #region 使用队列 ConcurrentBag       
+        /*
+        ConcurrentBag<T> queue = new ConcurrentBag<T>();
+
+
+        public int Count => queue.Count;
+
+        public bool TryTake(out T result) 
+        {
+            return queue.TryTake(out result);
+        }
+
+        public void Add(T item) 
+        {
+            queue.Add(item);
+        }
+        //*/
+        #endregion
+
+
+
+
+
+
+
+
+    }
+}

+ 5 - 1
dotnet/Library/Vit/Vit.Core/Vit.Core/Vit.Core.csproj

@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
     <TargetFramework>netstandard2.0</TargetFramework>
@@ -10,6 +10,10 @@
     <DocumentationFile>bin\Debug\netstandard2.0\Vit.Core.xml</DocumentationFile>
   </PropertyGroup>
 
+  <PropertyGroup>
+    <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
+  </PropertyGroup>
+
   <ItemGroup>
     <Compile Remove="Extensions\byte\ReadOnlySpanByteExtensions.cs" />
     <Compile Remove="Extensions\byte\SpanByteExtensions.cs" />

+ 11 - 0
dotnet/Sers.sln

@@ -167,6 +167,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DeliveryClient", "Library\S
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DeliveryServer", "Library\Sers\Sers.CL\Test\MessageDelivery\DeliveryServer\DeliveryServer.csproj", "{67733A68-0505-4977-BC52-8200D536005B}"
 EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sers.Core.Util.Consumer.Test", "Library\Sers\Sers.Core\Test\Sers.Core.Util.Consumer.Test\Sers.Core.Util.Consumer.Test.csproj", "{B45BBA33-E315-409E-B44E-97FA40B33BFB}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -535,6 +537,14 @@ Global
 		{67733A68-0505-4977-BC52-8200D536005B}.Release|Any CPU.Build.0 = Release|Any CPU
 		{67733A68-0505-4977-BC52-8200D536005B}.Release|x86.ActiveCfg = Release|Any CPU
 		{67733A68-0505-4977-BC52-8200D536005B}.Release|x86.Build.0 = Release|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Debug|x86.ActiveCfg = Debug|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Debug|x86.Build.0 = Debug|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Release|Any CPU.Build.0 = Release|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Release|x86.ActiveCfg = Release|Any CPU
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB}.Release|x86.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -618,6 +628,7 @@ Global
 		{B21B5462-1244-48BE-BA6B-B46F69DD9F66} = {37633A5F-54B5-4179-8BCD-B7B422B4857C}
 		{72525D02-A366-4470-8BFB-797F26C3A6A7} = {4E874DC3-754A-4FE2-92B5-98ADFF74B369}
 		{67733A68-0505-4977-BC52-8200D536005B} = {4E874DC3-754A-4FE2-92B5-98ADFF74B369}
+		{B45BBA33-E315-409E-B44E-97FA40B33BFB} = {37633A5F-54B5-4179-8BCD-B7B422B4857C}
 	EndGlobalSection
 	GlobalSection(ExtensibilityGlobals) = postSolution
 		SolutionGuid = {C7DA16E3-9949-49FA-B0B4-F830636DE60F}

+ 3 - 3
dotnet/ServiceCenter/Sers.ServiceCenter/Apm/Sers.Gover.Apm.Zipkin/AppEvent.cs

@@ -19,7 +19,7 @@ namespace Sers.Gover.Apm.Zipkin
     public class AppEvent : IAppEvent
     {        
 
-        Action<Object, List<ArraySegment<byte>>> ApiScopeEvent(IRpcContextData rpcData, ApiMessage apiRequestMessage)
+        Action<Object, Vit.Core.Util.Pipelines.ByteData> ApiScopeEvent(IRpcContextData rpcData, ApiMessage apiRequestMessage)
         {
             //记录请求数据
 
@@ -150,7 +150,7 @@ namespace Sers.Gover.Apm.Zipkin
                                 if (apiResponseMessage == null)
                                 {
                                     apiResponseMessage = new ApiMessage();
-                                    apiResponseMessage.Unpack(apiReplyMessage.ByteDataToArraySegment());
+                                    apiResponseMessage.Unpack(apiReplyMessage.ToArraySegment());
                                 }
                                 if (responseRpc_oriString == null)
                                 {
@@ -170,7 +170,7 @@ namespace Sers.Gover.Apm.Zipkin
                                 if (apiResponseMessage == null)
                                 {
                                     apiResponseMessage = new ApiMessage();
-                                    apiResponseMessage.Unpack(apiReplyMessage.ByteDataToArraySegment());
+                                    apiResponseMessage.Unpack(apiReplyMessage.ToArraySegment());
                                 }
                                 if (responseData_oriString == null)
                                 {

+ 4 - 4
dotnet/ServiceCenter/Sers.ServiceCenter/Sers.Gover/Base/GoverManage.cs

@@ -104,7 +104,7 @@ namespace Sers.Gover.Base
 
         #region CallApi
 
-        public override void CallApiAsync(IRpcContextData rpcData, ApiMessage requestMessage, Object sender, Action<object, List<ArraySegment<byte>>> callback)
+        public override void CallApiAsync(IRpcContextData rpcData, ApiMessage requestMessage, Object sender, Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
         {
             try
             {
@@ -339,14 +339,14 @@ namespace Sers.Gover.Base
         /// <summary>
         /// 
         /// </summary>
-        List<Func<IRpcContextData, ApiMessage, Action<Object, List<ArraySegment<byte>>>>> apiScopeEventList = null;
+        List<Func<IRpcContextData, ApiMessage, Action<Object, Vit.Core.Util.Pipelines.ByteData>>> apiScopeEventList = null;
         /// <summary>
         /// 在调用api前调用onScope,若onScope返回的结果(onDispose)不为空,则在api调用结束前调用onDispose
         /// </summary>
         /// <param name="apiScopeEvent"></param>
-        public void AddApiScopeEvent(Func<IRpcContextData, ApiMessage, Action<Object, List<ArraySegment<byte>>>> apiScopeEvent) 
+        public void AddApiScopeEvent(Func<IRpcContextData, ApiMessage, Action<Object, Vit.Core.Util.Pipelines.ByteData>> apiScopeEvent) 
         {
-            if (apiScopeEventList == null) apiScopeEventList=new List<Func<IRpcContextData, ApiMessage, Action<Object, List<ArraySegment<byte>>>>>();
+            if (apiScopeEventList == null) apiScopeEventList=new List<Func<IRpcContextData, ApiMessage, Action<Object, Vit.Core.Util.Pipelines.ByteData>>>();
 
             apiScopeEventList.Add(apiScopeEvent);
         }

+ 4 - 4
dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/ApiCenter/ApiCenterService.cs

@@ -17,7 +17,7 @@ namespace Sers.ServiceCenter.ApiCenter
     public abstract class ApiCenterService
     { 
  
-        public void CallApiAsync(IOrganizeConnection  conn, Object sender, ArraySegment<byte> apiRequest, Action<object, List<ArraySegment<byte>>> callback)
+        public void CallApiAsync(IOrganizeConnection  conn, Object sender, ArraySegment<byte> apiRequest, Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
         {
             CommunicationManageServer.CurConn = conn;
 
@@ -31,9 +31,9 @@ namespace Sers.ServiceCenter.ApiCenter
 
         #region CallApi
         ObjectPoolGenerator<AutoResetEvent> pool_AutoResetEvent = new ObjectPoolGenerator<AutoResetEvent>(() => new AutoResetEvent(false));
-        public bool CallApi(IOrganizeConnection  conn, ArraySegment<byte> apiRequest, out List<ArraySegment<byte>> replyData, int requestTimeoutMs)
+        public bool CallApi(IOrganizeConnection  conn, ArraySegment<byte> apiRequest, out Vit.Core.Util.Pipelines.ByteData replyData, int requestTimeoutMs)
         {
-            List<ArraySegment<byte>> _replyData = null;
+            Vit.Core.Util.Pipelines.ByteData _replyData = null;
 
             AutoResetEvent mEvent = pool_AutoResetEvent.Pop();
             mEvent.Reset();
@@ -76,7 +76,7 @@ namespace Sers.ServiceCenter.ApiCenter
         [JsonIgnore]
         public Action<IRpcContextData, ApiMessage> BeforeCallApi;
 
-        public abstract void CallApiAsync(IRpcContextData rpcData, ApiMessage requestMessage, Object sender, Action<object, List<ArraySegment<byte>>> callback);
+        public abstract void CallApiAsync(IRpcContextData rpcData, ApiMessage requestMessage, Object sender, Action<object, Vit.Core.Util.Pipelines.ByteData> callback);
 
 
 

+ 1 - 1
dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/Entity/ApiNode.cs

@@ -34,7 +34,7 @@ namespace Sers.ServiceCenter.Entity
         #endregion
 
 
-        public void CallApiAsync(IRpcContextData rpcContextData, ApiMessage reqMessage, Object sender,  Action<object, List<ArraySegment<byte>>> callback)
+        public void CallApiAsync(IRpcContextData rpcContextData, ApiMessage reqMessage, Object sender,  Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
         {
             //count
             bool success = true;

+ 1 - 1
dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/Entity/ServiceStation.cs

@@ -97,7 +97,7 @@ namespace Sers.ServiceCenter.Entity
         }
 
      
-        public void SendRequestAsync(Object sender, ApiMessage apiReqMessage, Action<object, List<ArraySegment<byte>>> callback)
+        public void SendRequestAsync(Object sender, ApiMessage apiReqMessage, Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
         {
             connection.SendRequestAsync(sender, apiReqMessage.Package(), callback);            
         }

+ 10 - 10
dotnet/ServiceCenter/Sers.ServiceCenter/Sers.ServiceCenter/ServiceCenter.cs

@@ -97,14 +97,14 @@ namespace Sers.ServiceCenter
             }
 
 
-            public void SendRequestAsync(Object sender, List<ArraySegment<byte>> requestData, Action<object, List<ArraySegment<byte>>> callback)
+            public void SendRequestAsync(Object sender, Vit.Core.Util.Pipelines.ByteData requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback)
             {
-                localApiService.CallApiAsync(sender, new ApiMessage(requestData.ByteDataToArraySegment()), (sender_, apiReplyMessage) =>
+                localApiService.CallApiAsync(sender, new ApiMessage(requestData.ToArraySegment()), (sender_, apiReplyMessage) =>
                 {
                     callback(sender_, apiReplyMessage.Package());
                 });
             }
-            public bool SendRequest(List<ArraySegment<byte>> requestData, out List<ArraySegment<byte>> replyData)
+            public bool SendRequest(Vit.Core.Util.Pipelines.ByteData requestData, out Vit.Core.Util.Pipelines.ByteData replyData)
             {
                 Logger.Error(new NotImplementedException());
                 throw new NotImplementedException();
@@ -112,9 +112,9 @@ namespace Sers.ServiceCenter
 
        
 
-            public void SendMessageAsync(List<ArraySegment<byte>> message)
+            public void SendMessageAsync(Vit.Core.Util.Pipelines.ByteData message)
             {
-                MessageClient.Instance.OnGetMessage(this, message.ByteDataToArraySegment());
+                MessageClient.Instance.OnGetMessage(this, message.ToArraySegment());
             }
             public void Close() 
             {
@@ -319,11 +319,11 @@ namespace Sers.ServiceCenter
 
 
             #region (x.6) 初始化ApiClient
-            Func<List<ArraySegment<byte>>, ArraySegment<byte>> OnSendRequest = ((List<ArraySegment<byte>> apiReqMessage) =>
+            Func<Vit.Core.Util.Pipelines.ByteData, ArraySegment<byte>> OnSendRequest = ((Vit.Core.Util.Pipelines.ByteData apiReqMessage) =>
             {
-                apiCenterService.CallApi(connForLocalStationService, apiReqMessage.ByteDataToArraySegment(),
+                apiCenterService.CallApi(connForLocalStationService, apiReqMessage.ToArraySegment(),
                     out var replyData, communicationManage.requestTimeoutMs);
-                return replyData.ByteDataToArraySegment();
+                return replyData.ToArraySegment();
             });
 
             ApiClient.SetOnSendRequest(new[] { OnSendRequest });
@@ -331,9 +331,9 @@ namespace Sers.ServiceCenter
 
 
             #region (x.7) 桥接MessageClient 和 MessageCenterService
-            MessageClient.Instance.OnSendMessage = (List<ArraySegment<byte>> messageData) =>
+            MessageClient.Instance.OnSendMessage = (Vit.Core.Util.Pipelines.ByteData messageData) =>
             {
-                MessageCenterService.Instance.OnGetMessage(connForLocalStationService, messageData.ByteDataToBytes().BytesToArraySegmentByte());
+                MessageCenterService.Instance.OnGetMessage(connForLocalStationService, messageData.ToBytes().BytesToArraySegmentByte());
             };
             #endregion
 

+ 1 - 1
dotnet/ServiceStation/Demo/SersLoader/Did.SersLoader.Demo/Controllers/Demo/SampleController.cs

@@ -535,7 +535,7 @@ namespace Did.SersLoader.Demo.Controllers.Demo
 
             #region file from request
             var apiRequest = RpcContext.Current.apiRequestMessage;
-            if (apiRequest.Files.Count >= 3)
+            if (apiRequest.Files.Count() >= 3)
             {
                 var file3 = apiRequest.Files[2];
                 apiReply.AddFiles(file3);

+ 12 - 0
dotnet/todo.txt

@@ -1,5 +1,17 @@
 CUR:
 
+提取ByteData
+优化ByteData
+
+
+优化Consumer_WorkerPoolCascade,
+替换Consumer_BlockingCollection
+
+LocalApiService.cs BlockingCollection
+查询所有的队列 酌情优化
+
+ConcurrentBag比较慢 弃用
+
 
 Sers.Core\CL\MessageOrganize\DefaultOrganize\RequestAdaptor.cs L240  使用二级缓冲队列(以及所有使用 BlockingCollection 的地方)