基于ASP.NET Core SignalR的流式传输

SignalR概述

SignalR是ASP.NET Core下非常流行的实现Web实时功能的库。微软再文档中列出了适合的应用场景:

适合 SignalR 的候选项:

  • 需要从服务器进行高频率更新的应用。示例包括游戏、社交网络、投票、拍卖、地图和 GPS 应用。

  • 仪表板和监视应用。示例包括公司仪表板、即时销售更新或旅行警报。

  • 协作应用。协作应用的示例包括白板应用和团队会议软件。

  • 需要通知的应用。社交网络、电子邮件、聊天、游戏、旅行警报和很多其他应用都需使用通知。

其实只要适合使用Ajax的场景都能使用,他比WebSockets更高级,实现了断线重连,广播,分组等功能。

流式传输

在介绍SingalR流式处理之前,我想先介绍一下流式处理的基本概念,

一提到流式传输,很多人往往感到比较棘手,那是因为可能用的比较少,通常我们习惯了准备数据然后一口气处理数据的编程范式,而不习惯一个个处理数据的范式。

流式处理就像一个水管,一头进水,一头出水。

例如C#中流式处理一个文本文件,我们是一次读取一行并处理他,而不是一口气读取文件中的所有行并处理他。

using (StreamReader sr = new StreamReader("TestFile.txt"))
{string line;// Read and display lines from the file until the end of// the file is reached.while ((line = sr.ReadLine()) != null)
{Console.WriteLine(line);
}
}

流式处理的好处就是数据的一部分准备好了,就可以对他立即进行处理,在内存中每次仅保留需要处理的那部分数据,这会大大优化内存的使用。

流式梳理非常适合处理大型数据集,例如文件读取,网络数据下载,IoT的数据传输,遍历并逐步处理数据库数据。

IEnumerable<T>

有必要再对这个接口重新做一个简要说明,枚举(称呼枚举或者迭代器,个人认为迭代器更合适)就是一个个列举。他是一种序列的概念,例如 1 2 3 4 5等,每列举一个我就可以处理一个。C#中的foreach就是用于处理迭代器的语法糖:

//枚举每一个Item
foreach(var item in GetNumbers())
{//处理每个item
}IEnumerable<int> GetNumbers()
{int i=0;while(i<10)
{yield return i++;
}
}//作为比对我列举一个常见的编程一口气准备好的编程范式。
List<int> GetNumbers()
{List<int> numbers = new List<int>(); //数组会分配内存,如果数据量很大,分配的内存会非常高。int i=0;while(i<10){numbers.add(i);}return numbers;
}

在C# 8.0以前,这个接口都是同步的,也就是说产生序列的方法会阻塞调用序列的方法。在异步async await 大行其道的今天,显然没有异步版本的迭代器实现是一个巨大的缺陷。(在这一以前可以用使用ValueTask<T>)

IAsyncEnumerable<T>

C# 8.0引入了这个接口,官网文档将它称为异步流。这个接口其实就是为了统一async await IEnumerable<T> . 微软在背后做了大量的工作,具体实现细节大家可以参考Async streams - C# 8.0 specification proposals | Microsoft Docs

我们来使用这个接口返回一个异步的迭代器:

public async IAsyncEnumerable<Data> GetData([EnumeratorCancellation] CancellationToken cancellationToken)
{var data = await _respository.GetDataAsync(cancellationToken);//为了演示,假设从数据库异步读取数据。foreach (var item in data)
{yield return item;
}
}
SignalR的流式传输

SignalR的流式传输使用了IAsyncEnumerable<T>接口,我直接引用微软的说法:

ASP.NET Core SignalR支持从客户端到服务器以及从服务器到客户端的流式传输。这适用于数据片段随着时间的推移而发生的情况。流式传输时,每个片段一旦变为可用,就会发送到客户端或服务器,而不是等待所有数据都可用。

当集线器方法返回 IAsyncEnumerable ChannelReader Task<IAsyncEnumerable<T>> Task<ChannelReader<T>> 时它会自动成为流式处理中心方法

