简介

Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。

怎么实现呢?

使用线程的方式

当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速。

通过 ThreadPoolExecutor 来实现fromflaskimportFlask

fromtimeimportsleep

fromconcurrent.futuresimportThreadPoolExecutor

# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

# 创建线程池执行器

executor =ThreadPoolExecutor(2)

app =Flask(__name__)

@app.route('/jobs')

defrun_jobs():

# 交由线程去执行耗时任务

executor.submit(long_task,'hello',123)

return'long task running.'

# 耗时任务

deflong_task(arg1, arg2):

print("args: %s %s!"% (arg1, arg2))

sleep(5)

print("Task is done!")

if__name__ =='__main__':

app.run()

当要执行一些比较简单的耗时任务时就可以使用这种方式,如发邮件、发短信验证码等。

但这种方式有个问题,就是前端无法得知任务执行状态。

如果想要前端知道,就需要设计一些逻辑,比如将任务执行状态存储到 redis 中,通过唯一的任务 id 进行标识,然后再写一个接口,通过任务 id 去获取任务的状态,然后让前端定时去请求该接口,从而获得任务状态信息。

全部自己实现就显得有些麻烦了,而 Celery 刚好实现了这样的逻辑,来使用一下。

使用 Celery

为了满足前端可以获得任务状态的需求,可以使用 Celery。

Celery 是实时任务处理与调度的分布式任务队列,它常用于 web 异步任务、定时任务等,后面单独写一篇文章描述 Celery 的架构,这里不深入讨论。

现在我想让前端可以通过一个进度条来判断后端任务的执行情况。使用 Celery 就很容易实现,首先通过 pip 安装 Celery 与 redis,之所以要安装 redis,是因为让 Celery 选择 redis 作为「消息代理 / 消息中间件」。pip install celerypip install redis

在 Flask 中使用 Celery 其实很简单,这里先简单的过一下 Flask 中使用 Celery 的整体流程,然后再去实现具体的项目1. 在 Flask 中初始化 CeleryfromflaskimportFlask

fromceleryimportCelery

app =Flask(__name__)

# 配置

# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL

app.config['CELERY_BROKER_URL'] ='redis://localhost:6379/0'

# 要存储 Celery 任务的状态或运行结果时就必须要配置

app.config['CELERY_RESULT_BACKEND'] ='redis://localhost:6379/0'

# 初始化Celery

celery =Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

# 将Flask中的配置直接传递给Celery

celery.conf.update(app.config)

上述代码中,通过 Celery 类初始化 celery 对象,传入的应用名称与消息代理的连接 URL。2. 通过 celery.task 装饰器装饰耗时任务对应的函数@celery.taskdef long_task(arg1, arg2):    # 耗时任务的逻辑    return result3.Flask 中定义接口通过异步的方式执行耗时任务@app.route('/', methods=['GET', 'POST'])def index():    task = long_task.delay(1, 2)

delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 long_task () 在一分钟后再执行@app.route('/', methods=['GET', 'POST'])def index():    task = long_task.apply_async(args=[1, 2], countdown=60)

delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。

通过这 3 步就可以使用 Celery 了。

接着就具体来实现「让前端可以通过一个进度条来判断后端任务的执行情况」的需求。# bind为True,会传入self给被装饰的方法

@celery.task(bind=True)

deflong_task(self):

verb = ['Starting up','Booting','Repairing','Loading','Checking']

adjective = ['master','radiant','silent','harmonic','fast']

noun = ['solar array','particle reshaper','cosmic ray','orbiter','bit']

message =''

total = random.randint(10,50)

foriinrange(total):

ifnotmessageorrandom.random() <0.25:

# 随机的获取一些信息

message ='{0} {1} {2}...'.format(random.choice(verb),

random.choice(adjective),

random.choice(noun))

# 更新Celery任务状态

self.update_state(state='PROGRESS',

meta={'current': i,'total': total,

'status': message})

time.sleep(1)

# 返回字典

return{'current':100,'total':100,'status':'Task completed!',

'result':42}

上述代码中,celery.task () 装饰器使用了 bind=True 参数,这个参数会让 Celery 将 Celery 本身传入,可以用于记录与更新任务状态。

然后就是一个 for 迭代,迭代的逻辑没什么意义,就是随机从 list 中抽取一些词汇来模拟一些逻辑的运行,为了表示这是耗时逻辑,通过 time.sleep (1) 休眠一秒。

每次获取一次词汇,就通过 self.update_state () 更新 Celery 任务的状态,Celery 包含一些内置状态,如 SUCCESS、STARTED 等等,这里使用了自定义状态「PROGRESS」,除了状态外,还将本次循环的一些信息通过 meta 参数 (元数据) 以字典的形式存储起来。有了这些数据,前端就可以显示进度条了。

定义好耗时方法后,再定义一个 Flask 接口方法来调用该耗时方法@app.route('/longtask', methods=['POST'])def longtask():    # 异步调用    task = long_task.apply_async()    # 返回 202,与Location头    return jsonify({}), 202, {'Location': url_for('taskstatus',                                                  task_id=task.id)}

