// ReadStream.cs (4.3 KB)
  1. using System;
  2. using System.Threading;
  3. using Sers.Core.CL.MessageDelivery;
  4. using Vit.Core.Module.Log;
  5. using Vit.Core.Util.Threading.Worker;
  6. using Vit.Extensions.Json_Extensions;
  7. namespace Sers.CL.Ipc.SharedMemory.Stream
  8. {
  9. public class ReadStream
  10. {
  11. public IDeliveryConnection conn;
  12. /// <summary>
  13. /// 请勿处理耗时操作,需立即返回。接收到客户端的数据事件 public delegate void OnReceiveData(AsyncUserToken token, ArraySegment&lt;byte&gt; messageFrame);
  14. /// </summary>
  15. public Action<IDeliveryConnection, ArraySegment<byte>> OnReceiveMessage { set; private get; }
  16. public Action<IDeliveryConnection> OnDisconnected { set; private get; }
  17. public ReadStream()
  18. {
  19. }
  20. ~ReadStream()
  21. {
  22. Stop();
  23. }
  24. global::SharedMemory.CircularBuffer buffer;
  25. /// <summary>
  26. ///
  27. /// </summary>
  28. /// <param name="name">共享内存名称</param>
  29. /// <param name="nodeCount">共享内存节点个数</param>
  30. /// <param name="nodeBufferSize">共享内存节点大小</param>
  31. /// <returns></returns>
  32. public bool SharedMemory_Malloc(string name = "", int nodeCount = 64, int nodeBufferSize = 10)
  33. {
  34. try
  35. {
  36. if (buffer != null) return false;
  37. buffer = Util.SharedMemory_Malloc(name: name, nodeCount: nodeCount, nodeBufferSize: nodeBufferSize);
  38. return true;
  39. }
  40. catch (Exception ex)
  41. {
  42. Logger.Error(ex);
  43. return false;
  44. }
  45. }
  46. public bool SharedMemory_Attach(string name = "")
  47. {
  48. if (buffer != null) return false;
  49. try
  50. {
  51. buffer = Util.SharedMemory_Attach(name);
  52. return true;
  53. }
  54. catch (Exception ex)
  55. {
  56. Logger.Error(ex);
  57. return false;
  58. }
  59. }
  60. public bool Start()
  61. {
  62. if (buffer == null) return false;
  63. // start receiveMsg Thread
  64. receiveMsg_Thread.threadName = "CL-Ipc-SharedMemory-receiveMsg-" + buffer.Name;
  65. receiveMsg_Thread.threadCount = 1;
  66. receiveMsg_Thread.Processor = ReceiveMsg_Thread;
  67. receiveMsg_Thread.Start();
  68. return true;
  69. }
  70. public void Stop()
  71. {
  72. if (buffer == null) return;
  73. receiveMsg_Thread.Stop();
  74. buffer.Close();
  75. buffer = null;
  76. try
  77. {
  78. OnDisconnected?.Invoke(conn);
  79. }
  80. catch (Exception ex)
  81. {
  82. Logger.Error(ex);
  83. }
  84. }
  85. #region ReceiveMsg_Thread
  86. readonly LongThread receiveMsg_Thread = new LongThread();
  87. void ReceiveMsg_Thread()
  88. {
  89. byte[] baLen = new byte[4];
  90. while (true)
  91. {
  92. try
  93. {
  94. while (true)
  95. {
  96. //(x.1) get msg len
  97. while (0 == buffer.Read(baLen)) ;
  98. int len = baLen.BytesToInt32();
  99. if (len < 0)
  100. {
  101. Stop();
  102. return;
  103. }
  104. #region (x.2) receive msg
  105. byte[] data = new byte[len];
  106. int receivedLen = 0;
  107. while (receivedLen < len)
  108. {
  109. receivedLen += buffer.Read(data, receivedLen);
  110. }
  111. #endregion
  112. //(x.3) deal msg
  113. OnReceiveMessage(conn, new ArraySegment<byte>(data));
  114. }
  115. }
  116. catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException))
  117. {
  118. Logger.Error(ex);
  119. }
  120. }
  121. }
  122. #endregion
  123. }
  124. }