lith vor 3 Jahren
Ursprung
Commit
634ceede3b

+ 1 - 1
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/Controller/SubscriberLoader.cs

@@ -18,7 +18,7 @@ namespace Sers.Core.Module.PubSub.Controller
             foreach (var type in types)
             {
                 ISubscriber subscriber = (ISubscriber)Activator.CreateInstance(type);
-                EndpointManage.Instance.Message_Subscribe(subscriber);
+                SubscriberManage.Instance.Message_Subscribe(subscriber);
             }
         }
     }

+ 8 - 8
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/EFrameType.cs

@@ -3,20 +3,20 @@
     public enum EFrameType : byte
     {
         /// <summary>
-        ///  publish,msgTitle,msgData
+        ///  publish, msgTitle, msgData
         /// </summary>
-        publish,
+        publish = 0,
         /// <summary>
-        /// subscribe,msgTitle
+        /// subscribe, msgTitle
         /// </summary>
-        subscribe,
+        subscribe = 1,
         /// <summary>
-        /// subscribeCancel,msgTitle
+        /// unSubscribe, msgTitle
         /// </summary>
-        subscribeCancel,
+        unSubscribe = 2,
         /// <summary>
-        /// message,msgTitle,msgData
+        /// message, msgTitle, msgData
         /// </summary>
-        message
+        message = 3
     }
 }

+ 2 - 2
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/HotPlugSubscriber.cs

@@ -21,7 +21,7 @@ namespace Sers.Core.Module.PubSub
         public void Subscribe()
         {
             if (subscribing) return;
-            EndpointManage.Instance.Message_Subscribe(this);
+            SubscriberManage.Instance.Message_Subscribe(this);
             subscribing = true;
         }
 
@@ -32,7 +32,7 @@ namespace Sers.Core.Module.PubSub
         {
             if (subscribing)
             {
-                EndpointManage.Instance.Message_SubscribeCancel(this);
+                SubscriberManage.Instance.Message_UnSubscribe(this);
                 subscribing = false;
             }
             

+ 1 - 1
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/MessageCenterService.cs

@@ -45,7 +45,7 @@ namespace Sers.Core.Module.PubSub
                     case (byte)EFrameType.subscribe:
                         Subscribe(conn, msgTitle);
                         break;
-                    case (byte)EFrameType.subscribeCancel:
+                    case (byte)EFrameType.unSubscribe:
                         SubscribeCancel(conn, msgTitle);
                         break;
                 }

+ 5 - 7
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/MessageClient.cs

@@ -1,12 +1,10 @@
 using Vit.Core.Module.Log;
 using System;
-using System.Collections.Generic;
 using System.Threading.Tasks;
 using Sers.Core.Module.Message;
 using Vit.Extensions;
 using System.Collections.Concurrent;
 using Sers.Core.CL.MessageOrganize;
-using Vit.Core.Util.Pipelines;
 using System.Runtime.CompilerServices;
 
 namespace Sers.Core.Module.PubSub
