layout: post
title: 任务队列和异步接口的正确打开方式(.NET Core版本)
category: dotnet core
date: 2019-01-12
tags:

  • dotnet core
  • redis
  • 消息队列
  • 异步API

任务队列和异步接口的正确打开方式

什么是异步接口?

<h2 id="asynchronous-operations">Asynchronous Operations</h2>

Certain types of operations might require processing of the request in an asynchronous manner (e.g. validating a bank account, processing an image, etc.) in order to avoid long delays on the client side and prevent long-standing open client connections waiting for the operations to complete. For such use cases, APIs MUST employ the following pattern:

_For POST requests_:

  • Return the 202 Accepted HTTP response code.
  • In the response body, include one or more URIs as hypermedia links, which could include:

    • The final URI of the resource where it will be available in future if the ID and path are already known. Clients can then make an HTTP GET request to that URI in order to obtain the completed resource. Until the resource is ready, the final URI SHOULD return the HTTP status code 404 Not Found.
`{ "rel": "self", "href": "/v1/namespace/resources/{resource_id}", "method": "GET" }`* A temporary request queue URI where the status of the operation may be obtained via some temporary identifier. Clients SHOULD make an HTTP `GET` request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed.`{ "rel": "self", "href": "/v1/queue/requests/{request_id}, "method": "GET" }"`

_For PUT/PATCH/DELETE/GET requests_:

Like POST, you can support PUT/PATCH/DELETE/GET to be asynchronous. The behaviour would be as follows:

  • Return the 202 Accepted HTTP response code.
  • In the response body, include one or more URIs as hypermedia links, which could include:

    • A temporary request queue URI where the status of the operation may be obtained via some temporary identifier. Clients SHOULD make an HTTP GET request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed.
`{ "rel": "self", "href": "/v1/queue/requests/{request_id}, "method": "GET" }"`

_APIs that support both synchronous and asynchronous processing for an URI_:

APIs that support both synchronous and asynchronous operations for a particular URI and an HTTP method combination, MUST recognize the Prefer header and exhibit following behavior:

  • If the request contains a Prefer=respond-async header, the service MUST switch the processing to asynchronous mode.
  • If the request doesn't contain a Prefer=respond-async header, the service MUST process the request synchronously.

It is desirable that all APIs that implement asynchronous processing, also support webhooks as a mechanism of pushing the processing status to the client.

资料引自:paypal/API Design Patterns And Use Cases:asynchronous-operations

用人话来说

  • 简单来说就是请求过来,直接返回对应的resourceId/request_id,然后可以通过resourceId/request_id查询处理结果
  • 处理过程可能是队列,也可能直接是异步操作
  • 如果还没完成处理,返回404,如果处理完成,正常返回对应数据

好像也没什么讲了....

全文结束吧.

样例代码部分啦

实现逻辑

  • 创建任务,生成"request-id"存储到对应redis zset队列中
  • 同时往redis channel发出任务消息, 后台任务处理服务自行处理此消息(生产者-消费者模式)
  • 任务处理服务处理完消息之后,将处理结果写入redis,request-id为key,结果为value,然后从从redis zset从移除对应的"request-id"
  • 获取request-id处理结果时:如果request-id能查询到对应的任务处理结果,直接返回处理完的数据; 如果request-id还在sortset队列则直接返回404 + 对应的位置n,表示还在处理中,前面还有n个请求;

时序图大概长这样:

喜闻乐见代码时间

RequestService.cs

// RequestService.csusing System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CorrelationId;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using static StackExchange.Redis.RedisChannel;namespace MTQueue.Service
{public class RequestService{private readonly ICorrelationContextAccessor _correlationContext;private readonly ConnectionMultiplexer _redisMultiplexer;private readonly IServiceProvider _services;private readonly ILogger<RequestService> _logger;public RequestService(ICorrelationContextAccessor correlationContext,ConnectionMultiplexer redisMultiplexer, IServiceProvider services,ILogger<RequestService> logger){_correlationContext = correlationContext;_redisMultiplexer = redisMultiplexer;_services = services;_logger = logger;}public long? AddRequest(JToken data){var requestId = _correlationContext.CorrelationContext.CorrelationId;var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB);var index = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);if (index == null){data["requestId"] = requestId;redisDB.SortedSetAdd(CommonConst.REQUESTS_SORT_SETKEY, requestId, GetTotalSeconds());PushRedisMessage(data.ToString());}return redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);}public static long GetTotalSeconds(){return (long)(DateTime.Now.ToLocalTime() - new DateTime(1970, 1, 1).ToLocalTime()).TotalSeconds;}private void PushRedisMessage(string message){Task.Run(() =>{try{using (var scope = _services.CreateScope()){var multiplexer = scope.ServiceProvider.GetRequiredService<ConnectionMultiplexer>();multiplexer.GetSubscriber().PublishAsync(CommonConst.REQUEST_CHANNEL, message);}}catch (Exception ex){_logger.LogError(-1, ex, message);}});}public Tuple<JToken, long?> GetRequest(string requestId){var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB);var keyIndex = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);var response = redisDB.StringGet(requestId);if (response.IsNull){return Tuple.Create<JToken, long?>(default(JToken), keyIndex);}return Tuple.Create<JToken, long?>(JToken.Parse(response), keyIndex);}}
}

