Celery结合flask完成异步任务与定时任务

Celery 常用于 web 异步任务、定时任务等。
使用 redis 作为 Celery的「消息代理 / 消息中间件」。
这里通过Flask-Mail使用qq邮箱延时发送邮件作为示例

pip install celery
pip install redis
pip install Flask-Mail

1、使用flask发送邮件

使用 Flask-Mail 发送邮件需要进行一下配置,其中QQ邮箱授权码的获取方式如下所述:

app = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.qq.com'
app.config['MAIL_PORT'] = 465
# 启用/禁用传输安全层加密
app.config['MAIL_USE_TLS'] = False
# 启用/禁用安全套接字层加密
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = '我的QQ邮箱@qq.com'
app.config['MAIL_PASSWORD'] = '我的QQ邮箱授权码'
app.config['MAIL_DEFAULT_SENDER'] = '我的QQ邮箱@qq.com'# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'# Initialize extensions
mail = Mail(app)@app.route("/send_mail")
def index11():# sender:发件人    recipients:收件人msg = Message('Hello', sender = app.config['MAIL_DEFAULT_SENDER'], recipients = ['目标邮箱@qq.com'])msg.body = "来自python--flask框架发送的邮件内容~"mail.send(msg)#发送Message类对象的内容return "发送成功"

点进qq邮箱,在设置里面点击账号,向下滚动开启pop3服务获取授权码。

2、延时发送邮件

定义celery任务,与flask基本一样 只是前面多了修饰符@celery.task

@celery.task
def send_async_email(email_data):"""Background task to send an email with Flask-Mail."""msg = Message(email_data['subject'],sender=app.config['MAIL_DEFAULT_SENDER'],recipients=[email_data['to']])msg.body = email_data['body']with app.app_context():mail.send(msg)     
@app.route('/', methods=['GET', 'POST'])
def index():if request.method == 'GET':return render_template('index.html', email=session.get('email', ''))email = request.form['email']session['email'] = email# send the emailemail_data = {'subject': 'Hello from Flask','to': email,'body': '来自python--flask框架延时发送的邮件内容~'}if request.form['submit'] == 'Send':# send right awaysend_async_email.delay(email_data)print('here!--')flash('Sending email to {0}'.format(email))else:# send in one minutesend_async_email.apply_async(args=[email_data], countdown=60)flash('An email will be sent to {0} in one minute'.format(email))return redirect(url_for('index'))

3、生成带有状态信息进度条的异步任务


# bind为True,会传入self给被装饰的方法
@celery.task(bind=True)
def long_task(self):"""带有进度条以及状态报告的 异步任务"""verb = ['正在', '准备', '目前', '处于', '进行']adjective = ['全速', '努力', '默默地', '认真', '快速']noun = ['打开', '启动', '修复', '加载', '检查']message = ''total = random.randint(10, 50)  # 随机取10~50的一个随机数for i in range(total):selectnow = random.random()print(selectnow)# 拼接上面三个lsit  随机的生成一些状态描述if not message or selectnow < 0.25:message = '{0} {1} {2}...'.format(random.choice(verb),random.choice(adjective),random.choice(noun))# 更新Celery任务状态self.update_state(state='PROGRESS',meta={'current': i, 'total': total,'status': message})time.sleep(1)# 返回字典return {'current': 100, 'total': 100, 'status': '任务完成!','result': 42}@app.route('/longtask', methods=['POST'])
def longtask():task = long_task.apply_async()return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}@app.route('/status/<task_id>')
def taskstatus(task_id):task = long_task.AsyncResult(task_id)# print(task.state)if task.state == 'PENDING':# PENDING的时候  如果一直PENDING可能是celery没开启response = {'state': task.state,'current': 0,'total': 1,'status': 'Pending...'}elif task.state != 'FAILURE':# 加载的时候response = {'state': task.state,'current': task.info.get('current', 0),'total': task.info.get('total', 1),'status': task.info.get('status', '')}if 'result' in task.info:response['result'] = task.info['result']else:# 报错时候的输出response = {'state': task.state,'current': 1,'total': 1,'status': str(task.info),  # this is the exception raised}return jsonify(response)

4、完整代码

文件结构

