目录

·先决条件

·语言与环境

·基本思路

·代码说明

1、连接LDAP查询密码过期的用户

2、获取企业微信中应用的Access_Token

3、通过邮箱获取用户在企业微信中的ID

4、通过企业微信的自建应用向获取的用户ID发送通知信息

5、主函数

·代码展示

·一些说明


·先决条件

·拥有一个AD域,并且要有一个AD域中有管理权限的账号和密码,若想要创建一个测试环境可以参考该博主的文章:

windows server 2016配置AD域_芒果黑的博客-CSDN博客_server2016ad域https://blog.csdn.net/a137748099/article/details/115016923windows server2016创建AD域账户并登录_芒果黑的博客-CSDN博客_windows域账户登录https://blog.csdn.net/a137748099/article/details/115017435#:~:text=1%EF%BC%89%E5%8F%B3%E9%94%AEAD%E6%9C%8D%E5%8A%A1%E5%99%A8%EF%BC%8C%E9%80%89%E6%8B%A9Active%20Directory%E7%AE%A1%E7%90%86%E4%B8%AD%E5%BF%83,2%EF%BC%89%E5%8F%B3%E9%94%AEAD%E5%90%8D%E7%A7%B0%EF%BC%8C%E9%80%89%E6%8B%A9%E6%96%B0%E5%BB%BA%E2%86%92%E7%BB%84%E7%BB%87%E5%8D%95%E4%BD%8D%203%EF%BC%89%E9%80%89%E6%8B%A9%E7%A0%94%E5%8F%91%E9%83%A8%EF%BC%8C%E5%8F%B3%E9%94%AE%E6%96%B0%E5%BB%BA%E2%86%92%E7%94%A8%E6%88%B7

·拥有企业微信的后台管理权限的账号,并且创建一个自建应用,本文是通过企业我微信的自建应用对相应的企业微信用户进行通知。创建自建应用时请注意以下几点:

1、注意自建应用的用户可见范围,只有在自建应用的可见范围之内的用户才能够收到消息

2、有些应用有一个可信ip的设置功能,有些没有,我估计是企业微信改版了,老的应用没有,新创建的应用会有,如果有的话并且代码执行时报出ip不可信的错误,可以尝试设置一下应用中的可信ip

3、Secret和AgentID是在自建应用中获取,点进应用就会有AgentID,而Secret需要点击查看,他会以信息的形式发送到你的企业微信中

4、企业ID在“我的企业”的最下面

·语言与环境

·python3(所需的模块:json、requests、ldap3、datetime)

·Windows Server 2016

·基本思路

1、连接AD域的LDAP,对域中固定范围的所有用户进行递归查询,并获取范围内所有用户的相应信息(由于本公司情况是AD域中的邮箱与企业微信中的个人邮箱是相同的,因此此处获取的信息为用户邮箱、最后一次更改密码的时间)

2、筛选密码即将过期的用户,将获取的信息进行处理成字典数据类型

3、使用企业微信提供的通过邮箱查找用户ID的API,通过邮箱查找企业微信中该用户对应的企业微信ID

4、使用企业微信提供的通过用户ID向用户发送信息的API,向用户发送响应的信息

·代码说明

1、连接LDAP查询密码过期的用户