// RedisMQListener.csusing System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MTQueue.Model;
using MTQueue.Service;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using static StackExchange.Redis.RedisChannel;namespace MTQueue.Listener
{public class RedisMQListener : IHostedService{private readonly ConnectionMultiplexer _redisMultiplexer;private readonly IServiceProvider _services;private readonly ILogger<RedisMQListener> _logger;public RedisMQListener(IServiceProvider services, ConnectionMultiplexer redisMultiplexer,ILogger<RedisMQListener> logger){_services = services;_redisMultiplexer = redisMultiplexer;_logger = logger;}public Task StartAsync(CancellationToken cancellationToken){Register();return Task.CompletedTask;}public virtual bool Process(RedisChannel ch, RedisValue message){_logger.LogInformation("Process start,message: " + message);var redisDB = _services.GetRequiredService<ConnectionMultiplexer>().GetDatabase(CommonConst.DEFAULT_DB);var messageJson = JToken.Parse(message);var requestId = messageJson["requestId"]?.ToString();if (string.IsNullOrEmpty(requestId)){_logger.LogWarning("requestId not in message.");return false;}var mtAgent = _services.GetRequiredService<ZhihuClient>();var text = mtAgent.GetZhuanlan(messageJson);redisDB.StringSet(requestId, text.ToString(), CommonConst.RESPONSE_TS);_logger.LogInformation("Process finish,requestId:" + requestId);redisDB.SortedSetRemove(CommonConst.REQUESTS_SORT_SETKEY, requestId);return true;}public void Register(){var sub = _redisMultiplexer.GetSubscriber();var channel = CommonConst.REQUEST_CHANNEL;sub.SubscribeAsync(channel, (ch, value) =>{Process(ch, value);});}public void DeRegister(){// this.connection.Close();}public Task StopAsync(CancellationToken cancellationToken){// this.connection.Close();return Task.CompletedTask;}}}

// RequestsController.csusing System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CorrelationId;
using Microsoft.AspNetCore.Mvc;
using MTQueue.Service;
using Newtonsoft.Json.Linq;namespace MTQueue.Controllers
{[Route("v1/[controller]")][ApiController]public class RequestsController : ControllerBase{private readonly ICorrelationContextAccessor _correlationContext;private readonly RequestService _requestService;private readonly ZhihuClient _mtAgentClient;public RequestsController(ICorrelationContextAccessor correlationContext,RequestService requestService, ZhihuClient mtAgentClient){_correlationContext = correlationContext;_requestService = requestService;_mtAgentClient = mtAgentClient;}[HttpGet("{requestId}")]public IActionResult Get(string requestId){var result = _requestService.GetRequest(requestId);var resource = $"/v1/requests/{requestId}";if (result.Item1 == default(JToken)){return NotFound(new { rel = "self", href = resource, method = "GET", index = result.Item2 });}return Ok(result.Item1);}[HttpPost]public IActionResult Post([FromBody] JToken data, [FromHeader(Name = "Prefer")]string prefer){if (!string.IsNullOrEmpty(prefer) && prefer == "respond-async"){var index = _requestService.AddRequest(data);var requestId = _correlationContext.CorrelationContext.CorrelationId;var resource = $"/v1/requests/{requestId}";return Accepted(resource, new { rel = "self", href = resource, method = "GET", index = index });}return Ok(_mtAgentClient.GetZhuanlan(data));}}
}

完整代码见:https://github.com/liguobao/TaskQueueSample

任务队列和异步接口的正确打开方式(.NET Core版本)相关推荐

  1. 这才是目前百度统计接口的正确打开方式20180322

    这才是目前百度统计接口的正确打开方式20180322 关于百度统计接口的说明 1.登陆接口网站找到的有2种方式 第一种调用(不能用) https://api.baidu.com/sem/common/ ...

  2. [分布式训练] 单机多卡的正确打开方式:理论基础

    [分布式训练] 单机多卡的正确打开方式:理论基础 转自:https://fyubang.com/2019/07/08/distributed-training/ 瓦砾由于最近bert-large用的比 ...

  3. pytorch单机多卡的正确打开方式 以及可能会遇到的问题和相应的解决方法

    pytorch 单机多卡的正确打开方式 pytorch 使用单机多卡,大体上有两种方式 简单方便的 torch.nn.DataParallel(很 low,但是真的很简单很友好) 使用 torch.d ...

  4. C# 倍福ADS的正确打开方式,使用AdsRemote组件优雅的通过ADS通讯

    C# 倍福ADS的正确打开方式,使用AdsRemote组件优雅的通过ADS通讯,支持WPF窗体控件的绑定机制,做上位机页面很方便,大大节省了开发时间. 倍福的官方文档给的例子我就不多说了,今天介绍一种 ...

  5. opengl 贴图坐标控制_材质贴图正确打开方式

    哈喽,各位观众朋友们好鸭~欢迎来到讲道理画图的地方,我是黄玮宁. 最近呀经常有小伙伴来问我那些不同通道的材质贴图该怎么用,而且频率不是一般的高,所以我觉得有必要来说说这些通道贴图的用法了. 视频版(B ...

  6. Console控制台的正确打开方式

    Console控制台的正确打开方式 console对象提供了访问浏览器调试模式的信息到控制台 -- Console对象|-- assert() 如果第一个参数断言为false,则在控制台输出错误信息| ...

  7. log python_基于Python log 的正确打开方式

    保存代码到文件:logger.py import os import logbook from logbook.more import ColorizedStderrHandler import sm ...

  8. python四舍五入round_四舍五入就用round( )?Python四舍五入的正确打开方式!

    四舍五入就用round( )?Python四舍五入的正确打开方式! 2018-09-22 21:40 阅读数 4 <>round( )函数简介 菜鸟教程中介绍到,round() 函数作用就 ...

  9. 通过机器学习识别“迪士尼在逃公主”,程序员宠女的正确打开方式!

    到了庆祝的时候了!我们刚刚送走了圣诞老人.现在正等待新年的钟声敲响.所以我想到建立一个很酷的东西(至少我的七岁小公主会觉得)同时学一点机器学习.所以我们要做一个什么? 我借用的我女儿所有迪士尼公主人偶 ...

最新文章

  1. 20145236《网络攻防》Exp4 恶意代码分析
  2. html中不透明度怎么写,css如何设置div不透明度?
  3. [Korean]发音
  4. linux 网络错误 nf_conntrack: table full, dropping packet. 路由跟踪表满
  5. 将 Shiro 作为应用的权限基础 五:密码的加密/解密在Spring中的应用
  6. 【蓝桥杯】BASIC-8 回文数(2020-06-08)
  7. linux路由内核实现分析(三)---路由查找过程
  8. 重磅进展,Intel已能够生产量子芯片硅晶圆
  9. wince 错误 Error: failed PB timebomb check
  10. wps 宏 禁用_WPS表格如何解除宏禁用
  11. 计算机word设置信纸,一分钟教你学会用Word做信纸和公章!
  12. c语言实现对矩阵进行行程编码,游程编码行程编码.pptx
  13. 利用modem发传真
  14. #芯片# R8025(RX-8025T)
  15. 华为OD机试题:叠积木(Python 解法)
  16. 关于sammy的初理解
  17. 【渝粤教育】电大中专电商运营实操 (4)作业 题库
  18. 手机vnc远程桌面,手机vnc远程桌面教程加配置说明
  19. 《算法图解》读书笔记—像小说一样有趣的算法入门书
  20. 动态规划:什么是动态规划?

热门文章

  1. 2022年全球及中国商业净水器行业十四五运营方向与盈利前景分析报告
  2. 大数据谋定农业发展路径-丰收节贸易会:全球十大经典案例
  3. git 常用操作命令(Common operation)
  4. Jenkins部署:The username and password you provided are not correct (error 401)
  5. Go语言中的匿名函数和闭包的样子
  6. 第二阶段冲刺10天 第五天
  7. 阿里云服务器1M带宽是多少
  8. poj1789(prim)
  9. 谢欣伦 - OpenDev原创教程 - 蓝牙设备查找类CxBthRemoteDeviceFind
  10. 这个世界并不缺少创意,而是缺少发现