|
@@ -1,6 +1,7 @@
|
|
using System;
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Collections.Generic;
|
|
|
|
+using System.Runtime.CompilerServices;
|
|
using System.Threading;
|
|
using System.Threading;
|
|
using Newtonsoft.Json.Linq;
|
|
using Newtonsoft.Json.Linq;
|
|
using Sers.Core.CL.MessageDelivery;
|
|
using Sers.Core.CL.MessageDelivery;
|
|
@@ -60,6 +61,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
#region (x.x.2)SendMessage SendRequest
|
|
#region (x.x.2)SendMessage SendRequest
|
|
|
|
|
|
|
|
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
public void SendMessageAsync(IOrganizeConnection conn, Vit.Core.Util.Pipelines.ByteData message)
|
|
public void SendMessageAsync(IOrganizeConnection conn, Vit.Core.Util.Pipelines.ByteData message)
|
|
{
|
|
{
|
|
Delivery_SendFrameAsync(conn, (byte)EFrameType.message, 0, message);
|
|
Delivery_SendFrameAsync(conn, (byte)EFrameType.message, 0, message);
|
|
@@ -67,6 +69,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
public long SendRequestAsync(IOrganizeConnection conn, Object sender, Vit.Core.Util.Pipelines.ByteData requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback, ERequestType requestType = ERequestType.app)
|
|
public long SendRequestAsync(IOrganizeConnection conn, Object sender, Vit.Core.Util.Pipelines.ByteData requestData, Action<object, Vit.Core.Util.Pipelines.ByteData> callback, ERequestType requestType = ERequestType.app)
|
|
{
|
|
{
|
|
//no need guid,just make sure reqKey is unique in current connection client
|
|
//no need guid,just make sure reqKey is unique in current connection client
|
|
@@ -84,55 +87,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
//SendRequest
|
|
//SendRequest
|
|
Delivery_SendFrameAsync(conn, (byte)EFrameType.request, (byte)requestType, reqRepFrame);
|
|
Delivery_SendFrameAsync(conn, (byte)EFrameType.request, (byte)requestType, reqRepFrame);
|
|
return reqKey;
|
|
return reqKey;
|
|
- }
|
|
|
|
-
|
|
|
|
- #region static curAutoResetEvent
|
|
|
|
- public static AutoResetEvent curAutoResetEvent =>
|
|
|
|
- _curAutoResetEvent.Value ?? (_curAutoResetEvent.Value = new AutoResetEvent(false));
|
|
|
|
-
|
|
|
|
- static System.Threading.ThreadLocal<AutoResetEvent> _curAutoResetEvent = new System.Threading.ThreadLocal<AutoResetEvent>();
|
|
|
|
- #endregion
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- public bool SendRequest(IOrganizeConnection conn, Vit.Core.Util.Pipelines.ByteData requestData, out ByteData replyData)
|
|
|
|
- {
|
|
|
|
- ByteData _replyData = null;
|
|
|
|
-
|
|
|
|
- AutoResetEvent mEvent = curAutoResetEvent;
|
|
|
|
- mEvent.Reset();
|
|
|
|
-
|
|
|
|
- long reqKey = SendRequestAsync(conn, null, requestData, (sender, replyData_) => {
|
|
|
|
- _replyData = replyData_;
|
|
|
|
- mEvent?.Set();
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- bool success;
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- success = mEvent.WaitOne(requestTimeoutMs);
|
|
|
|
- }
|
|
|
|
- finally
|
|
|
|
- {
|
|
|
|
- mEvent = null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- if (success)
|
|
|
|
- {
|
|
|
|
- replyData = _replyData;
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- if (OrganizeToDelivery_RequestMap_TryRemove(reqKey, out var requestInfo))
|
|
|
|
- {
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
- replyData = null;
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
#endregion
|
|
#endregion
|
|
|
|
|
|
@@ -253,9 +208,10 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
//IConsumer<DeliveryToOrganize_MessageFrame> task_DeliveryToOrganize_Processor = new Consumer_WorkerPoolCascade<DeliveryToOrganize_MessageFrame>();
|
|
//IConsumer<DeliveryToOrganize_MessageFrame> task_DeliveryToOrganize_Processor = new Consumer_WorkerPoolCascade<DeliveryToOrganize_MessageFrame>();
|
|
|
|
|
|
|
|
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
void DeliveryToOrganize_OnGetMessageFrame(IOrganizeConnection conn, ArraySegment<byte> messageFrame)
|
|
void DeliveryToOrganize_OnGetMessageFrame(IOrganizeConnection conn, ArraySegment<byte> messageFrame)
|
|
{
|
|
{
|
|
- var msg = DeliveryToOrganize_MessageFrame.Pop();
|
|
|
|
|
|
+ var msg = new DeliveryToOrganize_MessageFrame();
|
|
msg.conn = conn;
|
|
msg.conn = conn;
|
|
msg.messageFrame = messageFrame;
|
|
msg.messageFrame = messageFrame;
|
|
|
|
|
|
@@ -267,25 +223,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
class DeliveryToOrganize_MessageFrame
|
|
class DeliveryToOrganize_MessageFrame
|
|
{
|
|
{
|
|
public IOrganizeConnection conn { get; set; }
|
|
public IOrganizeConnection conn { get; set; }
|
|
- public ArraySegment<byte>? messageFrame;
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- public static DeliveryToOrganize_MessageFrame Pop()
|
|
|
|
- {
|
|
|
|
- return ObjectPool<DeliveryToOrganize_MessageFrame>.Shared.Pop();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /// <summary>
|
|
|
|
- /// 使用结束请手动调用
|
|
|
|
- /// </summary>
|
|
|
|
- public void Push()
|
|
|
|
- {
|
|
|
|
- conn = null;
|
|
|
|
- messageFrame = null;
|
|
|
|
-
|
|
|
|
- ObjectPool<DeliveryToOrganize_MessageFrame>.Shared.Push(this);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ public ArraySegment<byte>? messageFrame;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -293,38 +231,23 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
{
|
|
{
|
|
public IOrganizeConnection conn { get; set; }
|
|
public IOrganizeConnection conn { get; set; }
|
|
|
|
|
|
- public long reqKey;
|
|
|
|
- public static DeliveryToOrganize_RequestInfo Pop()
|
|
|
|
- {
|
|
|
|
- return ObjectPool<DeliveryToOrganize_RequestInfo>.Shared.Pop();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /// <summary>
|
|
|
|
- /// 使用结束请手动调用
|
|
|
|
- /// </summary>
|
|
|
|
- public void Push()
|
|
|
|
- {
|
|
|
|
- conn = null;
|
|
|
|
- ObjectPool<DeliveryToOrganize_RequestInfo>.Shared.Push(this);
|
|
|
|
- }
|
|
|
|
|
|
+ public long reqKey;
|
|
}
|
|
}
|
|
#endregion
|
|
#endregion
|
|
|
|
|
|
#endregion
|
|
#endregion
|
|
-
|
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
#region DeliveryToOrganize_ProcessFrame
|
|
#region DeliveryToOrganize_ProcessFrame
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
void DeliveryToOrganize_ProcessFrame(DeliveryToOrganize_MessageFrame msgFrame)
|
|
void DeliveryToOrganize_ProcessFrame(DeliveryToOrganize_MessageFrame msgFrame)
|
|
{
|
|
{
|
|
- IOrganizeConnection conn = msgFrame.conn;
|
|
|
|
- var messageFrame = msgFrame.messageFrame;
|
|
|
|
-
|
|
|
|
- msgFrame.Push();
|
|
|
|
|
|
+ IOrganizeConnection conn = msgFrame.conn;
|
|
|
|
|
|
- if (messageFrame == null) return;
|
|
|
|
|
|
+ if (msgFrame.messageFrame == null) return;
|
|
|
|
|
|
- var data = messageFrame.Value;
|
|
|
|
|
|
+ var data = msgFrame.messageFrame.Value;
|
|
|
|
|
|
if (data.Count <= 2) return;
|
|
if (data.Count <= 2) return;
|
|
|
|
|
|
@@ -347,7 +270,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
{
|
|
{
|
|
byte requestType = data.Array[data.Offset + 1];
|
|
byte requestType = data.Array[data.Offset + 1];
|
|
|
|
|
|
- var reqInfo = DeliveryToOrganize_RequestInfo.Pop();
|
|
|
|
|
|
+ var reqInfo = new DeliveryToOrganize_RequestInfo();
|
|
reqInfo.conn = conn;
|
|
reqInfo.conn = conn;
|
|
UnpackReqRepFrame(msgData, out reqInfo.reqKey, out var requestData);
|
|
UnpackReqRepFrame(msgData, out reqInfo.reqKey, out var requestData);
|
|
|
|
|
|
@@ -370,7 +293,8 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
#region DeliveryToOrganize_OnGetRequest
|
|
#region DeliveryToOrganize_OnGetRequest
|
|
|
|
|
|
const string organizeVersion = "Sers.Mq.Socket.v1";
|
|
const string organizeVersion = "Sers.Mq.Socket.v1";
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
void DeliveryToOrganize_OnGetRequest(DeliveryToOrganize_RequestInfo reqInfo, byte requestType, ArraySegment<byte> requestData)
|
|
void DeliveryToOrganize_OnGetRequest(DeliveryToOrganize_RequestInfo reqInfo, byte requestType, ArraySegment<byte> requestData)
|
|
{
|
|
{
|
|
switch ((ERequestType)requestType)
|
|
switch ((ERequestType)requestType)
|
|
@@ -400,14 +324,15 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
}
|
|
}
|
|
#endregion
|
|
#endregion
|
|
|
|
|
|
-
|
|
|
|
|
|
+
|
|
#region DeliveryToOrganize_SendReply
|
|
#region DeliveryToOrganize_SendReply
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
private void DeliveryToOrganize_SendReply(object sender, Vit.Core.Util.Pipelines.ByteData replyData)
|
|
private void DeliveryToOrganize_SendReply(object sender, Vit.Core.Util.Pipelines.ByteData replyData)
|
|
{
|
|
{
|
|
DeliveryToOrganize_RequestInfo reqInfo = sender as DeliveryToOrganize_RequestInfo;
|
|
DeliveryToOrganize_RequestInfo reqInfo = sender as DeliveryToOrganize_RequestInfo;
|
|
var conn = reqInfo.conn;
|
|
var conn = reqInfo.conn;
|
|
var reqKey = reqInfo.reqKey;
|
|
var reqKey = reqInfo.reqKey;
|
|
- reqInfo.Push();
|
|
|
|
|
|
+
|
|
|
|
|
|
PackageReqRepFrame(reqKey, replyData, out var repFrame);
|
|
PackageReqRepFrame(reqKey, replyData, out var repFrame);
|
|
|
|
|
|
@@ -435,12 +360,14 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
readonly ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo> organizeToDelivery_RequestMap0 = new ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo>();
|
|
readonly ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo> organizeToDelivery_RequestMap0 = new ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo>();
|
|
readonly ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo> organizeToDelivery_RequestMap1 = new ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo>();
|
|
readonly ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo> organizeToDelivery_RequestMap1 = new ConcurrentDictionary<long, OrganizeToDelivery_RequestInfo>();
|
|
|
|
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
bool OrganizeToDelivery_RequestMap_TryRemove(long guid, out OrganizeToDelivery_RequestInfo reqInfo)
|
|
bool OrganizeToDelivery_RequestMap_TryRemove(long guid, out OrganizeToDelivery_RequestInfo reqInfo)
|
|
{
|
|
{
|
|
return (guid > 0 ? organizeToDelivery_RequestMap0 : organizeToDelivery_RequestMap1).TryRemove(guid, out reqInfo);
|
|
return (guid > 0 ? organizeToDelivery_RequestMap0 : organizeToDelivery_RequestMap1).TryRemove(guid, out reqInfo);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
void OrganizeToDelivery_RequestMap_Set(ref long guid, OrganizeToDelivery_RequestInfo reqInfo)
|
|
void OrganizeToDelivery_RequestMap_Set(ref long guid, OrganizeToDelivery_RequestInfo reqInfo)
|
|
{
|
|
{
|
|
if (organizeToDelivery_RequestMap_timeoutTime < DateTime.Now)
|
|
if (organizeToDelivery_RequestMap_timeoutTime < DateTime.Now)
|
|
@@ -491,6 +418,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
|
|
|
|
|
|
|
|
#region (x.5)Delivery_SendFrameAsync
|
|
#region (x.5)Delivery_SendFrameAsync
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
void Delivery_SendFrameAsync(IOrganizeConnection conn, byte msgType, byte requestType, Vit.Core.Util.Pipelines.ByteData data)
|
|
void Delivery_SendFrameAsync(IOrganizeConnection conn, byte msgType, byte requestType, Vit.Core.Util.Pipelines.ByteData data)
|
|
{
|
|
{
|
|
//var item = DataPool.BytesGet(2);
|
|
//var item = DataPool.BytesGet(2);
|
|
@@ -509,6 +437,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
|
|
|
|
readonly SersTimer heartBeat_Timer = new SersTimer();
|
|
readonly SersTimer heartBeat_Timer = new SersTimer();
|
|
|
|
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
void HeartBeat_Loop()
|
|
void HeartBeat_Loop()
|
|
{
|
|
{
|
|
try
|
|
try
|
|
@@ -663,6 +592,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
第1部分: 请求标识(reqKey)(long) 长度为8字节
|
|
第1部分: 请求标识(reqKey)(long) 长度为8字节
|
|
第2部分: 消息内容(oriMsg)
|
|
第2部分: 消息内容(oriMsg)
|
|
*/
|
|
*/
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
internal static void UnpackReqRepFrame(ArraySegment<byte> reqRepFrame, out long reqKey, out ArraySegment<byte> oriMsg)
|
|
internal static void UnpackReqRepFrame(ArraySegment<byte> reqRepFrame, out long reqKey, out ArraySegment<byte> oriMsg)
|
|
{
|
|
{
|
|
//第1帧
|
|
//第1帧
|
|
@@ -678,6 +608,7 @@ namespace Sers.Core.CL.MessageOrganize.DefaultOrganize
|
|
/// <param name="reqKey"></param>
|
|
/// <param name="reqKey"></param>
|
|
/// <param name="oriMsg"></param>
|
|
/// <param name="oriMsg"></param>
|
|
/// <param name="reqRepFrame"></param>
|
|
/// <param name="reqRepFrame"></param>
|
|
|
|
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
static void PackageReqRepFrame(long reqKey, Vit.Core.Util.Pipelines.ByteData oriMsg, out Vit.Core.Util.Pipelines.ByteData reqRepFrame)
|
|
static void PackageReqRepFrame(long reqKey, Vit.Core.Util.Pipelines.ByteData oriMsg, out Vit.Core.Util.Pipelines.ByteData reqRepFrame)
|
|
{
|
|
{
|
|
//*
|
|
//*
|