123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- using System;
- using System.Collections.Generic;
- using System.IO.Pipes;
- using System.Threading;
- using System.Threading.Tasks;
- using Sers.Core.CL.MessageDelivery;
- using Vit.Core.Module.Log;
- using Vit.Core.Util.Pipelines;
- using Vit.Core.Util.Pool;
- using Vit.Extensions;
namespace Sers.CL.Ipc.NamedPipe
{
    /// <summary>
    /// Delivery connection that exchanges length-prefixed frames over a <see cref="PipeStream"/>.
    /// Outgoing frames are written by <see cref="SendFrameAsync"/>; incoming bytes are read
    /// asynchronously, reassembled into frames and handed to <see cref="OnGetFrame"/>.
    /// </summary>
    public class DeliveryConnection : IDeliveryConnection
    {
        /// <summary>
        /// Set to 1 by the first caller that wins the right to run <see cref="Close"/>.
        /// Close is scheduled from both the send path and the read path, so the guard must be atomic.
        /// </summary>
        private int closeFlag;

        ~DeliveryConnection()
        {
            Close();
        }

        /// <summary>
        /// Connection state (0: waitForCertify; 2: certified; 4: waitForClose; 8: closed).
        /// </summary>
        public byte state { get; set; } = DeliveryConnState.waitForCertify;

        /// <summary>
        /// Invoked once from <see cref="Close"/> after the stream has been released.
        /// </summary>
        public Action<IDeliveryConnection> Conn_OnDisconnected { get; set; }

        /// <summary>
        /// Invoked for every complete frame received from the peer.
        /// Handlers must return immediately and must not perform time-consuming work.
        /// </summary>
        public Action<IDeliveryConnection, ArraySegment<byte>> OnGetFrame { private get; set; }