--- current--- templates--- index.html--- asyn_001.py
这个是asyn_001.py
import os
import random
import time
from flask import Flask, request, render_template, session, flash, redirect, \url_for, jsonify
from flask_mail import Mail, Message
from celery import Celeryapp = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.qq.com'
app.config['MAIL_PORT'] = 465
# 启用/禁用传输安全层加密
app.config['MAIL_USE_TLS'] = False
# 启用/禁用安全套接字层加密
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = '我的QQ邮箱@qq.com'
app.config['MAIL_PASSWORD'] = '我的QQ邮箱授权码'
app.config['MAIL_DEFAULT_SENDER'] = '我的QQ邮箱@qq.com'# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'# Initialize extensions
mail = Mail(app)@app.route("/send_mail")
def index11():# sender:发件人    recipients:收件人msg = Message('Hello', sender = app.config['MAIL_DEFAULT_SENDER'], recipients = ['目标邮箱@qq.com'])msg.body = "来自python--flask框架发送的邮件内容~"mail.send(msg)#发送Message类对象的内容return "发送成功"# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)@celery.task
def send_async_email(email_data):"""Background task to send an email with Flask-Mail."""msg = Message(email_data['subject'],sender=app.config['MAIL_DEFAULT_SENDER'],recipients=[email_data['to']])msg.body = email_data['body']with app.app_context():mail.send(msg)@app.route('/', methods=['GET', 'POST'])
def index():if request.method == 'GET':return render_template('index.html', email=session.get('email', ''))email = request.form['email']session['email'] = email# send the emailemail_data = {'subject': 'Hello from Flask','to': email,'body': '来自python--flask框架延时发送的邮件内容~'}if request.form['submit'] == 'Send':# send right awaysend_async_email.delay(email_data)print('here!--')flash('Sending email to {0}'.format(email))else:# send in one minutesend_async_email.apply_async(args=[email_data], countdown=60)flash('An email will be sent to {0} in one minute'.format(email))return redirect(url_for('index'))# bind为True,会传入self给被装饰的方法
@celery.task(bind=True)
def long_task(self):"""带有进度条以及状态报告的 异步任务"""verb = ['正在', '准备', '目前', '处于', '进行']adjective = ['全速', '努力', '默默地', '认真', '快速']noun = ['打开', '启动', '修复', '加载', '检查']message = ''total = random.randint(10, 50)  # 随机取10~50的一个随机数for i in range(total):selectnow = random.random()print(selectnow)# 拼接上面三个lsit  随机的生成一些状态描述if not message or selectnow < 0.25:message = '{0} {1} {2}...'.format(random.choice(verb),random.choice(adjective),random.choice(noun))# 更新Celery任务状态self.update_state(state='PROGRESS',meta={'current': i, 'total': total,'status': message})time.sleep(1)# 返回字典return {'current': 100, 'total': 100, 'status': '任务完成!','result': 42}@app.route('/longtask', methods=['POST'])
def longtask():task = long_task.apply_async()return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}@app.route('/status/<task_id>')
def taskstatus(task_id):task = long_task.AsyncResult(task_id)# print(task.state)if task.state == 'PENDING':# PENDING的时候  如果一直PENDING可能是celery没开启response = {'state': task.state,'current': 0,'total': 1,'status': 'Pending...'}elif task.state != 'FAILURE':# 加载的时候response = {'state': task.state,'current': task.info.get('current', 0),'total': task.info.get('total', 1),'status': task.info.get('status', '')}if 'result' in task.info:response['result'] = task.info['result']else:# 报错时候的输出response = {'state': task.state,'current': 1,'total': 1,'status': str(task.info),  # this is the exception raised}return jsonify(response)if __name__ == '__main__':app.run(debug=True)
这个是index.html
<html><head><title>Flask + Celery 示例</title><style>.progress {width: 100%;text-align: center;}</style></head><body><h1>Flask + Celery 示例</h1><h2>Example 1: 发送异步邮件</h2>{% for message in get_flashed_messages() %}<p style="color: red;">{{ message }}</p>{% endfor %}<form method="POST"><p>Send test email to: <input type="text" name="email" value="{{ email }}"></p><input type="submit" name="submit" value="Send"><input type="submit" name="submit" value="Send in 1 minute"></form><hr><h2>Example 2: 生成进度条以及状态报告</h2><!--<button οnclick="start_long_task();">Start Long Calculation</button><br><br>--><button id="start-bg-job">Start Long Calculation</button><br><br><div id="progress"></div><script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script><script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script><script>function start_long_task() {// add task status elementsdiv = $('<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div><hr>');$('#progress').append(div);// create a progress barvar nanobar = new Nanobar({bg: '#44f',target: div[0].childNodes[0]});// send ajax POST request to start background job$.ajax({type: 'POST',url: '/longtask',success: function(data, status, request) {status_url = request.getResponseHeader('Location');console.log("status_url", status_url,"nanobar", nanobar, "div[0]", div[0])console.log("data", data)update_progress(status_url, nanobar, div[0]);},error: function() {alert('Unexpected error');}});}function update_progress(status_url, nanobar, status_div) {// send GET request to status URL$.getJSON(status_url, function(data) {// update UIpercent = parseInt(data['current'] * 100 / data['total']);nanobar.go(percent);$(status_div.childNodes[1]).text(percent + '%');$(status_div.childNodes[2]).text(data['status']);if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {if ('result' in data) {// show result$(status_div.childNodes[3]).text('Result: ' + data['result']);}else {// something unexpected happened$(status_div.childNodes[3]).text('Result: ' + data['state']);}}else {// rerun in 2 secondssetTimeout(function() {update_progress(status_url, nanobar, status_div);}, 2000);}});}$(function() {$('#start-bg-job').click(start_long_task);});</script></body>
</html>

5、启动任务

终端cd到current文件夹所在目录
在这里插入图片描述
启动asyn_001程序,即可观察到异步任务的执行。

参考1 Celery实现异步任务和定时任务的简单示例
参考2 Using Celery with Flask

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145355.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【无标题】ICCV 2023 | CAPEAM:基于上下文感知规划和环境感知记忆机制构建具身智能体

文章链接&#xff1a; https://arxiv.org/abs/2308.07241 2023年&#xff0c;大型语言模型&#xff08;LLMs&#xff09;以及AI Agents的蓬勃发展为整个机器智能领域带来了全新的发展机遇。一直以来&#xff0c;研究者们对具身智能&#xff08;Embodied Artificial Intelligenc…

通过java向jar写入新文件

文章目录 原始需求分析实施步骤引入依赖核心编码运行效果 原始需求 有网友提问&#xff1a; 我想在程序中动态地向同一个jar包中添加文件&#xff0c;比如&#xff0c;我的可执行jar包是test.jar,我要在它运行时生成一些xml文件并将这些文件添加到test.jar中,请问如何实现&…

【分布式计算】三、虚拟化 Virtualization

1.什么是虚拟化 1.1.非虚拟化 我们首先来认识什么是非虚拟化   1.一台机器、一个操作系统、几个应用程序   2.应用程序可能会相互影响。   3.机器利用率较低&#xff0c;正常情况下低于25%。 关于X86平台&#xff1a; 1.服务器基础设施利用率低&#xff08;10-18%&#…

Linux驱动开发笔记

疑问 file_operation中每个操作函数的形参中inode的作用 file_operation定义了Linux内核驱动的所有的操作函数&#xff0c;每个操作函数与一个系统调用对应&#xff0c;对于字符设备来说&#xff0c;常用的函数有&#xff1a;llseek、read、write、pool等等&#xff0c;这些操…

阿里云七代云服务器实例、倚天云服务器及通用算力型和经济型实例规格介绍

在目前阿里云的云服务器产品中&#xff0c;既有五代六代实例规格&#xff0c;也有七代和八代倚天云服务器&#xff0c;同时还有通用算力型及经济型这些刚推出不久的新品云服务器实例&#xff0c;其中第五代实例规格目前不在是主推的实例规格了&#xff0c;现在主售的实例规格是…

【数据结构】堆,堆的实现,堆排序,TOP-K问题

大家好&#xff01;今天我们来学习数据结构中的堆及其应用 目录 1. 堆的概念及结构 2. 堆的实现 2.1 初始化堆 2.2 销毁堆 2.3 打印堆 2.4 交换函数 2.5 堆的向上调整 2.6 堆的向下调整 2.7 堆的插入 2.8 堆的删除 2.9 取堆顶的数据 2.10 堆的数据个数 2.11 堆的判…

内存函数的介绍和模拟实现

目录 1.memcpy的使用(内存拷贝) 2.memcpy的实现 3.memmove的使用&#xff08;内存拷贝&#xff09; 4.memmove的实现 5.memset 的使用&#xff08;内存设置&#xff09; 6.memcmp的使用&#xff08;内存比较&#xff09; 1.memcpy的使用(内存拷贝) void * memcpy ( void * …

整型提升——(巩固提高——字符截取oneNote笔记详解)

文章目录 前言一、整型提升是什么&#xff1f;二、详细图解1.图解展示 总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 整型提升是数据存储的重要题型&#xff0c;也是计算机组成原理的核心知识点。学习c语言进阶的时候,了解内存中数据怎么存&#…

孤举者难起,众行者易趋,openGauss 5.1.0版本正式发布!

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

华为云云耀云服务器L实例评测|云耀云服务器L实例搭建个人镜像站

华为云云耀云服务器L实例评测&#xff5c;云耀云服务器L实例搭建个人镜像站 一、云耀云服务器L实例介绍1.1 云耀云服务器L实例简介1.2 云耀云服务器L实例特点 二、Apache介绍2.1 Apache简介2.2 Apache特点 三、本次实践介绍3.1 本次实践简介3.2 本次环境规划 四、远程登录华为云…

SpringCloud Alibaba 入门到精通 - Sentinel

SpringCloud Alibaba 入门到精通 - Sentinel 一、基础结构搭建1.父工程创建2.子工程创建 二、Sentinel的整合SpringCloud1.微服务可能存在的问题2.SpringCloud集成Sentinel搭建Dashboard3 SpringCloud 整合Sentinel 三、服务降级1 服务降级-Sentinel2 Sentinel 整合 OpenFeign3…

【深度学习实验】卷积神经网络(三):自定义二维卷积层:步长、填充、输入输出通道

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. 步长、填充 a. 二维互相关运算&#xff08;corr2d&#xff09; b. 二维卷积层类&#xff08;Conv2D&#xff09; c. 模型测试 d. 代码整合 2. 输入输出通道 a…

Arcgis克里金插值报错:ERROR 999999: 执行函数时出错。 表名无效。 空间参考不存在。 ERROR 010429: GRID IO 中存在错误

ERROR 999999: 执行函数时出错。 问题描述 表名无效。 空间参考不存在。 ERROR 010429: GRID IO 中存在错误: WindowSetLyr: Window cell size does not match layer cell size. name: c:\users\lenovo\appdata\local\temp\arc2f89\t_t164, adepth: 32, type: 1, iomode: 6, …

智能合约漏洞,Dyna 事件分析

智能合约漏洞&#xff0c;Dyna 事件分析 1. 漏洞简介 https://twitter.com/BlockSecTeam/status/1628319536117153794 https://twitter.com/BeosinAlert/status/1628301635834486784 2. 相关地址或交易 攻击交易 1&#xff1a; https://bscscan.com/tx/0x7fa89d869fd1b89e…

算法通过村第十一关-位运算|青铜笔记|初始位运算

文章目录 前言1. 数字在计算中的表示拓展&#xff1a;为什么要有原码、反码和补码? 2. 位运算规则2.1 与、或、异或和取反2.2 位移运算2.3 位移运算和乘除的关系2.4 位运算的常用技巧 总结 前言 提示&#xff1a;我的父亲从我出生起便认识我&#xff0c;可他对我的了解却那么少…

西北主要河流水系(绿洲)流域(山区)及高程分类数据集(一)

最近收集整理的了西北地区主要河流水系&#xff08;绿洲&#xff09;流域&#xff08;山区&#xff09;及高程分类数据&#xff0c;&#xff0c;本次主要是新疆的河流水系&#xff08;绿洲&#xff09;流域&#xff08;山区&#xff09;及高程分类数据&#xff08;矢量&#xf…

ThemeForest – Canvas 7.2.0 – 多用途 HTML5 模板

ThemeForest 上的 HTML 网站模板受到全球数百万客户的喜爱。与包含网站所有页面并允许您在 WP 仪表板中自定义字体和样式的 WordPress 主题不同&#xff0c;这些设计模板是用 HTML 构建的。您可以在 HTML 编辑器中编辑模板&#xff0c;但不能在 WordPress 上编辑模板&#xff0…

机器人过程自动化(RPA)入门 7. 处理用户事件和助手机器人

在UiPath中,有两种类型的Robot用于自动化任何流程。一个是后台机器人,它在后台工作。它独立工作,这意味着它不需要用户的输入或任何用户交互。另一个是前台机器人,也被称为助理机器人。 本章介绍前台机器人。在这里,我们将了解自动化过程中通过简单按键、单击鼠标等触发事…

【Vue】数据监视输入绑定

hello&#xff0c;我是小索奇&#xff0c;精心制作的Vue系列持续发放&#xff0c;涵盖大量的经验和示例&#xff0c;如有需要&#xff0c;可以收藏哈 本章给大家讲解的是数据监视&#xff0c;前面的章节已经更新完毕&#xff0c;后面的章节持续输出&#xff0c;有任何问题都可以…

Pikachu-xxe (xml外部实体注入漏洞)过关笔记

Pikachu-xxe过关笔记 有回显探测是否有回显file:///协议查看本地系统文件php://协议查看php源代码&#xff08;无法查看当前网页代码&#xff0c;只能看别的&#xff09;http://协议爆破开放端口&#xff08;两者的加载时间不同&#xff09; 无回显第一步第二步第三步 运行结果…