@@ -98,7 +96,7 @@ namespace Sers.Core.Module.PubSub
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public void Message_Publish(string msgTitle, ArraySegment<byte> msgData)
         {
-            //publish,msgTitle,msgData 
+            //EFrameType.publish, msgTitle, msgData 
             var frame = new SersFile().SetFiles(
                 new[] { (byte)EFrameType.publish }.BytesToArraySegmentByte(),
                  msgTitle.SerializeToArraySegmentByte(),
@@ -109,7 +107,7 @@ namespace Sers.Core.Module.PubSub
 
         public void Message_Subscribe(string msgTitle)
         {
-            //subscribe,msgTitle
+            //EFrameType.subscribe, msgTitle
             var frame = new SersFile().SetFiles(
                 new[] { (byte)EFrameType.subscribe }.BytesToArraySegmentByte(),
                  msgTitle.SerializeToArraySegmentByte()
@@ -117,11 +115,11 @@ namespace Sers.Core.Module.PubSub
             SendFrame(frame);
         }
 
-        public void Message_SubscribeCancel(string msgTitle)
+        public void Message_UnSubscribe(string msgTitle)
         {
-            //subscribeCancel,msgTitle
+            //EFrameType.unSubscribe, msgTitle
             var frame = new SersFile().SetFiles(
-                new[] { (byte)EFrameType.subscribeCancel }.BytesToArraySegmentByte(),
+                new[] { (byte)EFrameType.unSubscribe }.BytesToArraySegmentByte(),
                  msgTitle.SerializeToArraySegmentByte()
                 ).Package();
             SendFrame(frame);

+ 5 - 5
dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/EndpointManage.cs → dotnet/Library/Sers/Sers.Core/Sers.Core/Module/PubSub/SubscriberManage.cs

@@ -6,12 +6,12 @@ using System.Runtime.CompilerServices;
 
 namespace Sers.Core.Module.PubSub
 {
-    public class EndpointManage
+    public class SubscriberManage
     {
-        public static readonly EndpointManage Instance = new EndpointManage();
+        public static readonly SubscriberManage Instance = new SubscriberManage();
 
 
-        public EndpointManage()
+        public SubscriberManage()
         {
             MessageClient.Instance.Message_Consumer = Message_Consumer;
         }
@@ -64,7 +64,7 @@ namespace Sers.Core.Module.PubSub
                 subscriberList.TryAdd(subscriber.GetHashCode(), subscriber);
             }
         }
-        public void Message_SubscribeCancel(HotPlugSubscriber subscriber)
+        public void Message_UnSubscribe(HotPlugSubscriber subscriber)
         {
             lock (this)
             {
@@ -74,7 +74,7 @@ namespace Sers.Core.Module.PubSub
                 if (subscriberList.IsEmpty)
                 {
                     subscriberMap.TryRemove(subscriber.msgTitle, out _);
-                    MessageClient.Instance.Message_SubscribeCancel(subscriber.msgTitle);
+                    MessageClient.Instance.Message_UnSubscribe(subscriber.msgTitle);
                 }
             }     
         }

+ 7 - 0
dotnet/ServiceCenter/App.Gover.Gateway/wwwroot/JsStation/sers.ServiceStation.demo.js

@@ -174,7 +174,14 @@ serviceStation.apiClient.callApiAsync("/JsStation/api1", { name: 'sers' }, 'GET'
     });
 
 
+//------------------------------------------------------------------------
+//(x.6) subscribe message
+var subscriber = serviceStation.subscriberManage.createSubscribe('SersEvent.ServiceStation.Add', (message_bytes) => {
+    vit.logger.info("get message :" + vit.bytesToString(message_bytes));
+    subscriber.unSubscribe();
+});
 
+subscriber.subscribe();
 
 
 

+ 211 - 15
dotnet/ServiceCenter/App.Gover.Gateway/wwwroot/JsStation/sers.ServiceStation.js

@@ -1,12 +1,12 @@
 /*
  * sers.ServiceStation.js
- * Date   : 2022-01-23
- * Version: 2.1.17-temp
+ * Date   : 2022-05-07
+ * Version: 2.1.18-preview9
  * author : Lith
  * email  : serset@yeah.net
  */
 
-; sers = { version: '2.1.17-temp' };
+; sers = { version: '2.1.18-preview9' };
 
 /*
  * vit.js 扩展
@@ -230,6 +230,7 @@
 
 /*
 * sers.CL.js 扩展
+*    PipeFrame CL.DeliveryClient RequestAdaptor CL.OrganizeClient
 * author : Lith
 * email  : serset@yeah.net
 */
@@ -428,6 +429,12 @@
 		//	callback: function(apiReplyMessage_bytes){ }
 		self.event_onGetRequest;
 
+
+		//事件,delivery向Organize发送请求时被调用
+		//function (bytes) { }
+		self.event_onGetMessage;
+		
+
 		//请求超时时间(单位ms,默认300000)
 		self.requestTimeoutMs = 300000;
 
@@ -465,7 +472,7 @@
 					break;
 
 				case EFrameType.message:
-					//TODO
+					self.event_onGetMessage(msgData);
 					break;
 			}
 		};
@@ -530,6 +537,12 @@
 
 
 
+		self.sendMessage = function (message_bytes) {
+			delivery_sendFrame(EFrameType.message, 0, message_bytes);
+		};
+
+
+
 		function delivery_sendFrame(msgType, requestType, bytes) {
 			bytes.splice(0, 0, msgType, requestType);
 			self.event_onSendFrame(bytes);
@@ -550,6 +563,7 @@
 	}
 
 
+
 	//websocketHost demo: "ws://127.0.0.1:4503"
 	CL.OrganizeClient = function (websocketHost) {
 
@@ -582,6 +596,10 @@
 				self.event_onGetRequest(apiRequestMessage_bytes, callback);
 			};
 
+			requestAdaptor.event_onGetMessage = function (bytes) {
+				self.event_onGetMessage(bytes);
+			};
+
 			requestAdaptor.event_onSendFrame = function (bytes) {
 				delivery.sendFrame(bytes);
 			};
@@ -601,11 +619,19 @@
 		//	callback function(apiReplyMessage_bytes){}
 		self.event_onGetRequest = null;
 
+		//function (bytes) { }
+		self.event_onGetMessage = null;
+
 		//callback: ({success,replyData})=>{ }
 		self.sendRequest = function (requestData, callback) {
 			requestAdaptor.sendRequest(null, requestData, callback);
 		};
 
+ 
+		self.sendMessage = function (message_bytes) {
+			requestAdaptor.sendMessage(message_bytes);
+		};
+
 		//callback:   function (success) { }
 		self.connect = function (callback) {
 
@@ -649,6 +675,7 @@
 
 /*
 * sers.ServiceStation.js 扩展
+*   sers.ApiMessage	sers.ApiClient sers.MessageClient sers.LocalApiService sers.ServiceStation
 * author : Lith
 * email  : serset@yeah.net
 */
@@ -722,8 +749,8 @@
 		};
 	};
 
