using System; using System.Collections.Generic; using System.IO.Pipes; using System.Threading; using System.Threading.Tasks; using Sers.Core.CL.MessageDelivery; using Vit.Core.Module.Log; using Vit.Core.Util.Pipelines; using Vit.Core.Util.Pool; using Vit.Extensions; namespace Sers.CL.Ipc.NamedPipe { /// /// /// public class DeliveryConnection : IDeliveryConnection { ~DeliveryConnection() { Close(); } /// /// 连接状态(0:waitForCertify; 2:certified; 4:waitForClose; 8:closed;) /// public byte state { get; set; } = DeliveryConnState.waitForCertify; public Action Conn_OnDisconnected { get; set; } /// /// 请勿处理耗时操作,需立即返回。接收到客户端的数据事件 /// public Action> OnGetFrame { private get; set; } public void SendFrameAsync(List> data) { if (data == null || stream == null) return; if (!stream.IsConnected) { Task.Run(Close); return; } try { Int32 len = data.ByteDataCount(); data.Insert(0, len.Int32ToArraySegmentByte()); var bytes = data.ByteDataToBytes(); stream.WriteAsync(bytes, 0, bytes.Length); stream.FlushAsync(); } catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException)) { Logger.Error(ex); Task.Run(Close); } } public void Close() { if (state == DeliveryConnState.closed) return; state = DeliveryConnState.closed; try { stream.Close(); stream.Dispose(); //stream.Shutdown(SocketShutdown.Both); } catch (Exception ex) { Logger.Error(ex); } try { Conn_OnDisconnected?.Invoke(this); } catch (Exception ex) { Logger.Error(ex); } } public void Init(PipeStream stream) { this.stream = stream; connectTime = DateTime.Now; } #region taskToReceiveMsg class StreamReader { public PipeStream stream; public Action> OnReadData; public Action OnClose; //定义异步读取状态类 class AsyncState { public PipeStream stream { get; set; } public byte[] buffer { get; set; } } /// /// 缓存区大小 /// public int receiveBufferSize = 8 * 1024; public void Start() { try { var buffer = DataPool.BytesGet(receiveBufferSize); var asyncState = new AsyncState { stream = stream, buffer = buffer }; //异步读取 if (stream.IsConnected) { var result=stream.BeginRead(buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState); return; } } catch (Exception ex) { Logger.Error(ex); } Task.Run(OnClose); } //异步读取回调处理方法 void AsyncReadCallback(IAsyncResult asyncResult) { try { var asyncState = (AsyncState)asyncResult.AsyncState; int readCount = asyncState.stream.EndRead(asyncResult); if (readCount > 0) { //输出读取内容值 OnReadData(new ArraySegment(asyncState.buffer, 0, readCount)); asyncState.buffer = DataPool.BytesGet(receiveBufferSize); //再次执行异步读取操作 if (asyncState.stream.IsConnected) { asyncState.stream.BeginRead(asyncState.buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState); return; } } } catch (Exception ex) { Logger.Error(ex); } Task.Run(OnClose); } } PipeFrame pipe = new PipeFrame() { OnDequeueData = ArraySegmentByteExtensions.ReturnToPool }; public void AppendData(ArraySegment data) { pipe.Write(data); while (pipe.TryRead_SersFile(out var msgFrame)) { OnGetFrame.Invoke(this, msgFrame); } } public void StartBackThreadToReceiveMsg() { new StreamReader { stream = stream, OnReadData = AppendData, OnClose=Close }.Start(); } #endregion /// /// 通信对象 /// public PipeStream stream { get; private set; } /// /// 连接时间 /// public DateTime connectTime { get; private set; } } }