123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using Sers.Core.CL.MessageDelivery;
- using Vit.Core.Module.Log;
- using Vit.Extensions;
- using ZeroMQ;
- namespace Sers.CL.ClrZmq.ThreadWait
- {
- public class DeliveryServer : IDeliveryServer
- {
- public Sers.Core.Util.StreamSecurity.SecurityManager securityManager;
- public Action<IDeliveryConnection> Conn_OnDisconnected { private get; set; }
- public Action<IDeliveryConnection> Conn_OnConnected { private get; set; }
-
- /// <summary>
- /// connGuid -> conn
- /// </summary>
- public readonly ConcurrentDictionary<long, DeliveryConnection> connMap = new ConcurrentDictionary<long, DeliveryConnection>();
- public IEnumerable<IDeliveryConnection> ConnectedList => connMap.Values.Select(conn => ((IDeliveryConnection)conn));
- /// <summary>
- /// 地址。例如: "tcp://*:4502" 、 "ipc://4502"
- /// </summary>
- public string endpoint = "tcp://*:4502";
- SocketPoller poller = new SocketPoller();
- #region Start
- /// <summary>
- /// 启动服务
- /// </summary>
- public bool Start()
- {
- try
- {
- Logger.Info("[CL.DeliveryServer] Zmq.ThreadWait,starting... endpoint: \"" + endpoint + "\"");
- //(x.1) create zmq conn
- var socket = new ZSocket(ZSocketType.ROUTER);
- socket.Bind(endpoint);
- //(x.2) init poller
- poller.OnReceiveMessage = Zmq_OnReceiveMessage;
- poller.Start(socket);
- Logger.Info("[CL.DeliveryServer] Zmq.ThreadWait,started.");
- return true;
- }
- catch (Exception ex)
- {
- Logger.Error(ex);
- }
- return false;
- }
- void Zmq_OnReceiveMessage(ZMessage msg)
- {
- long connGuid;
- byte[] msgFrame;
- #region (x.1) get identity and msgFrame
- using (msg)
- {
- if (null == msg || msg.Count < 2) return;
- connGuid = msg[0].ReadInt64();
- msgFrame = msg[1].Read();
- #region 检测是否为 关闭命令
- if (msg.Count > 2)
- {
- bool getedCloseSignal = false;
- try
- {
- var data = msg[2].Read();
- if (data.Length > 0 && data[0] == 0xff)
- {
- getedCloseSignal = true;
- }
- }
- catch (Exception ex)
- {
- Logger.Error(ex);
- getedCloseSignal = true;
- }
- if (getedCloseSignal)
- {
- if (connMap.TryGetValue(connGuid, out var _conn))
- {
- _conn.Close();
- }
- else
- {
- SendCloseSignal(connGuid.Int64ToBytes());
- }
- return;
- }
- }
- #endregion
- }
- #endregion
- #region (x.2) get or create conn
- if (!connMap.TryGetValue(connGuid, out var conn))
- {
- //新连接
- conn=Delivery_OnConnected(connGuid);
- }
- #endregion
- #region (x.3)
- conn.OnGetFrame(conn, new ArraySegment<byte>(msgFrame));
- #endregion
- }
- void Zmq_SendFrameAsync(DeliveryConnection conn, byte[] data)
- {
- poller.SendMessageAsync(new ZMessage() { new ZFrame(conn.zmqIdentity), new ZFrame(data) });
- }
- #endregion
- #region Stop
- /// <summary>
- /// 停止服务
- /// </summary>
- public void Stop()
- {
- Logger.Info("[ServerMq] Zmq.ThreadWait,stop...");
- //(x.1) stop conn
- ConnectedList.ToList().ForEach(Delivery_OnDisconnected);
- connMap.Clear();
- //(x.2)
- poller.Close();
- Logger.Info("[ServerMq] Zmq.ThreadWait,stoped");
- }
- #endregion
- #region Delivery_Event
- private DeliveryConnection Delivery_OnConnected(long connGuid)
- {
- var conn = new DeliveryConnection() { zmqIdentity = connGuid.Int64ToBytes() };
- conn.securityManager = securityManager?.Clone();
- conn.OnSendFrameAsync = Zmq_SendFrameAsync;
- conn.Conn_OnDisconnected = Delivery_OnDisconnected;
- try
- {
- if (connMap.TryAdd(connGuid, conn))
- Conn_OnConnected?.Invoke(conn);
- }
- catch (Exception ex)
- {
- Logger.Error(ex);
- }
- return conn;
- }
- private void Delivery_OnDisconnected(IDeliveryConnection _conn)
- {
- var conn = (DeliveryConnection)_conn;
- SendCloseSignal(conn.zmqIdentity);
- connMap.TryRemove(conn.zmqIdentity.BytesToInt64(), out _);
- try
- {
- Conn_OnDisconnected?.Invoke(_conn);
- }
- catch (Exception ex)
- {
- Logger.Error(ex);
- }
- }
- private void SendCloseSignal(byte[] zmqIdentity)
- {
- try
- {
- if (poller != null)
- {
- poller.SendMessageAsync(new ZMessage() { new ZFrame(zmqIdentity), new ZFrame(new byte[1]), new ZFrame(new byte[] { (byte)0xff }) });
- poller.SendMessageAsync(new ZMessage() { new ZFrame(zmqIdentity), new ZFrame(new byte[1]), new ZFrame(new byte[] { (byte)0xff }) });
- }
- }
- catch (Exception ex)
- {
- Logger.Error(ex);
- }
- }
- #endregion
- }
- }
|