public class AsyncEnumerableHub : Hub
{public async IAsyncEnumerable<int> Counter(int count,int delay,[EnumeratorCancellation]CancellationToken cancellationToken){for (var i = 0; i < count; i++){// Check the cancellation token regularly so that the server will stop// producing items if the client disconnects.cancellationToken.ThrowIfCancellationRequested();yield return i;// Use the cancellationToken in other APIs that accept cancellation// tokens so the cancellation can flow down to them.await Task.Delay(delay, cancellationToken);}}
}

代码很清楚的说明了这一做法。ChannelReader是另一种实现方法,这篇不做讲解。

做一个例子来实现一下

当下非常流行数据监控大屏应用。监控交通状况,股票行情,企业生产数据看板,IOT传感器实时数据显示,实时销售数据分析等等。

我想做一个简单的数据更新大屏的例子(一切为了简单,只用内存中的数据来存放数据),原谅我UI能比比较差,这篇不展示UI上的东西,而只展示数据如何以流的方式发送。

第一步 准备一个类用于处理数据更新

这个类使用一个定时器来定期更新数据,并记录一个数据是否更新的状态以便于只在数据更新的时候才发送数据。同时使用一个并发字典记录待更新的数据。

public class DataTicker{private Timer _timer; private volatile bool isUpdated  = false;//标记数据是否更新private readonly ConcurrentDictionary<string, Data> _datas = new ConcurrentDictionary<string, Data>();public DataTicker(){InitData();StartUpdateData();}public bool IsUpdated{get { return isUpdated; }}public async Task<ICollection<Data>> GetData(){if (IsUpdated) //如果数据已更新才发送数据{await Task.Delay(500); //模拟返回数据的延迟。这里好的做法是异步的方式返回数据。return _datas.Values;}return null;}public void StartUpdateData(){_timer = new Timer(UpdateDate, null, TimeSpan.FromMilliseconds(3000), TimeSpan.FromMilliseconds(3000));}private void InitData(){for(int i=0;i<10;i++){Data data = new Data();data.Id = i;data.Name = i.ToString();data.UpdatedDate = DateTime.Now;_datas.TryAdd(i.ToString(), data);}}public void UpdateDate(object state){var newData = foreach (var item in _datas.Values){item.UpdatedDate = System.DateTime.Now;}this.isUpdated = true;}public void MarkAsRead(){this.isUpdated = false;}}
第二部 编写SignalR中心异步流方法
public class DataUpdateHub : Hub{private DataTicker _ticker;public DataUpdateHub(DataTicker stockTicker){this._ticker = stockTicker;}//这个方法没有使用cancellationToken,好的做法是从外部传递cancellationTokenpublic async IAsyncEnumerable<Data> GetData([EnumeratorCancellation] CancellationToken cancellationToken){while (true){var data = await _ticker.GetData(); //当有数据更新的时候方法会返回数据,否则返回空。if (data != null){foreach (var item in data){yield return item;}_ticker.MarkAsRead(); //发送完毕后,将标记数据标记为已发送。}}}}
第三步 Javascript客户端调用代码
connection.start().then(function () {connection.stream("GetData") //连接到hub的异步流方法,当有数据从中心流出时候,next方法会被调用。因为本例中心使用wihle(true)方法,数据流会一直发送,所以complete方法将不会被调用。如果中心方法返回有限数据结束后例如使用foreach,则会调用complete方法。.subscribe({next: (item) => {var li = document.createElement("li"); li.textContent = "ID:" + item.id + "Datetime:" + item.updatedDate;document.getElementById("messagesList").appendChild(li);},complete: () => {var li = document.createElement("li");li.textContent = "Stream completed";document.getElementById("messagesList").appendChild(li);},error: (err) => {var li = document.createElement("li");li.textContent = err;document.getElementById("messagesList").appendChild(li);},});
}).catch(function (err) {return console.error(err.toString());
});
第四步 配置DataTicker依赖注入
builder.Services.AddSignalR();
builder.Services.AddSingleton<DataTicker>();

注意:这里的DataTicker注册的是单例的,也就是说所有发送给客户端的数据是共享的。既然是共享的,也就存在并发访问的问题,这就是为什么使用volatile关键字和并发字典的原因。

总结

ASP.NET SignalR可以很方便的使用异步流传输数据,这样客户端和服务器端是使用流的方式连接在一起的。

本文使用的是从服务器到客户端的流,当然你可以使用从客户端到服务器端的流。从.Net和Java的客户端都可以调用SignalR的中心流方法。具体可以参考微软官方文档,使用 ASP.NET Core 中的流式处理SignalR | Microsoft Docs

基于ASP.NET Core SignalR的流式传输相关推荐

  1. ASP.NET Core SignalR中的流式传输

    什么是流式传输? 流式传输是这一种以稳定持续流的形式传输数据的技术. 流式传输的使用场景 有些场景中,服务器返回的数据量较大,等待时间较长,客户端不得不等待服务器返回所有数据后,再进行相应的操作.这时 ...

  2. 从零开始实现ASP.NET Core MVC的插件式开发(五) - 插件的删除和升级

    标题:从零开始实现ASP.NET Core MVC的插件式开发(五) - 使用AssemblyLoadContext实现插件的升级和删除 作者:Lamond Lu 地址:https://www.cnb ...

  3. 从零开始实现 ASP.NET Core MVC 的插件式开发(九) - 如何启用预编译视图

    标题:从零开始实现 ASP.NET Core MVC 的插件式开发(九) - 升级.NET 5及启用预编译视图 作者:Lamond Lu 地址:https://www.cnblogs.com/lwql ...

  4. 推荐一本基于ASP.NET Core 3.1的实战来了

    第一本基于 ASP.NET Core 3.1 的实战书来了 我脱产花费了一年时间创作书籍<深入浅出 ASP.NET Core>,终于上架了.目前天猫.京东等主流平台均有销售. 这本书是基于 ...

  5. 从零开始实现ASP.NET Core MVC的插件式开发(六) - 如何加载插件引用

    标题:从零开始实现ASP.NET Core MVC的插件式开发(六) - 如何加载插件引用. 作者:Lamond Lu 地址:https://www.cnblogs.com/lwqlun/p/1171 ...

  6. Asp.Net Core SignalR 与微信小程序交互笔记

    什么是Asp.Net Core SignalR Asp.Net Core SignalR 是微软开发的一套基于Asp.Net Core的与Web进行实时交互的类库,它使我们的应用能够实时的把数据推送给 ...

  7. 发现 ASP.NET Core SignalR

    ASP.NET SignalR 是几年前推出的工具,可供 ASP.NET 开发人员使用,以向应用程序添加实时功能.只要基于 ASP.NET 的应用程序必须接收来自服务器(从监视系统到游戏)的频繁异步更 ...

  8. GitHub高赞!ASP.NET Core SignalR聊天室开源了!

    能双向通信的SignalR框架,很多人都感兴趣却又玩不转,最近发现一个基于.NET6的ASP.NET Core SignalR聊天室,完成了基于SqlSugar+SQLServer登陆退出和聊天存档等 ...

  9. 基于亚马逊云科技的流式传输云游戏,让安卓游戏出奇制胜

    将游戏从云端流式传输到移动设备是一项新兴技术,可以方便低端移动设备,在硬件条件受限的情况下,提高游戏品质.有了这项技术,玩家无需更换升级移动设备(如智能手机.平板电脑和智能电视),就能享受高品质游戏体 ...

最新文章

  1. 《数学之美》第13章 Google AK-47的设计者—阿米特.辛格博士
  2. oracle sql statement ignored,sql – Oracle无效使用类型名称或子类型名称
  3. 解读main()方法中的String[] args
  4. python module是干什么的_如何最简单、通俗地理解Python的模块?
  5. java返回两个string_java – 为什么String.intern()方法返回两个不同的结果?
  6. Java输入输出流和文件操作
  7. python中数据用折线图表示_用python处理文本数据
  8. python程序员怎么面试_Python程序员面试,这些问题你必须提前准备!
  9. 【BZOJ2221】面试的考验,随机数列+线段树+离线
  10. LVDS之一_理解SerDes
  11. 阻止默认事件event.preventDefault();
  12. adodb 连接mysql_PHP中使用ADODB连接MySQL中文乱码
  13. 权重计算方法二:熵权法(EWM)
  14. Scene窗口—Scene视图导航
  15. springboot 之 SpringBoot指定额外需要扫描的包
  16. 人工神经网络的训练步骤,神经网络常用训练方法
  17. input 表单的type属性
  18. 网页多次刷新出不来怎么办
  19. 十年再出发:阿里云智能战略加速的“四级火箭”
  20. K8S原理剖析:Pod、工作负载与服务

热门文章

  1. 在Orchard中使用Image Gallery模块
  2. mysql数据库的优缺点
  3. linux 去掉 ^M 的方法
  4. 2019.04.24笔记
  5. 页面缓存处理的几种方法
  6. 深入理解null的原理
  7. 文本生成器(bzoj 1030)
  8. T T[] toArray(T[] a);
  9. JS里的onclick事件
  10. Andorid Binder进程间通信---总结