class Con_LDAP():def __init__(self, domain, ip, admin='administrator', pwd=None):#domain:域名,格式为:xxx.xxx.xxx#ip:AD域控制器所在ip地址,格式为:xxx.xxx.xxx.xxx#admin:AD域中管理员账号#pwd:AD域名中管理员密码self.domain = domainself.DC = ','.join(['DC=' + dc for dc in domain.split('.')])  # 将‘CSDN.com’字符串的形式转换为‘DC=CSDN,DC=com’的形式self.pre = domain.split('.')[0].upper()  # 用户登录的前缀,即域名的前一部分,如:域名为CSDN.com,用户登录的前缀即为CSDNself.ip = ipself.admin = adminself.pwd = pwdself.server = Server(self.ip, get_info=ALL)self.conn = Connection(self.server, user=self.pre + '\\' + self.admin,password=self.pwd, auto_bind=True,authentication=NTLM)     #连接AD域服务器#查询组织下的用户def search(self, org):#org: 组织,格式为:aaa.bbb 即bbb组织下的aaa组织,不包含域地址att_list = ['pwdLastSet','mail']    #所需查询的内容org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC  #将组织名处理成标准形式,如:将”CSDN技术部.Users.CSDN“处理成”OU=CSDN技术部,OU=Users,OU=CSDN“,再加上用户登录前缀#查询res = self.conn.search(search_base=org_base,    #查询的数据内容search_filter='(objectclass=user)',  # 查询数据的类型attributes=att_list,  # 查询数据的哪些属性paged_size=1000)  # 一次查询多少数据user_dict = dict()if res:for user in self.conn.entries:  #遍历查询结果last_time = str(user['pwdLastSet']) #将最后一次更改密码时间的数据类型转换为字符串型last_time = last_time[0:last_time.find('.')]    #去掉最后一次更改密码时间后的无关字符串last_time = last_time[0:last_time.find('+')]    #去掉最后一次更改密码时间后的无关字符串last_time = datetime.datetime.strptime(last_time, '%Y-%m-%d %H:%M:%S')  #更改数据类型为时间if (datetime.datetime.now() - last_time).days >= 83 and (datetime.datetime.now() - last_time).days <= 91:   #筛选7天内密码即将过期以及密码已经过期一天的用户user_dict[str(user['mail'])] = last_time    #将这些用户放入字典中(邮箱为key,上一次更改密码的时间为value)else:print('查询失败: ', self.conn.result['description'])return user_dict

2、获取企业微信中应用的Access_Token

若要使用企业微信的API对应用进行一些操作首先需要获取企业微信的Access_Token,这也相当于你的企业微信给你了一个许可凭证,允许你对企业中的这个应用进行信息的获取或者某些操作。

此处所需要的信息为:

·CORPID:企业ID

·CORPSECRET:创建的应用的Secret

这些信息需要自行在企业微信管理后台中寻找

CORPID = ‘xxxxxxxxxxxxxxxxxxx’ #企业ID
CORPSECRET = ‘xxxxxxxxxxxxxxxx’ #创建的应用的Secret
headers = {"Content-Type": "text/plain"}  #输出格式#获取企业微信中相应应用的Access_Token
def getAccessToken():try:request = requests.get(url="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid="+CORPID+"&corpsecret="+CORPSECRET, headers=headers) #向企业微信api发送请求获取Access_Token凭证request = json.loads(request.text)  #将获取的结果转为字典数据类型access_token = Falseif request['errcode'] == 0:#获取返回信息的报错数据,并判断返回数据是否为0access_token = request['access_token']  #若报错信息为0,即请求未发生错误,获取返回信息中的Access_Tokenexcept Exception as err:raise errelse:return access_token #返回Access_Token

3、通过邮箱获取用户在企业微信中的ID

