123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- using System;
- using System.Threading;
- using Sers.Core.CL.MessageDelivery;
- using Vit.Core.Module.Log;
- using Vit.Core.Util.Threading.Worker;
- using Vit.Extensions.Json_Extensions;
namespace Sers.CL.Ipc.SharedMemory.Stream
{
    /// <summary>
    /// Receiving side of a shared-memory message stream. A background worker reads
    /// length-prefixed frames (4-byte length followed by the payload) from a
    /// <c>SharedMemory.CircularBuffer</c> and hands each payload to <see cref="OnReceiveMessage"/>.
    /// </summary>
    public class ReadStream
    {
        /// <summary>
        /// Connection passed as the first argument to <see cref="OnReceiveMessage"/> and <see cref="OnDisconnected"/>.
        /// </summary>
        public IDeliveryConnection conn;

        /// <summary>
        /// Callback raised for every complete message received from the peer.
        /// Do not perform time-consuming work here; the handler must return immediately
        /// because it runs on the receive thread.
        /// </summary>
        public Action<IDeliveryConnection, ArraySegment<byte>> OnReceiveMessage { set; private get; }

        /// <summary>
        /// Callback raised by <see cref="Stop"/> after the buffer has been closed.
        /// </summary>
        public Action<IDeliveryConnection> OnDisconnected { set; private get; }

        public ReadStream()
        {
        }

        /// <summary>
        /// Last-chance cleanup when the instance is collected without an explicit <see cref="Stop"/>.
        /// </summary>
        ~ReadStream()
        {
            // An exception escaping a finalizer terminates the process, and Stop() touches the
            // worker thread and the buffer, so failures are contained here.
            try
            {
                Stop();
            }
            catch (Exception ex)
            {
                Logger.Error(ex);
            }
        }

        // Shared-memory buffer being read; null until Malloc/Attach succeeds and again after Stop().
        global::SharedMemory.CircularBuffer buffer;

        /// <summary>
        /// Creates the shared-memory buffer via <c>Util.SharedMemory_Malloc</c> and keeps it for reading.
        /// </summary>
        /// <param name="name">Name of the shared memory.</param>
        /// <param name="nodeCount">Number of nodes in the shared memory.</param>
        /// <param name="nodeBufferSize">Size of each shared-memory node.</param>
        /// <returns>
        /// <c>true</c> when the buffer was created; <c>false</c> when a buffer is already held
        /// or creation threw (the exception is logged).
        /// </returns>
        public bool SharedMemory_Malloc(string name = "", int nodeCount = 64, int nodeBufferSize = 10)
        {
            // Guard placed before the try, matching SharedMemory_Attach.
            if (buffer != null) return false;

            try
            {
                buffer = Util.SharedMemory_Malloc(name: name, nodeCount: nodeCount, nodeBufferSize: nodeBufferSize);
                return true;
            }
            catch (Exception ex)
            {
                Logger.Error(ex);
                return false;
            }
        }

        /// <summary>
        /// Obtains the buffer via <c>Util.SharedMemory_Attach</c> and keeps it for reading.
        /// </summary>
        /// <param name="name">Name of the shared memory.</param>
        /// <returns>
        /// <c>true</c> when the buffer was obtained; <c>false</c> when a buffer is already held
        /// or attaching threw (the exception is logged).
        /// </returns>
        public bool SharedMemory_Attach(string name = "")
        {
            if (buffer != null) return false;

            try
            {
                buffer = Util.SharedMemory_Attach(name);
                return true;
            }
            catch (Exception ex)
            {
                Logger.Error(ex);
                return false;
            }
        }

        /// <summary>
        /// Starts the receive worker. Requires a prior successful Malloc or Attach.
        /// </summary>
        /// <returns><c>false</c> when no buffer is held; otherwise <c>true</c>.</returns>
        public bool Start()
        {
            if (buffer == null) return false;

            // start receiveMsg Thread
            receiveMsg_Thread.threadName = "CL-Ipc-SharedMemory-receiveMsg-" + buffer.Name;
            receiveMsg_Thread.threadCount = 1;
            receiveMsg_Thread.Processor = ReceiveMsg_Thread;
            receiveMsg_Thread.Start();
            return true;
        }

        /// <summary>
        /// Stops the receive worker, closes and releases the buffer, then raises
        /// <see cref="OnDisconnected"/>. Does nothing when no buffer is held.
        /// Exceptions thrown by the disconnect handler are logged, not propagated.
        /// </summary>
        public void Stop()
        {
            if (buffer == null) return;

            receiveMsg_Thread.Stop();

            buffer.Close();
            buffer = null;

            try
            {
                OnDisconnected?.Invoke(conn);
            }
            catch (Exception ex)
            {
                Logger.Error(ex);
            }
        }

        #region ReceiveMsg_Thread

        readonly LongThread receiveMsg_Thread = new LongThread();

        /// <summary>
        /// Worker body: repeatedly reads a 4-byte length prefix followed by that many payload
        /// bytes and dispatches the payload to <see cref="OnReceiveMessage"/>.
        /// A negative length stops the stream. Any exception other than a thread interruption
        /// is logged and the loop resumes; a thread interruption propagates to the caller.
        /// </summary>
        void ReceiveMsg_Thread()
        {
            byte[] baLen = new byte[4];

            while (true)
            {
                // Stop() clears the buffer. Without this exit, a worker that is still running
                // would fault on the null buffer, log, and retry forever.
                if (buffer == null) return;

                try
                {
                    while (true)
                    {
                        //(x.1) get msg len
                        // Accumulate until the whole prefix has arrived: a single Read may return 0
                        // (nothing available yet) or fewer bytes than requested.
                        int headerReceived = 0;
                        while (headerReceived < baLen.Length)
                        {
                            headerReceived += buffer.Read(baLen, headerReceived);
                        }

                        int len = baLen.BytesToInt32();
                        if (len < 0)
                        {
                            // Negative length is treated as the signal to shut the stream down.
                            Stop();
                            return;
                        }

                        #region (x.2) receive msg
                        byte[] data = new byte[len];
                        int receivedLen = 0;
                        while (receivedLen < len)
                        {
                            receivedLen += buffer.Read(data, receivedLen);
                        }
                        #endregion

                        //(x.3) deal msg
                        OnReceiveMessage(conn, new ArraySegment<byte>(data));
                    }
                }
                catch (Exception ex) when (!(ex.GetBaseException() is ThreadInterruptedException))
                {
                    Logger.Error(ex);
                }
            }
        }

        #endregion
    }
}
|