DeliveryConnection.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Net.Sockets;
  4. using System.Runtime.CompilerServices;
  5. using Sers.CL.Socket.Iocp.Base;
  6. using Vit.Core.Module.Log;
  7. using Vit.Core.Util.Pipelines;
  8. using Vit.Extensions;
  9. namespace Sers.CL.Socket.Iocp.Mode.Timer
  10. {
  /// <summary>
  /// Delivery connection that queues outgoing frames in memory and writes them to the
  /// socket in batches whenever <see cref="FlushSendFrameQueue"/> is invoked.
  /// </summary>
  /// <remarks>
  /// <see cref="SetConfig"/> allocates the staging arrays used by the flush, so it has to be
  /// called before the first flush that finds queued frames.
  /// NOTE(review): the staging arrays are shared scratch state with no locking in this class;
  /// presumably <see cref="FlushSendFrameQueue"/> is only ever called by one thread at a time — confirm.
  /// </remarks>
  public class DeliveryConnection : DeliveryConnection_Base
  {
      /// <summary>
      /// Stores the send-buffer settings and allocates the staging arrays used while flushing.
      /// </summary>
      /// <param name="sendBufferSize">Payload size in bytes at which an accumulated batch is sent (see <see cref="sendBufferSize"/>).</param>
      /// <param name="sendBufferCount">Number of frames that can be staged in a single flush pass.</param>
      public void SetConfig(int sendBufferSize /*= 1_000_000*/, int sendBufferCount /*= 1024*/)
      {
          this.sendBufferSize = sendBufferSize;
          this.sendBufferCount = sendBufferCount;

          buffer = new ByteData[sendBufferCount];
          bufferItemCount = new int[sendBufferCount];
      }

      #region Send

      // Frames waiting to be sent; filled by SendFrameAsync, drained by FlushSendFrameQueue.
      ConcurrentQueue<ByteData> frameQueueToSend = new ConcurrentQueue<ByteData>();

      /// <summary>
      /// Minimum size of a send-buffer data block, in bytes. While flushing, a batch is sent
      /// as soon as its accumulated payload reaches this size.
      /// </summary>
      public int sendBufferSize;

      /// <summary>
      /// Number of send-buffer slots, i.e. the maximum number of frames staged per flush pass.
      /// </summary>
      int sendBufferCount;

      // Staging slots for the frames dequeued during the current flush pass.
      ByteData[] buffer;

      // Payload byte count of each staged frame, indexed like `buffer`.
      int[] bufferItemCount;

      /// <summary>
      /// Queues a frame for sending. Nothing is written to the socket here; the frame goes out
      /// on a later call to <see cref="FlushSendFrameQueue"/>.
      /// </summary>
      /// <param name="data">The frame to enqueue.</param>
      [MethodImpl(MethodImplOptions.AggressiveInlining)]
      public override void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
      {
          frameQueueToSend.Enqueue(data);
      }

      /// <summary>
      /// Drains the pending-frame queue, staging up to <c>sendBufferCount</c> frames per pass and
      /// sending each pass through <c>FlushData</c>. Returns once the queue is observed empty.
      /// Any exception is logged and the connection is closed.
      /// </summary>
      [MethodImpl(MethodImplOptions.AggressiveInlining)]
      public void FlushSendFrameQueue()
      {
          try
          {
              while (true)
              {
                  // Stage frames until the queue is empty or every slot is used.
                  // Dequeue happens before the slot write on purpose: a missing or empty staging
                  // array then fails loudly (logged + Close) instead of silently never sending.
                  int stagedCount = 0;
                  while (frameQueueToSend.TryDequeue(out var frame))
                  {
                      buffer[stagedCount++] = frame;
                      if (stagedCount == sendBufferCount)
                      {
                          break;
                      }
                  }

                  if (stagedCount == 0)
                  {
                      return;
                  }

                  FlushData(stagedCount);

                  // FlushData has copied the staged frames into freshly allocated arrays, so the
                  // slots are no longer needed. Release them so the frames are not kept reachable
                  // until some later flush happens to overwrite the same slots.
                  Array.Clear(buffer, 0, stagedCount);

                  // A partially filled pass means the queue ran dry; otherwise go round again.
                  if (stagedCount < sendBufferCount)
                  {
                      return;
                  }
              }
          }
          catch (Exception ex)
          {
              Logger.Error(ex);
              Close();
          }
      }

      /// <summary>
      /// Sends the staged frames <c>buffer[0 .. stopIndex)</c>, grouping consecutive frames into
      /// batches. A batch is sent once its payload reaches <see cref="sendBufferSize"/>; whatever
      /// remains when the last staged frame is reached is sent as the final batch.
      /// </summary>
      /// <param name="stopIndex">End index of the staged frames (exclusive). The caller in this class always passes at least 1.</param>
      [MethodImpl(MethodImplOptions.AggressiveInlining)]
      void FlushData(int stopIndex)
      {
          int batchStart = 0;        // first staged frame of the batch being accumulated
          int batchPayloadSize = 0;  // payload bytes accumulated for that batch
          int index = 0;

          while (true)
          {
              //(x.1) measure the frame and remember its size for BufferToBytes
              int frameSize = 0;
              foreach (var segment in buffer[index].byteArrayList)
              {
                  frameSize += segment.Count;
              }
              bufferItemCount[index] = frameSize;
              batchPayloadSize += frameSize;
              index++;

              //(x.2) last staged frame: send whatever has been accumulated and stop
              if (index == stopIndex)
              {
                  byte[] lastBatch = BufferToBytes(batchStart, index, batchPayloadSize);
                  // The value returned by SendAsync is discarded, so the outcome of the send
                  // is not observed at this point.
                  socket.SendAsync(lastBatch.BytesToArraySegmentByte(), SocketFlags.None);
                  return;
              }

              //(x.3) batch is large enough: send it and start a new one
              if (batchPayloadSize >= sendBufferSize)
              {
                  byte[] batch = BufferToBytes(batchStart, index, batchPayloadSize);
                  socket.SendAsync(batch.BytesToArraySegmentByte(), SocketFlags.None);
                  batchPayloadSize = 0;
                  batchStart = index;
              }
          }
      }

      /// <summary>
      /// Packs the staged frames <c>buffer[startIndex .. stopIndex)</c> into one new byte array.
      /// Each frame is written as a 4-byte length (stored through an <c>int</c> pointer, hence in
      /// the platform's byte order) followed by the frame's bytes. When <c>_securityManager</c> is
      /// not null, its <c>Encryption</c> method is invoked on each frame's payload region.
      /// </summary>
      /// <param name="startIndex">Index of the first staged frame to pack (inclusive).</param>
      /// <param name="stopIndex">End index of the staged frames to pack (exclusive).</param>
      /// <param name="sumCount">Total payload bytes of those frames, excluding the length prefixes.</param>
      /// <returns>The packed array, of length <c>sumCount + 4 * (stopIndex - startIndex)</c>.</returns>
      [MethodImpl(MethodImplOptions.AggressiveInlining)]
      unsafe byte[] BufferToBytes(int startIndex, int stopIndex, int sumCount)
      {
          var bytes = new byte[sumCount + (stopIndex - startIndex) * 4];

          fixed (byte* pTarget = bytes)
          {
              int writeOffset = 0;
              for (int index = startIndex; index < stopIndex; index++)
              {
                  var frame = buffer[index];

                  // Length prefix: the size recorded by FlushData for this frame.
                  int frameLength = bufferItemCount[index];
                  *(int*)(pTarget + writeOffset) = frameLength;
                  writeOffset += 4;

                  // Payload: concatenate every non-empty segment of the frame.
                  foreach (var segment in frame.byteArrayList)
                  {
                      if (null == segment.Array || segment.Count == 0) continue;

                      fixed (byte* pSource = segment.Array)
                      {
                          Buffer.MemoryCopy(pSource + segment.Offset, pTarget + writeOffset, segment.Count, segment.Count);
                      }
                      writeOffset += segment.Count;
                  }

                  // Hand the payload just written (length prefix excluded) to the security manager.
                  _securityManager?.Encryption(new ArraySegment<byte>(bytes, writeOffset - frameLength, frameLength));
              }
          }

          return bytes;
      }

      #endregion
  }
  153. }