#通过邮箱获取该用户在企业微信中的ID
def getuserID(access_token, email):try:data = {"email": email, #指定的用户邮箱"email_type": 2 #查询的邮箱类型:1为企业邮箱,2为个人邮箱}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/user/get_userid_by_email?access_token=" + access_token,headers=headers, json=data) #向企业微信发送请求,通过邮箱获取用户的企业IDrequest = json.loads(request.text)userid = Falseif request['errcode'] == 0:userid = request['userid']except Exception as err:raise errelse:return userid

4、通过企业微信的自建应用向获取的用户ID发送通知信息

AGENTID = xxxxxxx #应用ID (此类型为int型)#使用Access_Token,通过企业中的应用向特定用户发送信息
def postMessage(access_token, agentid, email, PasswordLastSet):try:userid = getuserID(access_token, email) #通过用户邮箱获取用户在企业微信中的IDtime = 90-(datetime.datetime.now()-PasswordLastSet).days    #计算还有几天过期if time >= 0:   #若用户还没有过期data = {"touser": userid,   #所要发送的用户"msgtype": "text",  #发送内容的格式"agentid": agentid, #使用哪一个应用发送"text": {   #发送内容"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)即将在"+str(time)+"天后过期, "+(PasswordLastSet+datetime.timedelta(days=+90)).strftime('%Y/%m/%d %H:%M:%S')+"之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0,  #表示发送内容是否保密(0为可对外分享,1为不可分享,切内容显示水印)"enable_id_trans": 0,   #是否开启id转译"enable_duplicate_check": 1,    #是否开启重复消息检查"duplicate_check_interval": 1   #重复消息检查的时间间隔,默认1800s,最大不超过4h(单位为s)}else:   #若用户已过期data = {"touser": userid,"msgtype": "text","agentid": agentid,"text": {"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)已过期""之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0,"enable_id_trans": 0,"enable_duplicate_check": 1,"duplicate_check_interval": 1}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="+access_token, headers=headers, json=data)  #向企业微信发送请求,将信息发送给相应的用户except Exception as err:raise errelse:return request.text

5、主函数

def main():try:adladp = Adoper(domain='CSDN.com', ip='xxx.xxx.xxx.xxx', pwd='xxxxxx', admin='xxxxxx')   #创建一个Adoper对象,并连接AD域服务器user_dict = adladp.search('Users.CSDN')  #提供查询条件if user_dict:   #如果user_dict不为空access_token = getAccessToken() #通过方法获取Access_Tokenfor user in user_dict.keys(): #遍历字典中的keytry:passpostMessage(access_token, AGENTID, user, user_dict[user]) #发送消息except Exception as err:raise errelse:print('No user')except Exception as err:raise err

·代码展示

import json
import requests
from ldap3 import Server, Connection, ALL, NTLM
import datetimeAGENTID = xxxxxxx #应用ID (此类型为int型)
CORPID = ‘xxxxxxxxxxxxxxxxxxx’ #企业ID
CORPSECRET = ‘xxxxxxxxxxxxxxxx’ #创建的应用的Secret
headers = {"Content-Type": "text/plain"}  #输出格式class Con_LDAP():def __init__(self, domain, ip, admin='administrator', pwd=None):#domain:域名,格式为:xxx.xxx.xxx#ip:AD域控制器所在ip地址,格式为:xxx.xxx.xxx.xxx#admin:AD域中管理员账号#pwd:AD域名中管理员密码self.domain = domainself.DC = ','.join(['DC=' + dc for dc in domain.split('.')])  # 将‘CSDN.com’字符串的形式转换为‘DC=CSDN,DC=com’的形式self.pre = domain.split('.')[0].upper()  # 用户登录的前缀,即域名的前一部分,如:域名为CSDN.com,用户登录的前缀即为CSDNself.ip = ipself.admin = adminself.pwd = pwdself.server = Server(self.ip, get_info=ALL)self.conn = Connection(self.server, user=self.pre + '\\' + self.admin,password=self.pwd, auto_bind=True,authentication=NTLM)     #连接AD域服务器#查询组织下的用户def search(self, org):#org: 组织,格式为:aaa.bbb 即bbb组织下的aaa组织,不包含域地址att_list = ['pwdLastSet','mail']    #所需查询的内容org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC  #将组织名处理成标准形式,如:将”CSDN技术部.Users.CSDN“处理成”OU=CSDN技术部,OU=Users,OU=CSDN“,再加上用户登录前缀#查询res = self.conn.search(search_base=org_base,    #查询的数据内容search_filter='(objectclass=user)',  # 查询数据的类型attributes=att_list,  # 查询数据的哪些属性paged_size=1000)  # 一次查询多少数据user_dict = dict()if res:for user in self.conn.entries:  #遍历查询结果last_time = str(user['pwdLastSet']) #将最后一次更改密码时间的数据类型转换为字符串型last_time = last_time[0:last_time.find('.')]    #去掉最后一次更改密码时间后的无关字符串last_time = last_time[0:last_time.find('+')]    #去掉最后一次更改密码时间后的无关字符串last_time = datetime.datetime.strptime(last_time, '%Y-%m-%d %H:%M:%S')  #更改数据类型为时间if (datetime.datetime.now() - last_time).days >= 83 and (datetime.datetime.now() - last_time).days <= 91:   #筛选7天内密码即将过期以及密码已经过期一天的用户user_dict[str(user['mail'])] = last_time    #将这些用户放入字典中(邮箱为key,上一次更改密码的时间为value)else:print('查询失败: ', self.conn.result['description'])return user_dict#获取企业微信中相应应用的Access_Token
def getAccessToken():try:request = requests.get(url="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid="+CORPID+"&corpsecret="+CORPSECRET, headers=headers) #向企业微信api发送请求获取Access_Token凭证request = json.loads(request.text)  #将获取的结果转为字典数据类型access_token = Falseif request['errcode'] == 0:#获取返回信息的报错数据,并判断返回数据是否为0access_token = request['access_token']  #若报错信息为0,即请求未发生错误,获取返回信息中的Access_Tokenexcept Exception as err:raise errelse:return access_token #返回Access_Token#通过邮箱获取该用户在企业微信中的ID
def getuserID(access_token, email):try:data = {"email": email, #指定的用户邮箱"email_type": 2 #查询的邮箱类型:1为企业邮箱,2为个人邮箱}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/user/get_userid_by_email?access_token=" + access_token,headers=headers, json=data) #向企业微信发送请求,通过邮箱获取用户的企业IDrequest = json.loads(request.text)userid = Falseif request['errcode'] == 0:userid = request['userid']except Exception as err:raise errelse:return userid#使用Access_Token,通过企业中的应用向特定用户发送信息
def postMessage(access_token, agentid, email, PasswordLastSet):try:userid = getuserID(access_token, email) #通过用户邮箱获取用户在企业微信中的IDtime = 90-(datetime.datetime.now()-PasswordLastSet).days    #计算还有几天过期if time >= 0:   #若用户还没有过期data = {"touser": userid,   #所要发送的用户"msgtype": "text",  #发送内容的格式"agentid": agentid, #使用哪一个应用发送"text": {   #发送内容"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)即将在"+str(time)+"天后过期, "+(PasswordLastSet+datetime.timedelta(days=+90)).strftime('%Y/%m/%d %H:%M:%S')+"之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0,  #表示发送内容是否保密(0为可对外分享,1为不可分享,切内容显示水印)"enable_id_trans": 0,   #是否开启id转译"enable_duplicate_check": 1,    #是否开启重复消息检查"duplicate_check_interval": 1   #重复消息检查的时间间隔,默认1800s,最大不超过4h(单位为s)}else:   #若用户已过期data = {"touser": userid,"msgtype": "text","agentid": agentid,"text": {"content": "您的域账户密码(邮件、Jira、Git、windows开机密码)已过期""之后您将无法使用该账户登陆相关系统,请您尽快更改密码。\n"},"safe": 0,"enable_id_trans": 0,"enable_duplicate_check": 1,"duplicate_check_interval": 1}request = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="+access_token, headers=headers, json=data)  #向企业微信发送请求,将信息发送给相应的用户except Exception as err:raise errelse:return request.textdef main():try:adladp = Adoper(domain='CSDN.com', ip='xxx.xxx.xxx.xxx', pwd='xxxxxx', admin='xxxxxx')   #创建一个Adoper对象,并连接AD域服务器user_dict = adladp.search('Users.CSDN')  #提供查询条件if user_dict:   #如果user_dict不为空access_token = getAccessToken() #通过方法获取Access_Tokenfor user in user_dict.keys(): #遍历字典中的keytry:passpostMessage(access_token, AGENTID, user, user_dict[user]) #发送消息except Exception as err:raise errelse:print('No user')except Exception as err:raise errif __name__ == '__main__':main()

·一些说明

1、有想要了解企业微信设置的API的朋友可以看一下企业微信开发者中心,这里上一下链接

企业微信开发者中心https://developer.work.weixin.qq.com/               本文所用到的是以下三个API

获取access_token - 文档 - 企业微信开发者中心 (qq.com)https://developer.work.weixin.qq.com/document/path/91039邮箱获取userid - 文档 - 企业微信开发者中心 (qq.com)https://developer.work.weixin.qq.com/document/path/95895发送应用消息 - 文档 - 企业微信开发者中心 (qq.com)https://developer.work.weixin.qq.com/document/path/90236        2、该功能的实现是借鉴了逗老师的文章,但是两者之间的实现方式还是有所不同的,有想要看逗老师的文章的链接也放在下面啦
【逗老师带你学IT】通过企业微信推送AD域密码即将到期提醒_Python_运维开发网_运维开发技术经验分享 (qedev.com)https://www.qedev.com/python/320287.html        3、第一次在公司实习,自己研究了很久才成功完成这个小脚本,所以想要记录一下,同时也分享给大家防止大家跟我踩同样的坑。我是一个小菜鸡,以前都是看大佬的文章,这还是第一次自己写文章,如果有什么问题望大佬们指正哈

通过企业微信,向AD域过期用户发送更改密码提醒相关推荐

  1. 【.net】通过企业微信web api给指定用户发送消息

    前言 在很多业务场景中经常会遇到与微博微信进行通信的需求,今天就和大家一起研究一下如果通过企业微信web api给指定用户发送消息 一.创建应用 1.打开企业微信并登录 https://work.we ...

  2. 使用企业微信的API给指定用户发送消息

    上个月比较忙,等不忙了继续写点基础教程(五一还在高铁上写项目在).因为公司的原因,自己学习了点JavaWeb的知识,重新写了一个简单的后台管理,用于记录用户注册信息的.其中有这样的一个要求,就是在用户 ...

  3. python企业微信特定用户_使用企业微信的API给指定用户发送消息

    /*** 微信发送消息 * *@authorPC-MXF **/ public classWeChatMsgSend {privateCloseableHttpClient httpClient;/* ...

  4. 企业微信开源系统,让开发者快速搭建基于企业微信的私域流量运营系统

    "经过行业的实战应用,企业微信已经成为"私域流量"运营的主要工具" 尽管现在基于企业微信开发的第三方产品处于一个百家争鸣的时代,但仍旧未能看到一个开源的.真正为 ...

  5. 企业微信品牌私域运营案例合集

    互联网下半场,正进入争夺流量.想尽办法抢占用户时间与心智的决赛圈.随着流量价格日益攀升,如何拉新.转化从而实现营收增长是企业亟待解决的问题.与此同时,私域流量一词也逐渐走进大众视野,从驻足观望到不得不 ...

  6. 【营销获客二】如何用企业微信搭建私域流量营销平台

    [营销获客二]如何用企业微信搭建私域流量营销平台 一.环境背景 1.1 公域流量红利消退 1.2 私域流量运营势起 二.企业微信的核心价值 2.1 资源链接 2.2 流程简化 2.3 数据赋能 三.如 ...

  7. 如何用企业微信做好私域流量的引流、转化和精细化运营?

    上周周三,也就是12月23日,企业微信举办了2020年度大会. 会议中,腾讯微信事业群副总裁黄铁鸣讲到,企业通过企业微信连接及服务的微信用户数已达4亿.使用企业微信的真实企业与组织数已超550万,活跃 ...

  8. 为什么用企业微信做私域运营

    很多企业都在用企业微信做私域用户运营,取得了很不错的成绩,卖珠宝的周大福,家具定制的索菲亚,在线教育的猿辅导,休闲零食的恰恰,餐饮乐凯撒引流30万等 为什么这些企业都会采用企业微信呢?用两组数据,三件 ...

  9. 企业微信,私域流量的下一个战场

    一.私域流量诞生背景 互联网时代常伴随着商业迅速变革,而私域流量打法一直处于核心地位,有了私域流量,就掌握了营销的主动权.很多洞察力强的朋友,看准了趋势,早期就开始玩私域流量,私域流量核心就是运营粉丝 ...

最新文章

  1. Linux QtCreator 设置mingw编译器生成windows程序
  2. 管理经验之没有必要的消费:空白卡片
  3. Android---手动创建线程与GUI线程同步(二)
  4. CSS属性之attr()
  5. tf.keras.layers.Attention 理解总结
  6. 硬件——nrf51822第二篇,如何设置keil用来下载程序
  7. [虚拟化/云][全栈demo] 为qemu增加一个PCI的watchdog外设(三)
  8. 洛谷P3265 装备购买
  9. 留学生Essay写作没思路的解决方案
  10. 【学习摘抄】渗透性测试方法和步骤
  11. python画国旗和八卦图
  12. 新手云服务器系统,新手云服务器系统
  13. 手机桌面计算机显示,手机如何显示在桌面?敬业签电脑手机同步云便签怎么在桌面显示便签?...
  14. Elasticsearch用java api 创建mapping
  15. Word中的字体大小(几号-几磅)【转】
  16. Accumulation Degree --- 换根dp
  17. 基于STM32F103 HAL库 MB85RS128 驱动程序
  18. 2012年5月编程语言排行榜:C再次位居第一 Java 占有率持续下降
  19. KEIL5 MDK编译后出现.\Output\led.axf: Error: L6218E: Undefined symbol SystemInitreferred from startup_解决方案
  20. 能ping通域名但是不能访问网页_给自己的网站免费套上cdn加速,访问更稳定

热门文章

  1. PV操作详解(附详细例题解析和总结)
  2. PTA 使我精神焕发
  3. 【MySQL多表操作练习】
  4. 蚂蚁金服崔恒斌:金融智能——对话机器人新形态
  5. 组图:1936年柏林奥运会
  6. java大文件加密速度_java版AES文件加密速度问题
  7. Humble Numbers (谦卑数 || 丑数)
  8. 如果编程语言是女孩子……
  9. 强悍的Spring之spring validation
  10. NETDMIS5.0脱机编程2023