DeliveryConnection.cs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO.Pipes;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using Sers.Core.CL.MessageDelivery;
  7. using Vit.Core.Module.Log;
  8. using Vit.Core.Util.Pipelines;
  9. using Vit.Core.Util.Pool;
  10. using Vit.Extensions;
  11. namespace Sers.CL.Ipc.NamedPipe
  12. {
  13. /// <summary>
  14. ///
  15. /// </summary>
  16. public class DeliveryConnection : IDeliveryConnection
  17. {
  18. ~DeliveryConnection()
  19. {
  20. Close();
  21. }
  22. /// <summary>
  23. /// 连接状态(0:waitForCertify; 2:certified; 4:waitForClose; 8:closed;)
  24. /// </summary>
  25. public byte state { get; set; } = DeliveryConnState.waitForCertify;
  26. public Action<IDeliveryConnection> Conn_OnDisconnected { get; set; }
  27. /// <summary>
  28. /// 请勿处理耗时操作,需立即返回。接收到客户端的数据事件
  29. /// </summary>
  30. public Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { private get; set; }
  31. public void SendFrameAsync(List<ArraySegment<byte>> data)
  32. {
  33. if (data == null || stream == null) return;
  34. if (!stream.IsConnected)
  35. {
  36. Task.Run(Close);
  37. return;
  38. }
  39. try
  40. {
  41. Int32 len = data.ByteDataCount();
  42. data.Insert(0, len.Int32ToArraySegmentByte());
  43. var bytes = data.ByteDataToBytes();
  44. stream.WriteAsync(bytes, 0, bytes.Length);
  45. stream.FlushAsync();
  46. }
  47. catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException))
  48. {
  49. Logger.Error(ex);
  50. Task.Run(Close);
  51. }
  52. }
  53. public void Close()
  54. {
  55. if (state == DeliveryConnState.closed) return;
  56. state = DeliveryConnState.closed;
  57. try
  58. {
  59. stream.Close();
  60. stream.Dispose();
  61. //stream.Shutdown(SocketShutdown.Both);
  62. }
  63. catch (Exception ex)
  64. {
  65. Logger.Error(ex);
  66. }
  67. try
  68. {
  69. Conn_OnDisconnected?.Invoke(this);
  70. }
  71. catch (Exception ex)
  72. {
  73. Logger.Error(ex);
  74. }
  75. }
  76. public void Init(PipeStream stream)
  77. {
  78. this.stream = stream;
  79. connectTime = DateTime.Now;
  80. }
  81. #region taskToReceiveMsg
  82. class StreamReader
  83. {
  84. public PipeStream stream;
  85. public Action<ArraySegment<byte>> OnReadData;
  86. public Action OnClose;
  87. //定义异步读取状态类
  88. class AsyncState
  89. {
  90. public PipeStream stream { get; set; }
  91. public byte[] buffer { get; set; }
  92. }
  93. /// <summary>
  94. /// 缓存区大小
  95. /// </summary>
  96. public int receiveBufferSize = 8 * 1024;
  97. public void Start()
  98. {
  99. try
  100. {
  101. var buffer = DataPool.BytesGet(receiveBufferSize);
  102. var asyncState = new AsyncState { stream = stream, buffer = buffer };
  103. //异步读取
  104. if (stream.IsConnected)
  105. {
  106. var result=stream.BeginRead(buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState);
  107. return;
  108. }
  109. }
  110. catch (Exception ex)
  111. {
  112. Logger.Error(ex);
  113. }
  114. Task.Run(OnClose);
  115. }
  116. //异步读取回调处理方法
  117. void AsyncReadCallback(IAsyncResult asyncResult)
  118. {
  119. try
  120. {
  121. var asyncState = (AsyncState)asyncResult.AsyncState;
  122. int readCount = asyncState.stream.EndRead(asyncResult);
  123. if (readCount > 0)
  124. {
  125. //输出读取内容值
  126. OnReadData(new ArraySegment<byte>(asyncState.buffer, 0, readCount));
  127. asyncState.buffer = DataPool.BytesGet(receiveBufferSize);
  128. //再次执行异步读取操作
  129. if (asyncState.stream.IsConnected)
  130. {
  131. asyncState.stream.BeginRead(asyncState.buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState);
  132. return;
  133. }
  134. }
  135. }
  136. catch (Exception ex)
  137. {
  138. Logger.Error(ex);
  139. }
  140. Task.Run(OnClose);
  141. }
  142. }
  143. PipeFrame pipe = new PipeFrame() { OnDequeueData = ArraySegmentByteExtensions.ReturnToPool };
  144. public void AppendData(ArraySegment<byte> data)
  145. {
  146. pipe.Write(data);
  147. while (pipe.TryRead_SersFile(out var msgFrame))
  148. {
  149. OnGetFrame.Invoke(this, msgFrame);
  150. }
  151. }
  152. public void StartBackThreadToReceiveMsg()
  153. {
  154. new StreamReader
  155. {
  156. stream = stream,
  157. OnReadData = AppendData,
  158. OnClose=Close
  159. }.Start();
  160. }
  161. #endregion
  162. /// <summary>
  163. /// 通信对象
  164. /// </summary>
  165. public PipeStream stream { get; private set; }
  166. /// <summary>
  167. /// 连接时间
  168. /// </summary>
  169. public DateTime connectTime { get; private set; }
  170. }
  171. }