通过企业微信,向AD域过期用户发送更改密码提醒
目录
·先决条件
·语言与环境
·基本思路
·代码说明
1、连接LDAP查询密码过期的用户
2、获取企业微信中应用的Access_Token
3、通过邮箱获取用户在企业微信中的ID
4、通过企业微信的自建应用向获取的用户ID发送通知信息
5、主函数
·代码展示
·一些说明
·先决条件
·拥有一个AD域,并且要有一个AD域中有管理权限的账号和密码,若想要创建一个测试环境可以参考该博主的文章:
windows server 2016配置AD域_芒果黑的博客-CSDN博客_server2016ad域https://blog.csdn.net/a137748099/article/details/115016923windows server2016创建AD域账户并登录_芒果黑的博客-CSDN博客_windows域账户登录https://blog.csdn.net/a137748099/article/details/115017435#:~:text=1%EF%BC%89%E5%8F%B3%E9%94%AEAD%E6%9C%8D%E5%8A%A1%E5%99%A8%EF%BC%8C%E9%80%89%E6%8B%A9Active%20Directory%E7%AE%A1%E7%90%86%E4%B8%AD%E5%BF%83,2%EF%BC%89%E5%8F%B3%E9%94%AEAD%E5%90%8D%E7%A7%B0%EF%BC%8C%E9%80%89%E6%8B%A9%E6%96%B0%E5%BB%BA%E2%86%92%E7%BB%84%E7%BB%87%E5%8D%95%E4%BD%8D%203%EF%BC%89%E9%80%89%E6%8B%A9%E7%A0%94%E5%8F%91%E9%83%A8%EF%BC%8C%E5%8F%B3%E9%94%AE%E6%96%B0%E5%BB%BA%E2%86%92%E7%94%A8%E6%88%B7
·拥有企业微信的后台管理权限的账号,并且创建一个自建应用,本文是通过企业我微信的自建应用对相应的企业微信用户进行通知。创建自建应用时请注意以下几点:
1、注意自建应用的用户可见范围,只有在自建应用的可见范围之内的用户才能够收到消息
2、有些应用有一个可信ip的设置功能,有些没有,我估计是企业微信改版了,老的应用没有,新创建的应用会有,如果有的话并且代码执行时报出ip不可信的错误,可以尝试设置一下应用中的可信ip
3、Secret和AgentID是在自建应用中获取,点进应用就会有AgentID,而Secret需要点击查看,他会以信息的形式发送到你的企业微信中
4、企业ID在“我的企业”的最下面
·语言与环境
·python3(所需的模块:json、requests、ldap3、datetime)
·Windows Server 2016
·基本思路
1、连接AD域的LDAP,对域中固定范围的所有用户进行递归查询,并获取范围内所有用户的相应信息(由于本公司情况是AD域中的邮箱与企业微信中的个人邮箱是相同的,因此此处获取的信息为用户邮箱、最后一次更改密码的时间)
2、筛选密码即将过期的用户,将获取的信息进行处理成字典数据类型
3、使用企业微信提供的通过邮箱查找用户ID的API,通过邮箱查找企业微信中该用户对应的企业微信ID
4、使用企业微信提供的通过用户ID向用户发送信息的API,向用户发送响应的信息
·代码说明
1、连接LDAP查询密码过期的用户
class Con_LDAP():def __init__(self, domain, ip, admin='administrator', pwd=None):#domain:域名,格式为:xxx.xxx.xxx#ip:AD域控制器所在ip地址,格式为:xxx.xxx.xxx.xxx#admin:AD域中管理员账号#pwd:AD域名中管理员密码self.domain = domainself.DC = ','.join(['DC=' + dc for dc in domain.split('.')]) # 将‘CSDN.com’字符串的形式转换为‘DC=CSDN,DC=com’的形式self.pre = domain.split('.')[0].upper() # 用户登录的前缀,即域名的前一部分,如:域名为CSDN.com,用户登录的前缀即为CSDNself.ip = ipself.admin = adminself.pwd = pwdself.server = Server(self.ip, get_info=ALL)self.conn = Connection(self.server, user=self.pre + '\\' + self.admin,password=self.pwd, auto_bind=True,authentication=NTLM) #连接AD域服务器#查询组织下的用户def search(self, org):#org: 组织,格式为:aaa.bbb 即bbb组织下的aaa组织,不包含域地址att_list = ['pwdLastSet','mail'] #所需查询的内容org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC #将组织名处理成标准形式,如:将”CSDN技术部.Users.CSDN“处理成”OU=CSDN技术部,OU=Users,OU=CSDN“,再加上用户登录前缀#查询res = self.conn.search(search_base=org_base, #查询的数据内容search_filter='(objectclass=user)', # 查询数据的类型attributes=att_list, # 查询数据的哪些属性paged_size=1000) # 一次查询多少数据user_dict = dict()if res:for user in self.conn.entries: #遍历查询结果last_time = str(user['pwdLastSet']) #将最后一次更改密码时间的数据类型转换为字符串型last_time = last_time[0:last_time.find('.')] #去掉最后一次更改密码时间后的无关字符串last_time = last_time[0:last_time.find('+')] #去掉最后一次更改密码时间后的无关字符串last_time = datetime.datetime.strptime(last_time, '%Y-%m-%d %H:%M:%S') #更改数据类型为时间if (datetime.datetime.now() - last_time).days >= 83 and (datetime.datetime.now() - last_time).days <= 91: #筛选7天内密码即将过期以及密码已经过期一天的用户user_dict[str(user['mail'])] = last_time #将这些用户放入字典中(邮箱为key,上一次更改密码的时间为value)else:print('查询失败: ', self.conn.result['description'])return user_dict
2、获取企业微信中应用的Access_Token
若要使用企业微信的API对应用进行一些操作首先需要获取企业微信的Access_Token,这也相当于你的企业微信给你了一个许可凭证,允许你对企业中的这个应用进行信息的获取或者某些操作。
此处所需要的信息为:
·CORPID:企业ID
·CORPSECRET:创建的应用的Secret
这些信息需要自行在企业微信管理后台中寻找
CORPID = ‘xxxxxxxxxxxxxxxxxxx’ #企业ID
CORPSECRET = ‘xxxxxxxxxxxxxxxx’ #创建的应用的Secret
headers = {"Content-Type": "text/plain"} #输出格式#获取企业微信中相应应用的Access_Token
def getAccessToken():try:request = requests.get(url="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid="+CORPID+"&corpsecret="+CORPSECRET, headers=headers) #向企业微信api发送请求获取Access_Token凭证request = json.loads(request.text) #将获取的结果转为字典数据类型access_token = Falseif request['errcode'] == 0:#获取返回信息的报错数据,并判断返回数据是否为0access_token = request['access_token'] #若报错信息为0,即请求未发生错误,获取返回信息中的Access_Tokenexcept Exception as err:raise errelse:return access_token #返回Access_Token
3、通过邮箱获取用户在企业微信中的ID
#通过邮箱获取该用户在企业微信中的ID
def getuserID(access_token, email):try:data = {"email": email, #指定的用户邮箱"email_type": 2 #查询的邮箱类型:1为企业邮箱,2为个人邮箱}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/user/get_userid_by_email?access_token=" + access_token,headers=headers, json=data) #向企业微信发送请求,通过邮箱获取用户的企业IDrequest = json.loads(request.text)userid = Falseif request['errcode'] == 0:userid = request['userid']except Exception as err:raise errelse:return userid
4、通过企业微信的自建应用向获取的用户ID发送通知信息
AGENTID = xxxxxxx #应用ID (此类型为int型)#使用Access_Token,通过企业中的应用向特定用户发送信息
def postMessage(access_token, agentid, email, PasswordLastSet):try:userid = getuserID(access_token, email) #通过用户邮箱获取用户在企业微信中的IDtime = 90-(datetime.datetime.now()-PasswordLastSet).days #计算还有几天过期if time >= 0: #若用户还没有过期data = {"touser": userid, #所要发送的用户"msgtype": "text", #发送内容的格式"agentid": agentid, #使用哪一个应用发送"text": { #发送内容"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)即将在"+str(time)+"天后过期, "+(PasswordLastSet+datetime.timedelta(days=+90)).strftime('%Y/%m/%d %H:%M:%S')+"之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0, #表示发送内容是否保密(0为可对外分享,1为不可分享,切内容显示水印)"enable_id_trans": 0, #是否开启id转译"enable_duplicate_check": 1, #是否开启重复消息检查"duplicate_check_interval": 1 #重复消息检查的时间间隔,默认1800s,最大不超过4h(单位为s)}else: #若用户已过期data = {"touser": userid,"msgtype": "text","agentid": agentid,"text": {"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)已过期""之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0,"enable_id_trans": 0,"enable_duplicate_check": 1,"duplicate_check_interval": 1}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="+access_token, headers=headers, json=data) #向企业微信发送请求,将信息发送给相应的用户except Exception as err:raise errelse:return request.text
5、主函数
def main():try:adladp = Adoper(domain='CSDN.com', ip='xxx.xxx.xxx.xxx', pwd='xxxxxx', admin='xxxxxx') #创建一个Adoper对象,并连接AD域服务器user_dict = adladp.search('Users.CSDN') #提供查询条件if user_dict: #如果user_dict不为空access_token = getAccessToken() #通过方法获取Access_Tokenfor user in user_dict.keys(): #遍历字典中的keytry:passpostMessage(access_token, AGENTID, user, user_dict[user]) #发送消息except Exception as err:raise errelse:print('No user')except Exception as err:raise err
·代码展示
import json
import requests
from ldap3 import Server, Connection, ALL, NTLM
import datetimeAGENTID = xxxxxxx #应用ID (此类型为int型)
CORPID = ‘xxxxxxxxxxxxxxxxxxx’ #企业ID
CORPSECRET = ‘xxxxxxxxxxxxxxxx’ #创建的应用的Secret
headers = {"Content-Type": "text/plain"} #输出格式class Con_LDAP():def __init__(self, domain, ip, admin='administrator', pwd=None):#domain:域名,格式为:xxx.xxx.xxx#ip:AD域控制器所在ip地址,格式为:xxx.xxx.xxx.xxx#admin:AD域中管理员账号#pwd:AD域名中管理员密码self.domain = domainself.DC = ','.join(['DC=' + dc for dc in domain.split('.')]) # 将‘CSDN.com’字符串的形式转换为‘DC=CSDN,DC=com’的形式self.pre = domain.split('.')[0].upper() # 用户登录的前缀,即域名的前一部分,如:域名为CSDN.com,用户登录的前缀即为CSDNself.ip = ipself.admin = adminself.pwd = pwdself.server = Server(self.ip, get_info=ALL)self.conn = Connection(self.server, user=self.pre + '\\' + self.admin,password=self.pwd, auto_bind=True,authentication=NTLM) #连接AD域服务器#查询组织下的用户def search(self, org):#org: 组织,格式为:aaa.bbb 即bbb组织下的aaa组织,不包含域地址att_list = ['pwdLastSet','mail'] #所需查询的内容org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC #将组织名处理成标准形式,如:将”CSDN技术部.Users.CSDN“处理成”OU=CSDN技术部,OU=Users,OU=CSDN“,再加上用户登录前缀#查询res = self.conn.search(search_base=org_base, #查询的数据内容search_filter='(objectclass=user)', # 查询数据的类型attributes=att_list, # 查询数据的哪些属性paged_size=1000) # 一次查询多少数据user_dict = dict()if res:for user in self.conn.entries: #遍历查询结果last_time = str(user['pwdLastSet']) #将最后一次更改密码时间的数据类型转换为字符串型last_time = last_time[0:last_time.find('.')] #去掉最后一次更改密码时间后的无关字符串last_time = last_time[0:last_time.find('+')] #去掉最后一次更改密码时间后的无关字符串last_time = datetime.datetime.strptime(last_time, '%Y-%m-%d %H:%M:%S') #更改数据类型为时间if (datetime.datetime.now() - last_time).days >= 83 and (datetime.datetime.now() - last_time).days <= 91: #筛选7天内密码即将过期以及密码已经过期一天的用户user_dict[str(user['mail'])] = last_time #将这些用户放入字典中(邮箱为key,上一次更改密码的时间为value)else:print('查询失败: ', self.conn.result['description'])return user_dict#获取企业微信中相应应用的Access_Token
def getAccessToken():try:request = requests.get(url="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid="+CORPID+"&corpsecret="+CORPSECRET, headers=headers) #向企业微信api发送请求获取Access_Token凭证request = json.loads(request.text) #将获取的结果转为字典数据类型access_token = Falseif request['errcode'] == 0:#获取返回信息的报错数据,并判断返回数据是否为0access_token = request['access_token'] #若报错信息为0,即请求未发生错误,获取返回信息中的Access_Tokenexcept Exception as err:raise errelse:return access_token #返回Access_Token#通过邮箱获取该用户在企业微信中的ID
def getuserID(access_token, email):try:data = {"email": email, #指定的用户邮箱"email_type": 2 #查询的邮箱类型:1为企业邮箱,2为个人邮箱}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/user/get_userid_by_email?access_token=" + access_token,headers=headers, json=data) #向企业微信发送请求,通过邮箱获取用户的企业IDrequest = json.loads(request.text)userid = Falseif request['errcode'] == 0:userid = request['userid']except Exception as err:raise errelse:return userid#使用Access_Token,通过企业中的应用向特定用户发送信息
def postMessage(access_token, agentid, email, PasswordLastSet):try:userid = getuserID(access_token, email) #通过用户邮箱获取用户在企业微信中的IDtime = 90-(datetime.datetime.now()-PasswordLastSet).days #计算还有几天过期if time >= 0: #若用户还没有过期data = {"touser": userid, #所要发送的用户"msgtype": "text", #发送内容的格式"agentid": agentid, #使用哪一个应用发送"text": { #发送内容"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)即将在"+str(time)+"天后过期, "+(PasswordLastSet+datetime.timedelta(days=+90)).strftime('%Y/%m/%d %H:%M:%S')+"之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0, #表示发送内容是否保密(0为可对外分享,1为不可分享,切内容显示水印)"enable_id_trans": 0, #是否开启id转译"enable_duplicate_check": 1, #是否开启重复消息检查"duplicate_check_interval": 1 #重复消息检查的时间间隔,默认1800s,最大不超过4h(单位为s)}else: #若用户已过期data = {"touser": userid,"msgtype": "text","agentid": agentid,"text": {"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)已过期""之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0,"enable_id_trans": 0,"enable_duplicate_check": 1,"duplicate_check_interval": 1}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="+access_token, headers=headers, json=data) #向企业微信发送请求,将信息发送给相应的用户except Exception as err:raise errelse:return request.textdef main():try:adladp = Adoper(domain='CSDN.com', ip='xxx.xxx.xxx.xxx', pwd='xxxxxx', admin='xxxxxx') #创建一个Adoper对象,并连接AD域服务器user_dict = adladp.search('Users.CSDN') #提供查询条件if user_dict: #如果user_dict不为空access_token = getAccessToken() #通过方法获取Access_Tokenfor user in user_dict.keys(): #遍历字典中的keytry:passpostMessage(access_token, AGENTID, user, user_dict[user]) #发送消息except Exception as err:raise errelse:print('No user')except Exception as err:raise errif __name__ == '__main__':main()
·一些说明
1、有想要了解企业微信设置的API的朋友可以看一下企业微信开发者中心,这里上一下链接
企业微信开发者中心https://developer.work.weixin.qq.com/ 本文所用到的是以下三个API
获取access_token - 文档 - 企业微信开发者中心 (qq.com)https://developer.work.weixin.qq.com/document/path/91039邮箱获取userid - 文档 - 企业微信开发者中心 (qq.com)https://developer.work.weixin.qq.com/document/path/95895发送应用消息 - 文档 - 企业微信开发者中心 (qq.com)https://developer.work.weixin.qq.com/document/path/90236 2、该功能的实现是借鉴了逗老师的文章,但是两者之间的实现方式还是有所不同的,有想要看逗老师的文章的链接也放在下面啦
【逗老师带你学IT】通过企业微信推送AD域密码即将到期提醒_Python_运维开发网_运维开发技术经验分享 (qedev.com)https://www.qedev.com/python/320287.html 3、第一次在公司实习,自己研究了很久才成功完成这个小脚本,所以想要记录一下,同时也分享给大家防止大家跟我踩同样的坑。我是一个小菜鸡,以前都是看大佬的文章,这还是第一次自己写文章,如果有什么问题望大佬们指正哈
通过企业微信,向AD域过期用户发送更改密码提醒相关推荐
- 【.net】通过企业微信web api给指定用户发送消息
前言 在很多业务场景中经常会遇到与微博微信进行通信的需求,今天就和大家一起研究一下如果通过企业微信web api给指定用户发送消息 一.创建应用 1.打开企业微信并登录 https://work.we ...
- 使用企业微信的API给指定用户发送消息
上个月比较忙,等不忙了继续写点基础教程(五一还在高铁上写项目在).因为公司的原因,自己学习了点JavaWeb的知识,重新写了一个简单的后台管理,用于记录用户注册信息的.其中有这样的一个要求,就是在用户 ...
- python企业微信特定用户_使用企业微信的API给指定用户发送消息
/*** 微信发送消息 * *@authorPC-MXF **/ public classWeChatMsgSend {privateCloseableHttpClient httpClient;/* ...
- 企业微信开源系统,让开发者快速搭建基于企业微信的私域流量运营系统
"经过行业的实战应用,企业微信已经成为"私域流量"运营的主要工具" 尽管现在基于企业微信开发的第三方产品处于一个百家争鸣的时代,但仍旧未能看到一个开源的.真正为 ...
- 企业微信品牌私域运营案例合集
互联网下半场,正进入争夺流量.想尽办法抢占用户时间与心智的决赛圈.随着流量价格日益攀升,如何拉新.转化从而实现营收增长是企业亟待解决的问题.与此同时,私域流量一词也逐渐走进大众视野,从驻足观望到不得不 ...
- 【营销获客二】如何用企业微信搭建私域流量营销平台
[营销获客二]如何用企业微信搭建私域流量营销平台 一.环境背景 1.1 公域流量红利消退 1.2 私域流量运营势起 二.企业微信的核心价值 2.1 资源链接 2.2 流程简化 2.3 数据赋能 三.如 ...
- 如何用企业微信做好私域流量的引流、转化和精细化运营?
上周周三,也就是12月23日,企业微信举办了2020年度大会. 会议中,腾讯微信事业群副总裁黄铁鸣讲到,企业通过企业微信连接及服务的微信用户数已达4亿.使用企业微信的真实企业与组织数已超550万,活跃 ...
- 为什么用企业微信做私域运营
很多企业都在用企业微信做私域用户运营,取得了很不错的成绩,卖珠宝的周大福,家具定制的索菲亚,在线教育的猿辅导,休闲零食的恰恰,餐饮乐凯撒引流30万等 为什么这些企业都会采用企业微信呢?用两组数据,三件 ...
- 企业微信,私域流量的下一个战场
一.私域流量诞生背景 互联网时代常伴随着商业迅速变革,而私域流量打法一直处于核心地位,有了私域流量,就掌握了营销的主动权.很多洞察力强的朋友,看准了趋势,早期就开始玩私域流量,私域流量核心就是运营粉丝 ...
最新文章
- Linux QtCreator 设置mingw编译器生成windows程序
- 管理经验之没有必要的消费:空白卡片
- Android---手动创建线程与GUI线程同步(二)
- CSS属性之attr()
- tf.keras.layers.Attention 理解总结
- 硬件——nrf51822第二篇,如何设置keil用来下载程序
- [虚拟化/云][全栈demo] 为qemu增加一个PCI的watchdog外设(三)
- 洛谷P3265 装备购买
- 留学生Essay写作没思路的解决方案
- 【学习摘抄】渗透性测试方法和步骤
- python画国旗和八卦图
- 新手云服务器系统,新手云服务器系统
- 手机桌面计算机显示,手机如何显示在桌面?敬业签电脑手机同步云便签怎么在桌面显示便签?...
- Elasticsearch用java api 创建mapping
- Word中的字体大小(几号-几磅)【转】
- Accumulation Degree --- 换根dp
- 基于STM32F103 HAL库 MB85RS128 驱动程序
- 2012年5月编程语言排行榜:C再次位居第一 Java 占有率持续下降
- KEIL5 MDK编译后出现.\Output\led.axf: Error: L6218E: Undefined symbol SystemInitreferred from startup_解决方案
- 能ping通域名但是不能访问网页_给自己的网站免费套上cdn加速,访问更稳定