        /// <summary>
        /// Sends one frame: the total payload length (encoded by <c>Int32ToArraySegmentByte</c>)
        /// followed by the payload segments. The call returns without waiting for the write to finish.
        /// </summary>
        /// <param name="data">Payload segments. Ignored when null. The list is not modified.</param>
        public void SendFrameAsync(List<ArraySegment<byte>> data)
        {
            if (data == null || stream == null) return;

            if (!stream.IsConnected)
            {
                Task.Run(Close);
                return;
            }

            try
            {
                Int32 len = data.ByteDataCount();

                // Build the framed message in a private list so the caller's list is left untouched
                // (a caller that reuses the list would otherwise accumulate length headers).
                var frame = new List<ArraySegment<byte>>(data.Count + 1) { len.Int32ToArraySegmentByte() };
                frame.AddRange(data);
                var bytes = frame.ByteDataToBytes();

                // The write is started synchronously (so frames are issued in call order) and
                // completed/observed in the background.
                _ = CompleteSendAsync(stream.WriteAsync(bytes, 0, bytes.Length));
            }
            catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException))
            {
                Logger.Error(ex);
                Task.Run(Close);
            }
        }

        /// <summary>
        /// Awaits a pending write, flushes the stream afterwards and closes the connection if either
        /// step fails, so asynchronous write failures are no longer left unobserved.
        /// </summary>
        /// <param name="writeTask">The write operation already started on <see cref="stream"/>.</param>
        private async Task CompleteSendAsync(Task writeTask)
        {
            try
            {
                await writeTask.ConfigureAwait(false);
                await stream.FlushAsync().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // A write that fails because the connection was closed in the meantime is expected.
                if (state == DeliveryConnState.closed) return;

                Logger.Error(ex);
                // Same hand-off as the synchronous failure path: never run Close inline here.
                _ = Task.Run(Close);
            }
        }

        /// <summary>
        /// Marks the connection closed, releases the stream and raises <see cref="Conn_OnDisconnected"/>.
        /// Safe to call repeatedly and from several threads; only the first call does any work.
        /// </summary>
        public void Close()
        {
            if (state == DeliveryConnState.closed) return;

            // Atomic guard: two threads may both pass the state check above.
            if (Interlocked.Exchange(ref closeFlag, 1) != 0) return;

            state = DeliveryConnState.closed;

            // Nothing is left for the finalizer to do once Close has run.
            GC.SuppressFinalize(this);

            try
            {
                // stream is null when Init was never called (e.g. Close reached via the finalizer).
                stream?.Dispose();
            }
            catch (Exception ex)
            {
                Logger.Error(ex);
            }

            try
            {
                Conn_OnDisconnected?.Invoke(this);
            }
            catch (Exception ex)
            {
                Logger.Error(ex);
            }
        }

        /// <summary>
        /// Binds the connection to its pipe stream and records the connect time.
        /// </summary>
        /// <param name="stream">The pipe stream used for all reads and writes.</param>
        public void Init(PipeStream stream)
        {
            this.stream = stream;
            connectTime = DateTime.Now;
        }

        #region taskToReceiveMsg

        /// <summary>
        /// Reads from a pipe stream with BeginRead/EndRead and forwards every chunk to
        /// <see cref="OnReadData"/>. When reading stops (peer disconnected, zero-length read or an
        /// error) <see cref="OnClose"/> is scheduled on the thread pool.
        /// </summary>
        class StreamReader
        {
            public PipeStream stream;
            public Action<ArraySegment<byte>> OnReadData;
            public Action OnClose;

            /// <summary>
            /// State object passed through each asynchronous read.
            /// </summary>
            class AsyncState
            {
                public PipeStream stream { get; set; }
                public byte[] buffer { get; set; }
            }

            /// <summary>
            /// Size in bytes requested for each read buffer.
            /// </summary>
            public int receiveBufferSize = 8 * 1024;

            /// <summary>
            /// Starts the first asynchronous read; schedules <see cref="OnClose"/> when the stream is
            /// not connected or the read cannot be started.
            /// </summary>
            public void Start()
            {
                try
                {
                    var buffer = DataPool.BytesGet(receiveBufferSize);
                    var asyncState = new AsyncState { stream = stream, buffer = buffer };

                    if (stream.IsConnected)
                    {
                        stream.BeginRead(buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState);
                        return;
                    }
                }
                catch (Exception ex)
                {
                    Logger.Error(ex);
                }
                Task.Run(OnClose);
            }

            /// <summary>
            /// Completion callback of an asynchronous read: forwards the received bytes and starts
            /// the next read with a fresh buffer.
            /// </summary>
            /// <param name="asyncResult">Result of the read started by BeginRead.</param>
            void AsyncReadCallback(IAsyncResult asyncResult)
            {
                try
                {
                    var asyncState = (AsyncState)asyncResult.AsyncState;
                    int readCount = asyncState.stream.EndRead(asyncResult);

                    // A read of zero bytes falls through to OnClose below.
                    if (readCount > 0)
                    {
                        // The filled buffer is handed to the consumer, so the next read needs a new one.
                        OnReadData(new ArraySegment<byte>(asyncState.buffer, 0, readCount));
                        asyncState.buffer = DataPool.BytesGet(receiveBufferSize);

                        if (asyncState.stream.IsConnected)
                        {
                            asyncState.stream.BeginRead(asyncState.buffer, 0, receiveBufferSize, new AsyncCallback(AsyncReadCallback), asyncState);
                            return;
                        }
                    }
                }
                catch (Exception ex)
                {
                    Logger.Error(ex);
                }
                Task.Run(OnClose);
            }
        }

        /// <summary>
        /// Reassembles received bytes into frames; dequeued segments are handed to
        /// <c>ArraySegmentByteExtensions.ReturnToPool</c>.
        /// </summary>
        PipeFrame pipe = new PipeFrame() { OnDequeueData = ArraySegmentByteExtensions.ReturnToPool };

        /// <summary>
        /// Appends received bytes to the frame buffer and raises <see cref="OnGetFrame"/> for every
        /// complete frame that becomes available.
        /// </summary>
        /// <param name="data">Bytes just read from the stream.</param>
        public void AppendData(ArraySegment<byte> data)
        {
            pipe.Write(data);
            while (pipe.TryRead_SersFile(out var msgFrame))
            {
                OnGetFrame.Invoke(this, msgFrame);
            }
        }

        /// <summary>
        /// Starts asynchronous reading from <see cref="stream"/>; received data goes to
        /// <see cref="AppendData"/> and the connection is closed when reading stops.
        /// </summary>
        public void StartBackThreadToReceiveMsg()
        {
            new StreamReader
            {
                stream = stream,
                OnReadData = AppendData,
                OnClose = Close
            }.Start();
        }

        #endregion

        /// <summary>
        /// The pipe stream used for communication; set by <see cref="Init"/>.
        /// </summary>
        public PipeStream stream { get; private set; }

        /// <summary>
        /// Local time at which <see cref="Init"/> was called.
        /// </summary>
        public DateTime connectTime { get; private set; }
    }
}
|