简单而言,前端通过 POST 请求到 /longtask,让后端开始去执行耗时任务。

返回的状态码为 202,202 通常表示一个请求正在进行中,然后还在返回数据包的包头 (Header) 中添加了 Location 头信息,前端可以通过读取数据包中 Header 中的 Location 的信息来获取任务 id 对应的完整 url。

前端有了任务 id 对应的 url 后,还需要提供一个接口给前端,让前端可以通过任务 id 去获取当前时刻任务的具体状态。@app.route('/status/')def taskstatus(task_id):    task = long_task.AsyncResult(task_id)    if task.state == 'PENDING': # 在等待        response = {            'state': task.state,            'current': 0,            'total': 1,            'status': 'Pending...'        }    elif task.state != 'FAILURE': # 没有失败        response = {            'state': task.state, # 状态            # meta中的数据,通过task.info.get()可以获得            'current': task.info.get('current', 0), # 当前循环进度            'total': task.info.get('total', 1), # 总循环进度            'status': task.info.get('status', '')        }        if 'result' in task.info:             response['result'] = task.info['result']    else:        # 后端执行任务出现了一些问题        response = {            'state': task.state,            'current': 1,            'total': 1,            'status': str(task.info),  # 报错的具体异常        }    return jsonify(response)

为了可以获得任务对象中的信息,使用任务 id 初始化 AsyncResult 类,获得任务对象,然后就可以从任务对象中获得当前任务的信息。

该方法会返回一个 JSON,其中包含了任务状态以及 meta 中指定的信息,前端可以利用这些信息构建一个进度条。

如果任务在 PENDING 状态,表示该任务还没有开始,在这种状态下,任务中是没有什么信息的,这里人为的返回一些数据。如果任务执行失败,就返回 task.info 中包含的异常信息,此外就是正常执行了,正常执行可以通 task.info 获得任务中具体的信息。

这样,后端的逻辑就处理完成了,接着就来实现前端的逻辑,要实现图形进度条,可以直接使用 nanobar.js,简单两句话就可以实现一个进度条,其官网例子如下:varoptions = {

classname:'my-class',

id:'my-id',

// 进度条要出现的位置

target: document.getElementById('myDivId')

};

// 初始化进度条对象

varnanobar =newNanobar( options );

nanobar.go(30);// 30% 进度条

nanobar.go(76);// 76% 进度条

// 100% 进度条,进度条结束

nanobar.go(100);

有了 nanobar.js 就非常简单了。

先定义一个简单的 HTML 界面

Long running task with progress updates

Start Long Calculation

通过 JavaScript 实现对后台的请求// 按钮点击事件

$(function() {

$('#start-bg-job').click(start_long_task);

});

// 请求 longtask 接口

