电子商务网站建设教案公司广告推广方案
注意下文里面的 SignalR 不是 Core 版本,而是 Framework 下的
本文使用的方式是把 SignalR 写在控制台项目里,再用 Topshelf 注册成 Windows 服务
这样做有两点好处
- 传统 Window 服务项目调试时需要“附加到进程”,开发体验比较差,影响效率
- 使用控制台不仅可以随时打断点调试,还可以随时打印调试信息,非常方便
Topshelf 的使用方法这里不再阐述,在控制台里使用 Topshelf 三个步骤 :
- 定义一个 Owin 自托管作为 SignalR的宿主,里面设置允许跨域,起名为 Startup
using Microsoft.AspNet.SignalR; using Microsoft.Owin.Cors; using Owin; using System; using System.Diagnostics;namespace HenryMes.SignalR.Hosting {/// <summary>/// 配置跨域请求、SignalR Server/// </summary>class Startup{public void Configuration(IAppBuilder app){//app.UseErrorPage();app.UseCors(CorsOptions.AllowAll);// 有关如何配置应用程序的详细信息,请访问 http://go.microsoft.com/fwlink/?LinkID=316888//Hub Modeapp.MapSignalR("/lcc", new HubConfiguration());app.Map("/signalr", map =>{var config = new HubConfiguration{// You can enable JSONP by uncommenting this line// JSONP requests are insecure but some older browsers (and some// versions of IE) require JSONP to work cross domainEnableJSONP = true};//config.EnableCrossDomain = true;// Turns cors support on allowing everything// In real applications, the origins should be locked downmap.UseCors(CorsOptions.AllowAll).RunSignalR(config);});Make long polling connections wait a maximum of 110 seconds for aresponse. When that time expires, trigger a timeout command andmake the client reconnect.//GlobalHost.Configuration.ConnectionTimeout = TimeSpan.FromSeconds(110);Wait a maximum of 30 seconds after a transport connection is lostbefore raising the Disconnected event to terminate the SignalR connection.//GlobalHost.Configuration.DisconnectTimeout = TimeSpan.FromSeconds(30);For transports other than long polling, send a keepalive packet every10 seconds. This value must be no more than 1/3 of the DisconnectTimeout value.//GlobalHost.Configuration.KeepAlive = TimeSpan.FromSeconds(10);// Turn tracing on programmaticallyGlobalHost.TraceManager.Switch.Level = SourceLevels.Information;}} }
- 建立可在服务里运行的服务类,使用了上面的Startup配置实例化宿主对象,里面定义了服务的启动,暂停,关闭等触发时的一些动作,本文就建立一个 JobManager 类来完成这些工作
using HenryMes.Utils; using Microsoft.Owin.Hosting; using System; using System.Threading.Tasks;namespace HenryMes.SignalR.Hosting {public class JobManager{private const string displayName = "SignalR 状态监控";IDisposable SignalR { get; set; }public bool Start(){try{//signalr server地址,端口可以更换,确保不被占用,否则服务启动不了 #if DEBUGvar url = $"http://{JsonConfig.Instance.Root()?.Debug?.Ip}:{JsonConfig.Instance.Root()?.Debug?.Port}";var Port = $"{JsonConfig.Instance.Root()?.Debug?.Port}"; #elsevar url = $"http://{JsonConfig.Instance.Root()?.Release?.Ip}:{JsonConfig.Instance.Root()?.Release?.Port}";var Port = $"{JsonConfig.Instance.Root()?.Release?.Port}"; #endifStartOptions options = new StartOptions();options.Urls.Add(url);options.Urls.Add($"http://+:{Port}");//此处需要用一个全局变量来保存WebApp,否则在发布为后台服务的时候生命周期会提前结束,被系统回收掉SignalR = WebApp.Start<Startup>(options);Task.Delay(TimeSpan.FromSeconds(1)).Wait();Console.WriteLine("Server running on {0}", url);Console.WriteLine($"{displayName}服务开始");Console.ReadLine();LogHelper.GetInstance().Information($"{displayName}服务开始,地址 {url}");return true;}catch (Exception ex){LogHelper.GetInstance().Error(ex);}return false;}public bool Stop(){SignalR.Dispose();LogHelper.GetInstance().Information($"{displayName}服务停止");System.Threading.Thread.Sleep(1500);return true;}public bool Shutdown(){SignalR.Dispose();LogHelper.GetInstance().Information($"{displayName}服务停止");System.Threading.Thread.Sleep(1500);return true;}} }
- 在 Program.cs 文件,也就是入口函数 main 调用 Topshelf 对服务进行配置
using Topshelf;namespace HenryMes.SignalR.Hosting
{internal class Program{private const string displayName = "HenryMes.SignalR.Hosting";static void Main(string[] args){HostFactory.Run(x => {x.Service<JobManager>(s =>{s.ConstructUsing(name => new JobManager());s.WhenStarted(tc => tc.Start());s.WhenShutdown(tc => tc.Shutdown());s.WhenStopped(tc => tc.Stop());});x.RunAsLocalSystem();x.StartAutomatically();x.SetDescription(displayName);x.SetDisplayName(displayName);x.SetServiceName(displayName);});}}
}
下面定义一个 SignalR 的 Hub 基类,里面管理了SignalR 的连接和断开,一个线程管理一个连接,连接断开,线程自动取消,建立一个抽象类 BaseHub
using HenryMes.Utils;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;namespace HenryMes.SignalR.Hosting
{/// <summary>/// /// </summary>[HubName(nameof(T))]public abstract class BaseHub<T> : Hub where T : IHub{/// <summary>/// 线程安全版本的字典,管理SignalR的连接/// </summary>protected static readonly ConcurrentDictionary<string, CancellationTokenSource> Connections =new ConcurrentDictionary<string, CancellationTokenSource>();/// <summary>/// 异步锁/// </summary>public static readonly object AsyncObj = new object();/// <summary>/// 设置超时时间/// </summary>abstract protected int millisecondsTimeout { get; }/// <summary>/// 设置线程轮询时间/// </summary>abstract protected int intervalTime { get; }/// <summary>/// 跑单个Task/// </summary>abstract protected Func<object> RunMethod { get; }/// <summary>/// 跑多个Task, 返回是否超时/// </summary>abstract protected Func<CancellationTokenSource, (bool, object)> RunMultiTaskMethod { get; }/// <summary>/// 是否跑多任务/// </summary>abstract protected bool runMultiTask { get; }//当客户端与服务器建立连接后执行的方法public override Task OnConnected(){//获取客户端IDConsole.WriteLine("{0}已连接", Context.ConnectionId);LogHelper.GetInstance().Information($"服务端与客户端:【{typeof(T).Name}】{Context.ConnectionId} 成功建立连接!");return base.OnConnected();}public override Task OnReconnected(){Console.WriteLine("{0}已重连", Context.ConnectionId);LogHelper.GetInstance().Information($"服务端与客户端:【{typeof(T).Name}】{Context.ConnectionId} 已重连!");Send(Context.ConnectionId);return base.OnReconnected();}/// <summary>/// 所有任务执行完是否超时/// </summary>/// <param name="tokenSource"></param>/// <param name="allTasks"></param>/// <returns></returns>public bool IsCompletedAllTasks(CancellationTokenSource tokenSource, Task[] allTasks){try{return Task.WaitAll(allTasks, millisecondsTimeout, tokenSource.Token);}catch (AggregateException ex){LogHelper.GetInstance().Error($"系统错误:{this.GetType().Name},{ex.Flatten().InnerException.Message}");tokenSource.Cancel();}return false;}/// <summary>/// 向客户端发送消息/// </summary>/// <param name="connectId"></param>public void Send(string connectId){lock (AsyncObj){var tokenSource = new CancellationTokenSource();Connections.TryAdd(connectId, tokenSource);Task.Run(() =>{while (!tokenSource.Token.IsCancellationRequested){try{// 是否是多任务if (runMultiTask == false){var result = RunMethod();var message = $"【{typeof(T).Name}】 {connectId} 正在回传数据!";LogHelper.GetInstance().Information(message);// 把组装好的数据推送到前端BaseNotifer<T>.Refresh(connectId, JsonConvert.SerializeObject(result));tokenSource.Token.WaitHandle.WaitOne(intervalTime);}else{// 是否超时var (isCompleted, result) = RunMultiTaskMethod(tokenSource);if (isCompleted){var message = $"【{typeof(T).Name}】 {connectId} 正在回传数据!";LogHelper.GetInstance().Information(message);// 把组装好的数据推送到前端BaseNotifer<T>.Refresh(connectId, JsonConvert.SerializeObject(result));// 下一次推送等待N秒后进行tokenSource.Token.WaitHandle.WaitOne(intervalTime);}else{// 等待超时tokenSource.Cancel();// 打印超时错误日志LogHelper.GetInstance().Error($@"{this.GetType().Name} 推送超时! 当前超时时间设置为{millisecondsTimeout}毫秒!");// 重新执行Connections.TryRemove(connectId, out tokenSource);Send(connectId);}}}catch(AggregateException ex){LogHelper.GetInstance().Error($"系统错误:{this.GetType().Name},{ex.Flatten().InnerException.Message}");tokenSource.Token.WaitHandle.WaitOne(intervalTime);}}}, tokenSource.Token);}}/// <summary>/// 连接断开事件/// </summary>/// <param name="stopCalled"></param>/// <returns></returns>public override Task OnDisconnected(bool stopCalled){lock (AsyncObj){try{var tokenSource = Connections[Context.ConnectionId];Connections.TryRemove(Context.ConnectionId, out tokenSource);tokenSource.Cancel();LogHelper.GetInstance().Information($"服务端与客户端:【{typeof(T).Name}】{Context.ConnectionId} 连接已断开!");}catch (Exception ex){if (Connections.ContainsKey(Context.ConnectionId)){var tokenSource = Connections[Context.ConnectionId];Connections.TryRemove(Context.ConnectionId, out tokenSource);}// 打印错误日志LogHelper.GetInstance().Error($@"{this.GetType().Name} 已断开! {ex.Message}!");}}return base.OnDisconnected(stopCalled);}}
}
以一个具体的 Hub 为例,继承上面的 BaseHub, 建立一个 具体实现的 Hub 名为 OperationKanBanHub ,使用 RunMultiTaskMethod 并行执行一些任务,这是项目里的一个真实案例,不必关心细节
using HenryMes.Entitys;
using HenryMes.WebApi.Controllers;
using HenryMes.WebApi.Controllers.Other;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;namespace HenryMes.SignalR.Hosting.Hubs
{/// <summary>/// 运营看板/// </summary>public class OperationKanBanHub : BaseHub<OperationKanBanHub>{/// <summary>/// 设置超时时间/// </summary>protected override int millisecondsTimeout => 10000;/// <summary>/// 设置线程轮询时间/// </summary>protected override int intervalTime => 5000;/// <summary>/// 是否跑多任务/// </summary>protected override bool runMultiTask => true;/// <summary>/// 跑单个Task/// </summary>protected override Func<object> RunMethod => throw new NotImplementedException();/// <summary>/// 跑多个Task/// </summary>protected override Func<CancellationTokenSource, (bool, object)> RunMultiTaskMethod => (TokenSource) =>{#region Task取数// 菜籽收购var taskSum4Rapeseed = Task.Run(() =>{KanbanController controller = new KanbanController();dynamic data = controller.ControlCenter_Center_Sum4Rapeseed();return data.Content;});// 油品生产,销售var taskSum4Oil = Task.Run(() =>{KanbanController controller = new KanbanController();dynamic data = controller.ControlCenter_Center_Sum4Oil();return data.Content;});// 库存量前10的存货var taskSum4Top = Task.Run(() =>{KanbanController controller = new KanbanController();dynamic data = controller.ControlCenter_Center_Sum4Top();return data.Content;});// 罐区存油,读取 mongodbvar taskTankOilQuantity = Task.Run(() =>{return new List<dynamic>{new { tank = "Tank1001",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1002",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1003",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1004",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1005",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1006",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1007",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1008",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1009",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1010",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1011",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1012",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1013",temperature = "21.4℃",pressure = "21PA", quantity = "100" },new { tank = "Tank1014",temperature = "21.4℃",pressure = "21PA", quantity = "100" },};});// 近一年产出销售 1-12月var taskSaleDispatch4Month = Task.Run(() =>{KanbanController controller = new KanbanController();dynamic data = controller.ControlCenter_Center_SaleDispatch4Month();return data.Content;});// 最近10条采购信息var taskPreInStore4Lately = Task.Run(() =>{KanbanController controller = new KanbanController();dynamic data = controller.ControlCenter_Center_PreInStore4Lately();return data.Content;});// 最近10条生产计划var taskProductionTask = Task.Run(() =>{ProductionTaskController controller = new ProductionTaskController();dynamic data = controller.QueryTake(new SqlSugarPageRequest{PageIndex = 1,PageSize = 10,Filter = new List<SqlSugar.ConditionalModel>()});return data.Content;});#endregion#region 同步阻塞等待所有Task执行完// 所有线程任务是否完成 默认falsevar isCompleted = IsCompletedAllTasks(TokenSource, new Task[] {taskSum4Rapeseed,taskSum4Oil,taskSum4Top,taskTankOilQuantity,taskSaleDispatch4Month,taskPreInStore4Lately,taskProductionTask});#endregionif (isCompleted){#region 所有Task已完成// 菜籽var RapeseedResult = taskSum4Rapeseed.Result;var Rapeseed = new{GYS = new{PurchaseReceiveQuantity = RapeseedResult?.Data?.Rapeseed_GYS?.PurchaseReceiveQuantity,BalanceQuantity = RapeseedResult?.Data?.Rapeseed_GYS?.BalanceQuantity,},SD = new{PurchaseReceiveQuantity = RapeseedResult?.Data?.Rapeseed_SD?.PurchaseReceiveQuantity,BalanceQuantity = RapeseedResult?.Data?.Rapeseed_SD?.BalanceQuantity,}};// 油品生产,销售dynamic Sum4OilResult = taskSum4Oil.Result;var Sum4Oil = new{// 产出成品油TankOilQuantity = Sum4OilResult?.Data?.TankOil?.ProductReceiveQuantity,// 产出包装油PackageOilQuantity = Sum4OilResult?.Data?.PackageOil?.ProductReceiveQuantity,// 销售包装油SaleOilQuantity = Sum4OilResult?.Data?.PackageOil?.SaleDispatchQuantity};// 存货中库存量前10的存货var Sum4TopResult = taskSum4Top.Result;var Sum4Top = new{Sum4TopResult?.Data?.DataSource};// 罐区存油var TankOilQuantity = taskTankOilQuantity.Result;// 近一年产出销售 1-12月var TaskSaleDispatch4MonthResult = taskSaleDispatch4Month.Result;var SaleDispatch4Month = new{Sale = TaskSaleDispatch4MonthResult?.Data?.SaleDispatch.Details,Product = TaskSaleDispatch4MonthResult?.Data?.ProductReceive.Details};// 最近10条采购信息var TaskPreInStore4LatelyResult = taskPreInStore4Lately.Result;var PreInStore4Lately = TaskPreInStore4LatelyResult?.Data?.DataSource;// 最近10条生产计划var taskProductionTaskResult = taskProductionTask.Result;var ProductionTask = taskProductionTaskResult?.Data;return (isCompleted, new{Rapeseed,Sum4Oil,Sum4Top,TankOilQuantity,SaleDispatch4Month,PreInStore4Lately,ProductionTask});#endregion}return (isCompleted, new { });};}
}
此时 SignalR 的后台推送基本就完成了,再来就是web前端的接收推送和断线下的自动重新连接(比如说后台服务程序做了更新,此时需要关闭服务再启动服务,这个时候要求web端不断尝试重新连接,直到后台服务启动并重新连接上为止)
前端使用 Vue 2.0 + jQuery.signalR 2.4.2 , 只列一下关键代码
import $ from "jquery";
import "signalr";
import echarts from "../../pages/kanban/OperationKanBanEcharts.vue";
export default {components: { echarts },data() {return {connection: null,proxy: null,// 是否需要断线重连的标记,当页面关闭时是不需要继续推送的tryReconnect : true}},methods: {// 从SignalR推送过来的数据,刷新看板refreshKanban(message) {// 刷新时间this.getDateTime()let obj = JSON.parse(message)// 省略无关代码......},},mounted() {this.$nextTick(() => {this.connection = $.hubConnection(process.env.SignalR);// 定义服务器端SignalR推送过来的消息接收代理this.proxy = this.connection.createHubProxy("OperationKanBanHub");this.proxy.on("Refresh", (message) => {console.log(`接收到来自服务端 ${this.connection.id} 的数据!`)this.refreshKanban(message)});// 创建连接到服务器端SignalR的连接this.connection.start().done(() => {// 客户端发送信息到服务器this.proxy.invoke("Send", this.connection.id);}).fail((err) => {console.log(err);});this.connection.disconnected(() => {if(this.tryReconnect) {setTimeout(() => {console.log('连接已断开,正尝试重新连接!')this.connection.start().done(() => {this.proxy.invoke("Send", this.connection.id); // 客户端发送信息到服务器}).fail((err) => {console.log(err);});}, 5000); // Restart connection after 5 seconds.}});});},deactivated() {if (this.connection) {// 关闭SignalR连接this.tryReconnect = falsethis.connection.stop();// 清除缓存this.$vnode.parent.componentInstance.cache = {};this.$vnode.parent.componentInstance.keys = [];}},
};
最后的一个步骤,怎么把后台的控制台SignalR宿主程序安装成 Windows 服务?在项目里建立两个批处理文件,Install.bat 安装服务,UnInstall.bat 卸载服务,点击右键点文件属性,把他们的编码改为 ansi(不要问我为什么......因为不改的话,打开批处理命令窗口的时候中文会显示成乱码)
Install.bat
@echo onrem 设置DOS窗口的背景颜色及字体颜色
color 2frem 设置DOS窗口大小
mode con: cols=80 lines=25@echo off
echo 请按任意键开始安装 HenryMes.SignalR.Hosting 服务rem 以管理员身份运行
%1 mshta vbscript:CreateObject("Shell.Application").ShellExecute("cmd.exe","/c %~s0 ::","","runas",1)(window.close)&&exit
:Adminrem 输出空行
echo.
pausecd /d %~dp0
HenryMes.SignalR.Hosting install --autostart start
net start HenryMes.SignalR.Hostingpause
UnInstall.bat
@echo onrem 设置DOS窗口的背景颜色及字体颜色
color 2frem 设置DOS窗口大小
mode con: cols=80 lines=25@echo off
echo 请按任意键开始卸载 HenryMes.SignalR.Hosting 服务rem 以管理员身份运行
%1 mshta vbscript:CreateObject("Shell.Application").ShellExecute("cmd.exe","/c %~s0 ::","","runas",1)(window.close)&&exit
:Adminrem 输出空行
echo.
pausecd /d %~dp0
net stop HenryMes.SignalR.Hosting
HenryMes.SignalR.Hosting uninstallpause