-	//(bytes files)
-	//return bytes
+	//arg		files(bytes[])
+	//return	bytes
 	ApiMessage.package = function () {
 		var files = arguments;
 		var oriData = [];
@@ -736,9 +763,8 @@
 		return oriData;
 	};
 
-
-	//(bytes oriData)
-	//return  bytes fileArray
+	//arg		oriData(bytes)
+	//return	bytes[]			file[]
 	ApiMessage.unpackage = function (oriData) {
 		var files = [];
 
@@ -785,6 +811,165 @@
 
 	};
 
+
+	//MessageClient
+	sers.MessageClient = function (organizeClient) {
+
+		let EFrameType = {
+			/// <summary>
+			///  publish, msgTitle, msgData
+			/// </summary>
+			publish : 0,
+			/// <summary>
+			/// subscribe, msgTitle
+			/// </summary>
+			subscribe : 1,
+			/// <summary>
+			/// unSubscribe, msgTitle
+			/// </summary>
+			unSubscribe : 2,
+			/// <summary>
+			/// message, msgTitle, msgData
+			/// </summary>
+			message : 3
+		};
+
+
+		function sendFrame(frame) {
+			organizeClient.sendMessage(frame);
+		}
+
+		this.onGetMessage = (messageData) => {
+			let frame = ApiMessage.unpackage(messageData);
+
+			let msgType = frame[0][0];
+
+            switch (msgType) {
+
+                case EFrameType.message:
+
+                    let msgTitle = vit.bytesToString(frame[1]);
+                    let msgData = frame[2];
+
+                    this.message_Consumer(msgTitle, msgData);
+                    break;
+            }
+		};
+
+
+		// (msgTitle,msgData)=>void
+		this.message_Consumer;
+
+		this.message_Publish = function (msgTitle, msgData) {
+			//EFrameType.publish, msgTitle, msgData 
+			let frame = ApiMessage.package(
+				[EFrameType.publish],
+				vit.stringToBytes(msgTitle),
+				msgData);
+            sendFrame(frame);
+		};
+
+		this.message_Subscribe = function (msgTitle) {
+			//EFrameType.subscribe, msgTitle
+			let frame = ApiMessage.package(
+				[EFrameType.subscribe],
+				vit.stringToBytes(msgTitle)
+			);
+			sendFrame(frame);
+		};
+
+		this.message_UnSubscribe = function (msgTitle) {
+			//EFrameType.unSubscribe, msgTitle
+			let frame = ApiMessage.package(
+				[EFrameType.unSubscribe],
+				vit.stringToBytes(msgTitle)
+			);
+			sendFrame(frame);
+		};
+	};
+
+	//SubscriberManage
+	sers.SubscriberManage = function (messageClient) {
+
+		this.createSubscribe = (msgTitle, onGetMessage) => {
+			return new sers.MessageSubscriber(this, msgTitle, onGetMessage);
+		};
+
+
+		//消息订阅者   msgTitle ->    Subscriber[]
+		let subscriberMap = {};
+
+		// subscriber:MessageSubscriber
+		this.message_Subscribe = function (subscriber) {
+
+			let subscriberList = subscriberMap[subscriber.msgTitle];
+
+			if (!subscriberList) {
+				subscriberList=subscriberMap[subscriber.msgTitle] = [];
+				messageClient.message_Subscribe(subscriber.msgTitle);
+			}
+
+			subscriberList.push(subscriber);
+		};
+
+
+		this.message_UnSubscribe = function (subscriber) {
+			let subscriberList = subscriberMap[subscriber.msgTitle];
+
+			if (!subscriberList) {
+				return false;
+			}
+
+			subscriberList = subscriberList.filter(m => m != subscriber);
+
+			if (subscriberList.length == 0) {
+				delete subscriberMap[subscriber.msgTitle];
+				messageClient.message_UnSubscribe(subscriber.msgTitle);
+			}
+		};
+
+
+		this.message_Consumer = function (msgTitle, msgData) {
+			let subscriberList = subscriberMap[msgTitle];
+
+			if (!subscriberList || !subscriberList.length) return;
+
+            for (let subscriber of subscriberList) {
+                try {
+                    if (!subscriber || !subscriber.onGetMessage) continue;
+                    subscriber.onGetMessage(msgData);
+                } catch (ex) {
+                    logger.error(ex);
+                }
+            }
+		};
+
+		messageClient.message_Consumer = this.message_Consumer;
+
+	};
+
+	//MessageSubscriber
+	sers.MessageSubscriber = function (subscriberManage,msgTitle, onGetMessage) {
+
+		this.msgTitle = msgTitle;
+
+		// bytes=>void
+		this.onGetMessage = onGetMessage;
+
+
+		this.subscribe = function () {
+			subscriberManage.message_Subscribe(this);
+			return this;
+		};
+
+		this.unSubscribe = function () {
+			subscriberManage.message_UnSubscribe(this);
+			return this;
+		};
+	};
+
+
+
 	//LocalApiService
 	sers.LocalApiService = function () {
 		var self = this;
@@ -925,10 +1110,11 @@
 
 
 	//ServiceStation
+	//	localApiService org apiClient serviceStationInfo
 	sers.ServiceStation = function () {
 		var self = this;
 
-		//(x.1) LocalApiService
+		//(x.1) localApiService
 		(function () {
 			self.localApiService = new sers.LocalApiService();
 		})();
@@ -945,27 +1131,37 @@
 		})();
 
 