functionstart_long_task() {

// 添加元素在html中

div = $('

0%
...


');

$('#progress').append(div);

// 创建进度条对象

varnanobar =newNanobar({

bg:'#44f',

target: div[0].childNodes[0]

});

// ajax请求longtask

$.ajax({

type:'POST',

url:'/longtask',

// 获得数据,从响应头中获取Location

success:function(data, status, request) {

status_url = request.getResponseHeader('Location');

// 调用 update_progress() 方法更新进度条

update_progress(status_url, nanobar, div[0]);

},

error:function() {

alert('Unexpected error');

}

});

}

// 更新进度条

functionupdate_progress(status_url, nanobar, status_div) {

// getJSON()方法是JQuery内置方法,这里向Location中对应的url发起请求,即请求「/status/」

$.getJSON(status_url,function(data) {

// 计算进度

percent = parseInt(data['current'] *100/ data['total']);

// 更新进度条

nanobar.go(percent);

// 更新文字

$(status_div.childNodes[1]).text(percent +'%');

$(status_div.childNodes[2]).text(data['status']);

if(data['state'] !='PENDING'&& data['state'] !='PROGRESS') {

if('result'in data) {

// 展示结果

$(status_div.childNodes[3]).text('Result: '+ data['result']);

}

else{

// 意料之外的事情发生

$(status_div.childNodes[3]).text('Result: '+ data['state']);

}

}

else{

// 2秒后再次运行

setTimeout(function() {

update_progress(status_url, nanobar, status_div);

},2000);

}

});

}

可以通过注释阅读代码整体逻辑。

至此,需求实现完了,运行一下。

首先运行 Redisredis-server

然后运行 celerycelery worker -A app.celery --loglevel=info

最后运行 Flask 项目python app.py

效果如下:

Flask 异步运行任务的常见方式就介绍完了,因为本人在开发一个用于自动生成字幕的小玩具,其中视频上传以及字幕生成都是耗时任务,所以就单独写一篇文章来介绍一下这部分的内容,后面会将小玩具的代码开源让大家学习一下,先一睹其真容:

以下文章来源于hackpython ,作者ayuliao

python进度条 装饰器_2种方式解决Python执行卡顿问题相关推荐

  1. python交并补符号_三种方式实现 Python 中的集合的交、并、补运算-阿里云开发者社区...

    三种方式实现 Python 中的集合的交.并.补运算 一 背景 集合这个概念在我们高中阶段就有所了解,毕业已多年,我们一起回顾一下几个集合相关的基本概念吧? 集合是指具有某种特定性质的具体的或抽象的对 ...

  2. 自定义加载进度条loading的几种方式

    1. <?xml version="1.0" encoding="UTF-8"?> <animation-list android:onesh ...

  3. 对python装饰器几种常见方式的使用与理解

    1.装饰器的理论: (1)装饰器实际上就是一个函数 (2)有2个特别之处,参数是一个函数.返回值是一个参数 2.装饰器的简单理解: 实际上就是为了给一个程序添加功能,但是该程序已经上线或者已被使用,那 ...

  4. python装饰器类-PYTHON里的装饰器能装饰类吗

    扩展回答 如何理解python里的装饰器 通常可以理解它是一个hook 的回调函数. 或者是理解成python 留给二次开发的一个内置API. 一般是用回调和hook 方式实现的. 如何理解Pytho ...

  5. python高级语法装饰器_Python高级编程——装饰器Decorator超详细讲解上

    Python高级编程--装饰器Decorator超详细讲解(上篇) 送你小心心记得关注我哦!! 进入正文 全文摘要 装饰器decorator,是python语言的重要特性,我们平时都会遇到,无论是面向 ...

  6. python装饰器由浅入深_详解Python装饰器由浅入深

    装饰器的功能在很多语言中都有,名字也不尽相同,其实它体现的是一种设计模式,强调的是开放封闭原则,更多的用于后期功能升级而不是编写新的代码.装饰器不光能装饰函数,也能装饰其他的对象,比如类,但通常,我们 ...

  7. python 生成器装饰器_七.python迭代器生成器装饰器

    1.迭代器 1.1 什么是可迭代对象(Iterable)? 定义:可以直接作用于for循环的对象统称为可迭代对象,即Iterable. 可迭代对象包括: 1.集合数据类型:如list.tuple.di ...

  8. python生成器和装饰器_python三大法器:生成器、装饰器、迭代器

    迭代器 迭代的概念 使用for循环遍历取值的过程叫做迭代,比如:使用for循环遍历列表获取值的过程 使用for循环遍历取值的对象叫做可迭代对象, 比如:列表.元组.字典.集合.range.字符串 判断 ...

  9. python类装饰器详解-python 中的装饰器详解

    装饰器 闭包 闭包简单的来说就是一个函数,在该函数内部再定义一个函数,并且这个内部函数用到了外部变量(即是外部函数的参数),最终这个函数返回内部函数的引用,这就是闭包. def decorator(p ...

最新文章

  1. YOLO v1到YOLO v4(上)
  2. 供配电负荷计算方法详解
  3. 64MQQ2440烧写MINI2440光盘中的内容同样可以跑起来
  4. c语言文学研究助手题目,各位达人,给小弟一个文学研究助手的c程序啊!急啊!谢谢大家啦!...
  5. 算法题存档20200505
  6. 关于表数据行统计的问题和相关误区
  7. 10.15 sigstjmp以及siglongjmp函数
  8. sql server 触发器
  9. 拓端tecdat|R语言马尔可夫MCMC中的Metropolis Hastings,MH算法抽样(采样)法可视化实例
  10. laptop3换硬盘_实战laptop3拆机硬盘扩容
  11. 大学本科计算机专业的课程
  12. linux下设置定时器,linux定时器设置.
  13. SpringBoot接入微信公众号模板消息推送
  14. win7删除文件提示“您需要权限才能执行此操作”的3种解决办法
  15. python爬取豆瓣电影评论_python 爬取豆瓣电影评论,并进行词云展示及出现的问题解决办法...
  16. 免费WebCamps-北美,亚洲和欧洲-*立即注册*
  17. 事件营销有哪些策略?
  18. 运行程序报错:请检查是否存在数组、列表等越界非法访问,内存非法访问等情况
  19. QObject::moveToThread: Current thread (0x5651ebdaa180) is not the object’s thread (0x5651eba7e2a0).
  20. Python程序流程控制

热门文章

  1. linux下代理上网设置
  2. 为什么Rust这么受欢迎?
  3. 百度的Android招聘面试题
  4. 中级Shader教程10 shader建模工具--SDF
  5. C语言:Hello Word(梦开始的地方)
  6. 如何找回丢失的硬盘分区表?
  7. c语言中占位符,Java C# C语言中的占位符
  8. 【2022年 华为上海无线部门实习 逻辑岗位 面试复盘】
  9. 计算机错误代码字母,电脑开机蓝屏英文字母的解决方法
  10. Python 基础 2-1 列表入门