lith 4 anni fa
parent
commit
6baeac7ea0

+ 62 - 36
dotnet/Library/Sers/Sers.CL/Socket/Sers.CL.Socket.Iocp/Mode/Timer/DeliveryConnection.cs

@@ -16,12 +16,10 @@ namespace Sers.CL.Socket.Iocp.Mode.Timer
 
         ConcurrentQueue<ByteData> frameQueueToSend = new ConcurrentQueue<ByteData>();
 
-
         const int buffLength = 1024;
         ByteData[] buffer = new ByteData[buffLength];
         int[] bufferItemCount = new int[buffLength];
 
-
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public override void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
         {
@@ -57,17 +55,8 @@ namespace Sers.CL.Socket.Iocp.Mode.Timer
                             break;
                         }
                     }
-                    var bytes = ByteDataArrayToBytes(buffer, curIndex);
-                    try
-                    {
-                        socket.SendAsync(bytes.BytesToArraySegmentByte(), SocketFlags.None);
-                        //socket.SendAsync(data, SocketFlags.None);
-                    }
-                    catch (Exception ex)
-                    {
-                        Logger.Error(ex);
-                        Close();
-                    }
+
+                    FlushData(curIndex);
 
                     if (curIndex < buffLength)
                     {
@@ -78,58 +67,95 @@ namespace Sers.CL.Socket.Iocp.Mode.Timer
             catch (Exception ex)
             {
                 Logger.Error(ex);
+                Close();
             }
         }
 
-
-
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="stopIndex">不包含</param>
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
-        unsafe byte[] ByteDataArrayToBytes(ByteData[] byteDataArray, int arrayCount)
-        {
-            int sumCount = 0;
+        void FlushData(int stopIndex)
+        {      
             int curCount;
-            int arrayIndex;
+            ByteData byteData;
+            byte[] bytes;
 
-            //(x.1)get count
-            for (arrayIndex = 0; arrayIndex < arrayCount; arrayIndex++)
+            int sumCount = 0;
+            int startIndex = 0;
+            int curIndex = 0;
+            while (true)
             {
-                var byteData = byteDataArray[arrayIndex];
+                byteData = buffer[curIndex];
+
+                //(x.1)get count
                 curCount = 0;
                 foreach (var item in byteData.byteArrayList)
                 {
                     curCount += item.Count;
                 }
-                bufferItemCount[arrayIndex] = curCount;
+                bufferItemCount[curIndex] = curCount;
                 sumCount += curCount;
-            }
 
+                curIndex++;
 
-            //(x.2)copy data
-            var bytes = new byte[sumCount + arrayCount * 4];
-            arrayIndex = 0;
-            curCount = 0;
+
+                //(x.2)
+                if (curIndex == stopIndex)
+                {
+                    bytes = BufferToBytes(startIndex, curIndex, sumCount);
+                    socket.SendAsync(bytes.BytesToArraySegmentByte(), SocketFlags.None);
+                    return;
+                }
+
+
+                //(x.3)
+                if (sumCount >= 1_000_000)
+                {
+                    bytes = BufferToBytes(startIndex, curIndex, sumCount);
+                    socket.SendAsync(bytes.BytesToArraySegmentByte(), SocketFlags.None);
+
+                    sumCount = 0;
+                    startIndex = curIndex;
+                }           
+            }
+        }
+
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="startIndex"></param>
+        /// <param name="stopIndex">不包含</param>
+        /// <param name="sumCount"></param>
+        /// <returns></returns>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        unsafe byte[] BufferToBytes(int startIndex, int stopIndex,int sumCount)
+        {
+            var bytes = new byte[sumCount + (stopIndex - startIndex) * 4];
 
             int curLength;
 
             fixed (byte* pTarget = bytes)
             {
-                for (arrayIndex = 0; arrayIndex < arrayCount; arrayIndex++)
+                int dataIndex = 0;
+                for (int curIndex = startIndex; curIndex < stopIndex; curIndex++)
                 {
-                    var byteData = byteDataArray[arrayIndex];
-                    ((int*)(pTarget + curCount))[0] = curLength = bufferItemCount[arrayIndex];
-                    curCount += 4;
+                    var byteData = buffer[curIndex];
+
+                    ((int*)(pTarget + dataIndex))[0] = curLength = bufferItemCount[curIndex];
+                    dataIndex += 4;
 
                     foreach (var item in byteData.byteArrayList)
                     {
                         if (null == item.Array || item.Count == 0) continue;
                         fixed (byte* pSource = item.Array)
                         {
-                            Buffer.MemoryCopy(pSource + item.Offset, pTarget + curCount, item.Count, item.Count);
+                            Buffer.MemoryCopy(pSource + item.Offset, pTarget + dataIndex, item.Count, item.Count);
                         }
-                        curCount += item.Count;
+                        dataIndex += item.Count;
                     }
-                    _securityManager?.Encryption(new ArraySegment<byte>(bytes, curCount- curLength, curLength));
-                    
+                    _securityManager?.Encryption(new ArraySegment<byte>(bytes, dataIndex - curLength, curLength));
                 }
             }
             return bytes;

+ 33 - 31
dotnet/ServiceStation/Demo/StressTest/App.Robot.Station/Logical/Worker/Worker_ApiClientAsync.cs

@@ -44,40 +44,40 @@ namespace App.Robot.Station.Logical.Worker
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         protected  void CallApi()
         {
-            ApiClient.CallRemoteApiAsync<ApiReturn>((ret) =>
+            if (Interlocked.Decrement(ref leftCount) < 0)
             {
-                bool success = false;
-                if (ret == null || ret.success)
-                {
-                    success = true;
-                }
-                else
-                {
-                    if (logError)
-                        Logger.Info("失败:ret:" + ret.Serialize());
-                }
-
-                taskItem.StepUp(success);
-
-                if (taskItem.sumCount >= taskItem.targetCount)
-                {
-                    needRunning = false;
-                }
-
-
-                if (interval > 0)
-                    Thread.Sleep(interval);
-
-                if (needRunning) CallApi();
-                else
-                {
-                    Interlocked.Decrement(ref runningThreadCount);
-                }
-
-            }, apiRoute,apiArg, httpMethod);      
-          
+                needRunning = false;
+                Interlocked.Decrement(ref runningThreadCount);
+                return;
+            } 
+
+            ApiClient.CallRemoteApiAsync<ApiReturn>(OnSuc, apiRoute,apiArg, httpMethod);                
+        }
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        void OnSuc(ApiReturn ret) 
+        {
+            bool success = false;
+            if (ret == null || ret.success)
+            {
+                success = true;
+            }
+            else
+            {
+                if (logError)
+                    Logger.Info("失败:ret:" + ret.Serialize());
+            }
+
+            taskItem.StepUp(success);
+
+            if (interval > 0)
+                Thread.Sleep(interval);
+
+            CallApi();
         }
 
+        long leftCount;
+
         public void Start()
         {
             if (needRunning) return;
@@ -85,6 +85,8 @@ namespace App.Robot.Station.Logical.Worker
             taskItem.curCount = 0;
             taskItem.failCount = 0;
 
+            leftCount = taskItem.targetCount - taskItem.sumCount;
+
             needRunning = true;
 
             for (var t = 0; t < taskItem.config.threadCount; t++)