DeliveryConnection.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO.Pipes;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using Sers.Core.CL.MessageDelivery;
  7. using Vit.Core.Module.Log;
  8. using Vit.Core.Util.Pipelines;
  9. using Vit.Core.Util.Pool;
  10. using Vit.Extensions;
  11. namespace Sers.CL.Ipc.NamedPipe
  12. {
  13. /// <summary>
  14. ///
  15. /// </summary>
  16. public class DeliveryConnection : IDeliveryConnection
  17. {
  18. ~DeliveryConnection()
  19. {
  20. Close();
  21. }
  22. public Sers.Core.Util.StreamSecurity.SecurityManager securityManager { set => _securityManager = value; }
  23. Sers.Core.Util.StreamSecurity.SecurityManager _securityManager;
  24. /// <summary>
  25. /// 连接状态(0:waitForCertify; 2:certified; 4:waitForClose; 8:closed;)
  26. /// </summary>
  27. public byte state { get; set; } = DeliveryConnState.waitForCertify;
  28. public Action<IDeliveryConnection> Conn_OnDisconnected { get; set; }
  29. /// <summary>
  30. /// 请勿处理耗时操作,需立即返回。接收到客户端的数据事件
  31. /// </summary>
  32. public Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { private get; set; }
  33. public void SendFrameAsync(Vit.Core.Util.Pipelines.ByteData data)
  34. {
  35. if (data == null || stream == null) return;
  36. if (!stream.IsConnected)
  37. {
  38. Task.Run((Action)Close);
  39. return;
  40. }
  41. try
  42. {
  43. Int32 len = data.Count();
  44. data.Insert(0, len.Int32ToArraySegmentByte());
  45. var bytes = data.ToBytes();
  46. _securityManager?.Encryption(new ArraySegment<byte>(bytes,4, bytes.Length-4));
  47. stream.WriteAsync(bytes, 0, bytes.Length);
  48. stream.FlushAsync();
  49. }
  50. catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException))
  51. {
  52. Logger.Error(ex);
  53. Task.Run((Action)Close);
  54. }
  55. }
  56. public void Close()
  57. {
  58. if (state == DeliveryConnState.closed) return;
  59. state = DeliveryConnState.closed;
  60. try
  61. {
  62. stream.Close();
  63. stream.Dispose();
  64. }
  65. catch (Exception ex)
  66. {
  67. Logger.Error(ex);
  68. }
  69. try
  70. {
  71. Conn_OnDisconnected?.Invoke(this);
  72. }
  73. catch (Exception ex)
  74. {
  75. Logger.Error(ex);
  76. }
  77. }
  78. public void Init(PipeStream stream)
  79. {
  80. this.stream = stream;
  81. connectTime = DateTime.Now;
  82. }
  83. #region taskToReceiveMsg
  84. class StreamReader
  85. {
  86. public PipeStream stream;
  87. public Action<ArraySegment<byte>> OnReadData;
  88. public Action OnClose;
  89. //定义异步读取状态类
  90. class AsyncState
  91. {
  92. public PipeStream stream { get; set; }
  93. public byte[] buffer { get; set; }
  94. }
  95. /// <summary>
  96. /// 缓存区大小
  97. /// </summary>
  98. public int receiveBufferSize = 8 * 1024;
  99. public void Start()
  100. {
  101. try
  102. {
  103. var buffer = DataPool.BytesGet(receiveBufferSize);
  104. var asyncState = new AsyncState { stream = stream, buffer = buffer };
  105. //异步读取
  106. if (stream.IsConnected)
  107. {
  108. var result = stream.BeginRead(buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState);
  109. return;
  110. }
  111. }
  112. catch (Exception ex)
  113. {
  114. Logger.Error(ex);
  115. }
  116. Task.Run(OnClose);
  117. }
  118. //异步读取回调处理方法
  119. void AsyncReadCallback(IAsyncResult asyncResult)
  120. {
  121. try
  122. {
  123. var asyncState = (AsyncState)asyncResult.AsyncState;
  124. int readCount = asyncState.stream.EndRead(asyncResult);
  125. if (readCount > 0)
  126. {
  127. //输出读取内容值
  128. OnReadData(new ArraySegment<byte>(asyncState.buffer, 0, readCount));
  129. asyncState.buffer = DataPool.BytesGet(receiveBufferSize);
  130. //再次执行异步读取操作
  131. if (asyncState.stream.IsConnected)
  132. {
  133. asyncState.stream.BeginRead(asyncState.buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState);
  134. return;
  135. }
  136. }
  137. }
  138. catch (Exception ex)
  139. {
  140. Logger.Error(ex);
  141. }
  142. Task.Run(OnClose);
  143. }
  144. }
  145. PipeFrame pipe = new PipeFrame() { OnDequeueData = ArraySegmentByteExtensions.ReturnToPool };
  146. public void AppendData(ArraySegment<byte> data)
  147. {
  148. pipe.Write(data);
  149. while (pipe.TryRead_SersFile(out var msgFrame))
  150. {
  151. _securityManager?.Decryption(msgFrame);
  152. OnGetFrame.Invoke(this, msgFrame);
  153. }
  154. }
  155. public void StartBackThreadToReceiveMsg()
  156. {
  157. new StreamReader
  158. {
  159. stream = stream,
  160. OnReadData = AppendData,
  161. OnClose=Close
  162. }.Start();
  163. }
  164. #endregion
  165. /// <summary>
  166. /// 通信对象
  167. /// </summary>
  168. public PipeStream stream { get; private set; }
  169. /// <summary>
  170. /// 连接时间
  171. /// </summary>
  172. public DateTime connectTime { get; private set; }
  173. }
  174. }