-		//(x.3) ApiClient
+		//(x.3) apiClient
 		(function () {
 			self.apiClient = new sers.ApiClient(self.org);
 		})();
 
-		//(x.4)
+
+		//(x.4) subscriberManage
+		(function () {
+			let messageClient = new sers.MessageClient(self.org);
+			self.subscriberManage = new sers.SubscriberManage(messageClient);
+
+			self.org.event_onGetMessage = messageClient.onGetMessage;
+		})();
+
+
+		//(x.5)
 		self.stop = function () {
 			logger.info('[sers.ServiceStation] try stop...');
 			self.org.stop();
 			logger.info('[sers.ServiceStation] stoped.');
 		};
 
-		//(x.5)
+		//(x.6)
 		self.serviceStationInfo = {
 			serviceStationName: 'JsStation', serviceStationKey: '', stationVersion: '', info: {}
 		};
 
-		//(x.6)
+		//(x.7)
 		var deviceInfo = { deviceKey: ('' + Math.random()).substr(2) };
 
-		//(x.7)
+		//(x.8)
 		//callback: function(success){}
 		self.start = function (callback) {
 

Datei-Diff unterdrückt, da er zu groß ist
+ 3 - 3
dotnet/ServiceCenter/App.Gover.Gateway/wwwroot/JsStation/sers.ServiceStation.min.js


Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.