微信接入机器人实现对别人消息和群at消息的自动回复

有时候,我们想让我们的微信号对别人发出的各种消息做出回复。我们可以通过接入图灵机器人的方式实现。当然,我是盗用别人的成果加以修改的。
  • IDLE编写py文件并保存,命名为wxbot。
#!/usr/bin/env python
# coding: utf-8import os
import sys
import traceback
import webbrowser
import pyqrcode
import requests
import mimetypes
import json
import xml.dom.minidom
import urllib
import time
import re
import random
from traceback import format_exc
from requests.exceptions import ConnectionError, ReadTimeout
import HTMLParserUNKONWN = 'unkonwn'
SUCCESS = '200'
SCANED = '201'
TIMEOUT = '408'def map_username_batch(user_name):return {"UserName": user_name, "EncryChatRoomId": ""}def show_image(file_path):"""跨平台显示图片文件:param file_path: 图片文件路径"""if sys.version_info >= (3, 3):from shlex import quoteelse:from pipes import quoteif sys.platform == "darwin":command = "open -a /Applications/Preview.app %s&" % quote(file_path)os.system(command)else:webbrowser.open(os.path.join(os.getcwd(),'temp',file_path))class SafeSession(requests.Session):def request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None,timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None,json=None):for i in range(3):try:return super(SafeSession, self).request(method, url, params, data, headers, cookies, files, auth,timeout,allow_redirects, proxies, hooks, stream, verify, cert, json)except Exception as e:print e.message, traceback.format_exc()continue#重试3次以后再加一次,抛出异常try:return super(SafeSession, self).request(method, url, params, data, headers, cookies, files, auth,timeout,allow_redirects, proxies, hooks, stream, verify, cert, json)except Exception as e:raise eclass WXBot:"""WXBot功能类"""def __init__(self):self.DEBUG = Falseself.uuid = ''self.base_uri = ''self.base_host = ''self.redirect_uri = ''self.uin = ''self.sid = ''self.skey = ''self.pass_ticket = ''self.device_id = 'e' + repr(random.random())[2:17]self.base_request = {}self.sync_key_str = ''self.sync_key = []self.sync_host = ''status = 'wait4login'    #表示机器人状态,供WEBAPI读取,WxbotManage使用bot_conf = {} #机器人配置,在webapi初始化的时候传入,后续也可修改,WxbotManage使用self.batch_count = 50    #一次拉取50个联系人的信息self.full_user_name_list = []    #直接获取不到通讯录时,获取的username列表self.wxid_list = []   #获取到的wxid的列表self.cursor = 0   #拉取联系人信息的游标self.is_big_contact = False  #通讯录人数过多,无法直接获取#文件缓存目录self.temp_pwd  =  os.path.join(os.getcwd(),'temp')if os.path.exists(self.temp_pwd) == False:os.makedirs(self.temp_pwd)self.session = SafeSession()self.session.headers.update({'User-Agent': 'Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5'})self.conf = {'qr': 'png'}self.my_account = {}  # 当前账户# 所有相关账号: 联系人, 公众号, 群组, 特殊账号self.member_list = []# 所有群组的成员, {'group_id1': [member1, member2, ...], ...}self.group_members = {}# 所有账户, {'group_member':{'id':{'type':'group_member', 'info':{}}, ...}, 'normal_member':{'id':{}, ...}}self.account_info = {'group_member': {}, 'normal_member': {}}self.contact_list = []  # 联系人列表self.public_list = []  # 公众账号列表self.group_list = []  # 群聊列表self.special_list = []  # 特殊账号列表self.encry_chat_room_id_list = []  # 存储群聊的EncryChatRoomId,获取群内成员头像时需要用到self.file_index = 0#在未传入bot_conf的情况下尝试载入本地配置文件,WxbotManage使用def load_conf(self,bot_conf):try:if bot_conf == {}:with open(os.path.join(self.temp_pwd,'bot_conf.json')) as f:self.bot_conf= json.loads(f.read())except:self.bot_conf = {}#保存配置文件,WxbotManage使用def save_conf(self):with open(os.path.join(self.temp_pwd,'bot_conf.json'), 'w') as f:f.write(json.dumps(self.bot_conf))@staticmethoddef to_unicode(string, encoding='utf-8'):"""将字符串转换为Unicode:param string: 待转换字符串:param encoding: 字符串解码方式:return: 转换后的Unicode字符串"""if isinstance(string, str):return string.decode(encoding)elif isinstance(string, unicode):return stringelse:raise Exception('Unknown Type')def get_contact(self):"""获取当前账户的所有相关账号(包括联系人、公众号、群聊、特殊账号)"""if self.is_big_contact:return Falseurl = self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' \% (self.pass_ticket, self.skey, int(time.time()))#如果通讯录联系人过多,这里会直接获取失败try:r = self.session.post(url, data='{}')except Exception as e:self.is_big_contact = Truereturn Falser.encoding = 'utf-8'if self.DEBUG:with open(os.path.join(self.temp_pwd,'contacts.json'), 'w') as f:f.write(r.text.encode('utf-8'))dic = json.loads(r.text)self.member_list = dic['MemberList']special_users = ['newsapp', 'fmessage', 'filehelper', 'weibo', 'qqmail','fmessage', 'tmessage', 'qmessage', 'qqsync', 'floatbottle','lbsapp', 'shakeapp', 'medianote', 'qqfriend', 'readerapp','blogapp', 'facebookapp', 'masssendapp', 'meishiapp','feedsapp', 'voip', 'blogappweixin', 'weixin', 'brandsessionholder','weixinreminder', 'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c','officialaccounts', 'notification_messages', 'wxid_novlwrv3lqwv11','gh_22b87fa7cb3c', 'wxitil', 'userexperience_alarm', 'notification_messages']self.contact_list = []self.public_list = []self.special_list = []self.group_list = []for contact in self.member_list:if contact['VerifyFlag'] & 8 != 0:  # 公众号self.public_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'public', 'info': contact}elif contact['UserName'] in special_users:  # 特殊账户self.special_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'special', 'info': contact}elif contact['UserName'].find('@@') != -1:  # 群聊self.group_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'group', 'info': contact}elif contact['UserName'] == self.my_account['UserName']:  # 自己self.account_info['normal_member'][contact['UserName']] = {'type': 'self', 'info': contact}else:self.contact_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'contact', 'info': contact}self.batch_get_group_members()for group in self.group_members:for member in self.group_members[group]:if member['UserName'] not in self.account_info:self.account_info['group_member'][member['UserName']] = \{'type': 'group_member', 'info': member, 'group': group}if self.DEBUG:with open(os.path.join(self.temp_pwd,'contact_list.json'), 'w') as f:f.write(json.dumps(self.contact_list))with open(os.path.join(self.temp_pwd,'special_list.json'), 'w') as f:f.write(json.dumps(self.special_list))with open(os.path.join(self.temp_pwd,'group_list.json'), 'w') as f:f.write(json.dumps(self.group_list))with open(os.path.join(self.temp_pwd,'public_list.json'), 'w') as f:f.write(json.dumps(self.public_list))with open(os.path.join(self.temp_pwd,'member_list.json'), 'w') as f:f.write(json.dumps(self.member_list))with open(os.path.join(self.temp_pwd,'group_users.json'), 'w') as f:f.write(json.dumps(self.group_members))with open(os.path.join(self.temp_pwd,'account_info.json'), 'w') as f:f.write(json.dumps(self.account_info))return Truedef get_big_contact(self):total_len = len(self.full_user_name_list)user_info_list = []#一次拉取50个联系人的信息,包括所有的群聊,公众号,好友while self.cursor < total_len:cur_batch = self.full_user_name_list[self.cursor:(self.cursor+self.batch_count)]self.cursor += self.batch_countcur_batch = map(map_username_batch, cur_batch)user_info_list += self.batch_get_contact(cur_batch)print "[INFO] Get batch contacts"self.member_list = user_info_listspecial_users = ['newsapp', 'filehelper', 'weibo', 'qqmail','fmessage', 'tmessage', 'qmessage', 'qqsync', 'floatbottle','lbsapp', 'shakeapp', 'medianote', 'qqfriend', 'readerapp','blogapp', 'facebookapp', 'masssendapp', 'meishiapp','feedsapp', 'voip', 'blogappweixin', 'weixin', 'brandsessionholder','weixinreminder', 'wxid_novlwrv3lqwv11','officialaccounts','gh_22b87fa7cb3c', 'wxitil', 'userexperience_alarm', 'notification_messages', 'notifymessage']self.contact_list = []self.public_list = []self.special_list = []self.group_list = []for i, contact in enumerate(self.member_list):if contact['VerifyFlag'] & 8 != 0:  # 公众号self.public_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'public', 'info': contact}elif contact['UserName'] in special_users or self.wxid_list[i] in special_users:  # 特殊账户self.special_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'special', 'info': contact}elif contact['UserName'].find('@@') != -1:  # 群聊self.group_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'group', 'info': contact}elif contact['UserName'] == self.my_account['UserName']:  # 自己self.account_info['normal_member'][contact['UserName']] = {'type': 'self', 'info': contact}else:self.contact_list.append(contact)self.account_info['normal_member'][contact['UserName']] = {'type': 'contact', 'info': contact}group_members = {}encry_chat_room_id = {}for group in self.group_list:gid = group['UserName']members = group['MemberList']group_members[gid] = membersencry_chat_room_id[gid] = group['EncryChatRoomId']self.group_members = group_membersself.encry_chat_room_id_list = encry_chat_room_idfor group in self.group_members:for member in self.group_members[group]:if member['UserName'] not in self.account_info:self.account_info['group_member'][member['UserName']] = \{'type': 'group_member', 'info': member, 'group': group}if self.DEBUG:with open(os.path.join(self.temp_pwd,'contact_list.json'), 'w') as f:f.write(json.dumps(self.contact_list))with open(os.path.join(self.temp_pwd,'special_list.json'), 'w') as f:f.write(json.dumps(self.special_list))with open(os.path.join(self.temp_pwd,'group_list.json'), 'w') as f:f.write(json.dumps(self.group_list))with open(os.path.join(self.temp_pwd,'public_list.json'), 'w') as f:f.write(json.dumps(self.public_list))with open(os.path.join(self.temp_pwd,'member_list.json'), 'w') as f:f.write(json.dumps(self.member_list))with open(os.path.join(self.temp_pwd,'group_users.json'), 'w') as f:f.write(json.dumps(self.group_members))with open(os.path.join(self.temp_pwd,'account_info.json'), 'w') as f:f.write(json.dumps(self.account_info))print '[INFO] Get %d contacts' % len(self.contact_list)print '[INFO] Start to process messages .'return Truedef batch_get_contact(self, cur_batch):"""批量获取成员信息"""url = self.base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (int(time.time()), self.pass_ticket)params = {'BaseRequest': self.base_request,"Count": len(cur_batch),"List": cur_batch}r = self.session.post(url, data=json.dumps(params))r.encoding = 'utf-8'dic = json.loads(r.text)#print dic['ContactList']return dic['ContactList']def batch_get_group_members(self):"""批量获取所有群聊成员信息"""url = self.base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (int(time.time()), self.pass_ticket)params = {'BaseRequest': self.base_request,"Count": len(self.group_list),"List": [{"UserName": group['UserName'], "EncryChatRoomId": ""} for group in self.group_list]}r = self.session.post(url, data=json.dumps(params))r.encoding = 'utf-8'dic = json.loads(r.text)group_members = {}encry_chat_room_id = {}for group in dic['ContactList']:gid = group['UserName']members = group['MemberList']group_members[gid] = membersencry_chat_room_id[gid] = group['EncryChatRoomId']self.group_members = group_membersself.encry_chat_room_id_list = encry_chat_room_iddef get_group_member_name(self, gid, uid):"""获取群聊中指定成员的名称信息:param gid: 群id:param uid: 群聊成员id:return: 名称信息,类似 {"display_name": "test_user", "nickname": "test", "remark_name": "for_test" }"""if gid not in self.group_members:return Nonegroup = self.group_members[gid]for member in group:if member['UserName'] == uid:names = {}if 'RemarkName' in member and member['RemarkName']:names['remark_name'] = member['RemarkName']if 'NickName' in member and member['NickName']:names['nickname'] = member['NickName']if 'DisplayName' in member and member['DisplayName']:names['display_name'] = member['DisplayName']return namesreturn Nonedef get_contact_info(self, uid):return self.account_info['normal_member'].get(uid)def get_group_member_info(self, uid):return self.account_info['group_member'].get(uid)def get_contact_name(self, uid):info = self.get_contact_info(uid)if info is None:return Noneinfo = info['info']name = {}if 'RemarkName' in info and info['RemarkName']:name['remark_name'] = info['RemarkName']if 'NickName' in info and info['NickName']:name['nickname'] = info['NickName']if 'DisplayName' in info and info['DisplayName']:name['display_name'] = info['DisplayName']if len(name) == 0:return Noneelse:return name@staticmethoddef get_contact_prefer_name(name):if name is None:return Noneif 'remark_name' in name:return name['remark_name']if 'nickname' in name:return name['nickname']if 'display_name' in name:return name['display_name']return None@staticmethoddef get_group_member_prefer_name(name):if name is None:return Noneif 'remark_name' in name:return name['remark_name']if 'display_name' in name:return name['display_name']if 'nickname' in name:return name['nickname']return Nonedef get_user_type(self, wx_user_id):"""获取特定账号与自己的关系:param wx_user_id: 账号id::return: 与当前账号的关系"""for account in self.contact_list:if wx_user_id == account['UserName']:return 'contact'for account in self.public_list:if wx_user_id == account['UserName']:return 'public'for account in self.special_list:if wx_user_id == account['UserName']:return 'special'for account in self.group_list:if wx_user_id == account['UserName']:return 'group'for group in self.group_members:for member in self.group_members[group]:if member['UserName'] == wx_user_id:return 'group_member'return 'unknown'def is_contact(self, uid):for account in self.contact_list:if uid == account['UserName']:return Truereturn Falsedef is_public(self, uid):for account in self.public_list:if uid == account['UserName']:return Truereturn Falsedef is_special(self, uid):for account in self.special_list:if uid == account['UserName']:return Truereturn Falsedef handle_msg_all(self, msg):"""处理所有消息,请子类化后覆盖此函数msg:msg_id  ->  消息idmsg_type_id  ->  消息类型iduser  ->  发送消息的账号idcontent  ->  消息内容:param msg: 收到的消息"""pass@staticmethoddef proc_at_info(msg):if not msg:return '', []segs = msg.split(u'\u2005')str_msg_all = ''str_msg = ''infos = []if len(segs) > 1:for i in range(0, len(segs) - 1):segs[i] += u'\u2005'pm = re.search(u'@.*\u2005', segs[i]).group()if pm:name = pm[1:-1]string = segs[i].replace(pm, '')str_msg_all += string + '@' + name + ' 'str_msg += stringif string:infos.append({'type': 'str', 'value': string})infos.append({'type': 'at', 'value': name})else:infos.append({'type': 'str', 'value': segs[i]})str_msg_all += segs[i]str_msg += segs[i]str_msg_all += segs[-1]str_msg += segs[-1]infos.append({'type': 'str', 'value': segs[-1]})else:infos.append({'type': 'str', 'value': segs[-1]})str_msg_all = msgstr_msg = msgreturn str_msg_all.replace(u'\u2005', ''), str_msg.replace(u'\u2005', ''), infosdef extract_msg_content(self, msg_type_id, msg):"""content_type_id:0 -> Text1 -> Location3 -> Image4 -> Voice5 -> Recommend6 -> Animation7 -> Share8 -> Video9 -> VideoCall10 -> Redraw11 -> Empty99 -> Unknown:param msg_type_id: 消息类型id:param msg: 消息结构体:return: 解析的消息"""mtype = msg['MsgType']content = HTMLParser.HTMLParser().unescape(msg['Content'])msg_id = msg['MsgId']msg_content = {}if msg_type_id == 0:return {'type': 11, 'data': ''}elif msg_type_id == 2:  # File Helperreturn {'type': 0, 'data': content.replace('<br/>', '\n')}elif msg_type_id == 3:  # 群聊sp = content.find('<br/>')uid = content[:sp]content = content[sp:]content = content.replace('<br/>', '')uid = uid[:-1]name = self.get_contact_prefer_name(self.get_contact_name(uid))if not name:name = self.get_group_member_prefer_name(self.get_group_member_name(msg['FromUserName'], uid))if not name:name = 'unknown'msg_content['user'] = {'id': uid, 'name': name}else:  # Self, Contact, Special, Public, Unknownpassmsg_prefix = (msg_content['user']['name'] + ':') if 'user' in msg_content else ''if mtype == 1:if content.find('http://weixin.qq.com/cgi-bin/redirectforward?args=') != -1:r = self.session.get(content)r.encoding = 'gbk'data = r.textpos = self.search_content('title', data, 'xml')msg_content['type'] = 1msg_content['data'] = posmsg_content['detail'] = dataif self.DEBUG:print '    %s[Location] %s ' % (msg_prefix, pos)else:msg_content['type'] = 0if msg_type_id == 3 or (msg_type_id == 1 and msg['ToUserName'][:2] == '@@'):  # Group text messagemsg_infos = self.proc_at_info(content)str_msg_all = msg_infos[0]str_msg = msg_infos[1]detail = msg_infos[2]msg_content['data'] = str_msg_allmsg_content['detail'] = detailmsg_content['desc'] = str_msgelse:msg_content['data'] = contentif self.DEBUG:try:print '    %s[Text] %s' % (msg_prefix, msg_content['data'])except UnicodeEncodeError:print '    %s[Text] (illegal text).' % msg_prefixelif mtype == 3:msg_content['type'] = 3msg_content['data'] = self.get_msg_img_url(msg_id)msg_content['img'] = self.session.get(msg_content['data']).content.encode('hex')if self.DEBUG:image = self.get_msg_img(msg_id)print '    %s[Image] %s' % (msg_prefix, image)elif mtype == 34:msg_content['type'] = 4msg_content['data'] = self.get_voice_url(msg_id)msg_content['voice'] = self.session.get(msg_content['data']).content.encode('hex')if self.DEBUG:voice = self.get_voice(msg_id)print '    %s[Voice] %s' % (msg_prefix, voice)elif mtype == 37:msg_content['type'] = 37msg_content['data'] = msg['RecommendInfo']if self.DEBUG:print '    %s[useradd] %s' % (msg_prefix,msg['RecommendInfo']['NickName'])elif mtype == 42:msg_content['type'] = 5info = msg['RecommendInfo']msg_content['data'] = {'nickname': info['NickName'],'alias': info['Alias'],'province': info['Province'],'city': info['City'],'gender': ['unknown', 'male', 'female'][info['Sex']]}if self.DEBUG:print '    %s[Recommend]' % msg_prefixprint '    -----------------------------'print '    | NickName: %s' % info['NickName']print '    | Alias: %s' % info['Alias']print '    | Local: %s %s' % (info['Province'], info['City'])print '    | Gender: %s' % ['unknown', 'male', 'female'][info['Sex']]print '    -----------------------------'elif mtype == 47:msg_content['type'] = 6msg_content['data'] = self.search_content('cdnurl', content)if self.DEBUG:print '    %s[Animation] %s' % (msg_prefix, msg_content['data'])elif mtype == 49:msg_content['type'] = 7if msg['AppMsgType'] == 3:app_msg_type = 'music'elif msg['AppMsgType'] == 5:app_msg_type = 'link'elif msg['AppMsgType'] == 7:app_msg_type = 'weibo'else:app_msg_type = 'unknown'msg_content['data'] = {'type': app_msg_type,'title': msg['FileName'],'desc': self.search_content('des', content, 'xml'),'url': msg['Url'],'from': self.search_content('appname', content, 'xml'),'content': msg.get('Content')  # 有的公众号会发一次性3 4条链接一个大图,如果只url那只能获取第一条,content里面有所有的链接}if self.DEBUG:print '    %s[Share] %s' % (msg_prefix, app_msg_type)print '    --------------------------'print '    | title: %s' % msg['FileName']print '    | desc: %s' % self.search_content('des', content, 'xml')print '    | link: %s' % msg['Url']print '    | from: %s' % self.search_content('appname', content, 'xml')print '    | content: %s' % (msg.get('content')[:20] if msg.get('content') else "unknown")print '    --------------------------'elif mtype == 62:msg_content['type'] = 8msg_content['data'] = contentif self.DEBUG:print '    %s[Video] Please check on mobiles' % msg_prefixelif mtype == 53:msg_content['type'] = 9msg_content['data'] = contentif self.DEBUG:print '    %s[Video Call]' % msg_prefixelif mtype == 10002:msg_content['type'] = 10msg_content['data'] = contentif self.DEBUG:print '    %s[Redraw]' % msg_prefixelif mtype == 10000:  # unknown, maybe red packet, or group invitemsg_content['type'] = 12msg_content['data'] = msg['Content']if self.DEBUG:print '    [Unknown]'elif mtype == 43:msg_content['type'] = 13msg_content['data'] = self.get_video_url(msg_id)if self.DEBUG:print '    %s[video] %s' % (msg_prefix, msg_content['data'])else:msg_content['type'] = 99msg_content['data'] = contentif self.DEBUG:print '    %s[Unknown]' % msg_prefixreturn msg_contentdef handle_msg(self, r):"""处理原始微信消息的内部函数msg_type_id:0 -> Init1 -> Self2 -> FileHelper3 -> Group4 -> Contact5 -> Public6 -> Special99 -> Unknown:param r: 原始微信消息"""for msg in r['AddMsgList']:user = {'id': msg['FromUserName'], 'name': 'unknown'}if msg['MsgType'] == 51 and msg['StatusNotifyCode'] == 4:  # init messagemsg_type_id = 0user['name'] = 'system'#会获取所有联系人的username 和 wxid,但是会收到3次这个消息,只取第一次if self.is_big_contact and len(self.full_user_name_list) == 0:self.full_user_name_list = msg['StatusNotifyUserName'].split(",")self.wxid_list = re.search(r"username&gt;(.*?)&lt;/username", msg["Content"]).group(1).split(",")with open(os.path.join(self.temp_pwd,'UserName.txt'), 'w') as f:f.write(msg['StatusNotifyUserName'])with open(os.path.join(self.temp_pwd,'wxid.txt'), 'w') as f:f.write(json.dumps(self.wxid_list))print "[INFO] Contact list is too big. Now start to fetch member list ."self.get_big_contact()elif msg['MsgType'] == 37:  # friend requestmsg_type_id = 37pass# content = msg['Content']# username = content[content.index('fromusername='): content.index('encryptusername')]# username = username[username.index('"') + 1: username.rindex('"')]# print u'[Friend Request]'# print u'       Nickname:' + msg['RecommendInfo']['NickName']# print u'       附加消息:'+msg['RecommendInfo']['Content']# # print u'Ticket:'+msg['RecommendInfo']['Ticket'] # Ticket添加好友时要用# print u'       微信号:'+username #未设置微信号的 腾讯会自动生成一段微信ID 但是无法通过搜索 搜索到此人elif msg['FromUserName'] == self.my_account['UserName']:  # Selfmsg_type_id = 1user['name'] = 'self'elif msg['ToUserName'] == 'filehelper':  # File Helpermsg_type_id = 2user['name'] = 'file_helper'elif msg['FromUserName'][:2] == '@@':  # Groupmsg_type_id = 3user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))elif self.is_contact(msg['FromUserName']):  # Contactmsg_type_id = 4user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))elif self.is_public(msg['FromUserName']):  # Publicmsg_type_id = 5user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))elif self.is_special(msg['FromUserName']):  # Specialmsg_type_id = 6user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))else:msg_type_id = 99user['name'] = 'unknown'if not user['name']:user['name'] = 'unknown'user['name'] = HTMLParser.HTMLParser().unescape(user['name'])if self.DEBUG and msg_type_id != 0:print u'[MSG] %s:' % user['name']content = self.extract_msg_content(msg_type_id, msg)message = {'msg_type_id': msg_type_id,'msg_id': msg['MsgId'],'content': content,'to_user_id': msg['ToUserName'],'user': user}self.handle_msg_all(message)def schedule(self):"""做任务型事情的函数,如果需要,可以在子类中覆盖此函数此函数在处理消息的间隙被调用,请不要长时间阻塞此函数"""passdef proc_msg(self):self.test_sync_check()self.status = 'loginsuccess'  #WxbotManage使用while True:if self.status == 'wait4loginout':  #WxbotManage使用return check_time = time.time()try:[retcode, selector] = self.sync_check()# print '[DEBUG] sync_check:', retcode, selectorif retcode == '1100':  # 从微信客户端上登出breakelif retcode == '1101':  # 从其它设备上登了网页微信breakelif retcode == '0':if selector == '2':  # 有新消息r = self.sync()if r is not None:self.handle_msg(r)elif selector == '3':  # 未知r = self.sync()if r is not None:self.handle_msg(r)elif selector == '4':  # 通讯录更新r = self.sync()if r is not None:self.get_contact()elif selector == '6':  # 可能是红包r = self.sync()if r is not None:self.handle_msg(r)elif selector == '7':  # 在手机上操作了微信r = self.sync()if r is not None:self.handle_msg(r)elif selector == '0':  # 无事件passelse:print '[DEBUG] sync_check:', retcode, selectorr = self.sync()if r is not None:self.handle_msg(r)else:print '[DEBUG] sync_check:', retcode, selectortime.sleep(10)self.schedule()except:print '[ERROR] Except in proc_msg'print format_exc()check_time = time.time() - check_timeif check_time < 0.8:time.sleep(1 - check_time)def apply_useradd_requests(self,RecommendInfo):url = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN'params = {"BaseRequest": self.base_request,"Opcode": 3,"VerifyUserListSize": 1,"VerifyUserList": [{"Value": RecommendInfo['UserName'],"VerifyUserTicket": RecommendInfo['Ticket']             }],"VerifyContent": "","SceneListCount": 1,"SceneList": [33],"skey": self.skey}headers = {'content-type': 'application/json; charset=UTF-8'}data = json.dumps(params, ensure_ascii=False).encode('utf8')try:r = self.session.post(url, data=data, headers=headers)except (ConnectionError, ReadTimeout):return Falsedic = r.json()return dic['BaseResponse']['Ret'] == 0def add_groupuser_to_friend_by_uid(self,uid,VerifyContent):"""主动向群内人员打招呼,提交添加好友请求uid-群内人员得uid   VerifyContent-好友招呼内容慎用此接口!封号后果自负!慎用此接口!封号后果自负!慎用此接口!封号后果自负!"""if self.is_contact(uid):return Trueurl = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN'params ={"BaseRequest": self.base_request,"Opcode": 2,"VerifyUserListSize": 1,"VerifyUserList": [{"Value": uid,"VerifyUserTicket": ""}],"VerifyContent": VerifyContent,"SceneListCount": 1,"SceneList": [33],"skey": self.skey}headers = {'content-type': 'application/json; charset=UTF-8'}data = json.dumps(params, ensure_ascii=False).encode('utf8')try:r = self.session.post(url, data=data, headers=headers)except (ConnectionError, ReadTimeout):return Falsedic = r.json()return dic['BaseResponse']['Ret'] == 0def add_friend_to_group(self,uid,group_name):"""将好友加入到群聊中"""gid = ''#通过群名获取群id,群没保存到通讯录中的话无法添加哦for group in self.group_list:if group['NickName'] == group_name:gid = group['UserName']if gid == '':return False#获取群成员数量并判断邀请方式group_num=len(self.group_members[gid])print '[DEBUG] group_name:%s group_num:%s' % (group_name,group_num)#通过群id判断uid是否在群中for user in self.group_members[gid]:if user['UserName'] == uid:#已经在群里面了,不用加了return Trueif group_num<=100:url = self.base_uri + '/webwxupdatechatroom?fun=addmember&pass_ticket=%s' % self.pass_ticketparams ={"AddMemberList": uid,"ChatRoomName": gid,"BaseRequest": self.base_request}else:url = self.base_uri + '/webwxupdatechatroom?fun=invitemember'params ={"InviteMemberList": uid,"ChatRoomName": gid,"BaseRequest": self.base_request}headers = {'content-type': 'application/json; charset=UTF-8'}data = json.dumps(params, ensure_ascii=False).encode('utf8')try:r = self.session.post(url, data=data, headers=headers)except (ConnectionError, ReadTimeout):return Falsedic = r.json()return dic['BaseResponse']['Ret'] == 0def invite_friend_to_group(self,uid,group_name):"""将好友加入到群中。对人数多的群,需要调用此方法。拉人时,可以先尝试使用add_friend_to_group方法,当调用失败(Ret=1)时,再尝试调用此方法。"""gid = ''# 通过群名获取群id,群没保存到通讯录中的话无法添加哦for group in self.group_list:if group['NickName'] == group_name:gid = group['UserName']if gid == '':return False# 通过群id判断uid是否在群中for user in self.group_members[gid]:if user['UserName'] == uid:# 已经在群里面了,不用加了return Trueurl = self.base_uri + '/webwxupdatechatroom?fun=invitemember&pass_ticket=%s' % self.pass_ticketparams = {"InviteMemberList": uid,"ChatRoomName": gid,"BaseRequest": self.base_request}headers = {'content-type': 'application/json; charset=UTF-8'}data = json.dumps(params, ensure_ascii=False).encode('utf8')try:r = self.session.post(url, data=data, headers=headers)except (ConnectionError, ReadTimeout):return Falsedic = r.json()return dic['BaseResponse']['Ret'] == 0def delete_user_from_group(self,uname,gid):"""将群用户从群中剔除,只有群管理员有权限"""uid = ""for user in self.group_members[gid]:if user['NickName'] == uname:uid = user['UserName']if uid == "":return Falseurl = self.base_uri + '/webwxupdatechatroom?fun=delmember&pass_ticket=%s' % self.pass_ticketparams ={"DelMemberList": uid,"ChatRoomName": gid,"BaseRequest": self.base_request}headers = {'content-type': 'application/json; charset=UTF-8'}data = json.dumps(params, ensure_ascii=False).encode('utf8')try:r = self.session.post(url, data=data, headers=headers)except (ConnectionError, ReadTimeout):return Falsedic = r.json()return dic['BaseResponse']['Ret'] == 0def set_group_name(self,gid,gname):"""设置群聊名称"""url = self.base_uri + '/webwxupdatechatroom?fun=modtopic&pass_ticket=%s' % self.pass_ticketparams ={"NewTopic": gname,"ChatRoomName": gid,"BaseRequest": self.base_request}headers = {'content-type': 'application/json; charset=UTF-8'}data = json.dumps(params, ensure_ascii=False).encode('utf8')try:r = self.session.post(url, data=data, headers=headers)except (ConnectionError, ReadTimeout):return Falsedic = r.json()return dic['BaseResponse']['Ret'] == 0def send_msg_by_uid(self, word, dst='filehelper'):url = self.base_uri + '/webwxsendmsg?pass_ticket=%s' % self.pass_ticketmsg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '')word = self.to_unicode(word)params = {'BaseRequest': self.base_request,'Msg': {"Type": 1,"Content": word,"FromUserName": self.my_account['UserName'],"ToUserName": dst,"LocalID": msg_id,"ClientMsgId": msg_id}}headers = {'content-type': 'application/json; charset=UTF-8'}data = json.dumps(params, ensure_ascii=False).encode('utf8')try:r = self.session.post(url, data=data, headers=headers)except (ConnectionError, ReadTimeout):return Falsedic = r.json()return dic['BaseResponse']['Ret'] == 0def upload_media(self, fpath, is_img=False):if not os.path.exists(fpath):print '[ERROR] File not exists.'return Noneurl_1 = 'https://file.'+self.base_host+'/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json'url_2 = 'https://file2.'+self.base_host+'/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json'flen = str(os.path.getsize(fpath))ftype = mimetypes.guess_type(fpath)[0] or 'application/octet-stream'files = {'id': (None, 'WU_FILE_%s' % str(self.file_index)),'name': (None, os.path.basename(fpath)),'type': (None, ftype),'lastModifiedDate': (None, time.strftime('%m/%d/%Y, %H:%M:%S GMT+0800 (CST)')),'size': (None, flen),'mediatype': (None, 'pic' if is_img else 'doc'),'uploadmediarequest': (None, json.dumps({'BaseRequest': self.base_request,'ClientMediaId': int(time.time()),'TotalLen': flen,'StartPos': 0,'DataLen': flen,'MediaType': 4,})),'webwx_data_ticket': (None, self.session.cookies['webwx_data_ticket']),'pass_ticket': (None, self.pass_ticket),'filename': (os.path.basename(fpath), open(fpath, 'rb'),ftype.split('/')[1]),}self.file_index += 1try:r = self.session.post(url_1, files=files)if json.loads(r.text)['BaseResponse']['Ret'] != 0:# 当file返回值不为0时则为上传失败,尝试第二服务器上传r = self.session.post(url_2, files=files)if json.loads(r.text)['BaseResponse']['Ret'] != 0:print '[ERROR] Upload media failure.'return Nonemid = json.loads(r.text)['MediaId']return midexcept Exception,e:return Nonedef send_file_msg_by_uid(self, fpath, uid):mid = self.upload_media(fpath)if mid is None or not mid:return Falseurl = self.base_uri + '/webwxsendappmsg?fun=async&f=json&pass_ticket=' + self.pass_ticketmsg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '')data = {'BaseRequest': self.base_request,'Msg': {'Type': 6,'Content': ("<appmsg appid='wxeb7ec651dd0aefa9' sdkver=''><title>%s</title><des></des><action></action><type>6</type><content></content><url></url><lowurl></lowurl><appattach><totallen>%s</totallen><attachid>%s</attachid><fileext>%s</fileext></appattach><extinfo></extinfo></appmsg>" % (os.path.basename(fpath).encode('utf-8'), str(os.path.getsize(fpath)), mid, fpath.split('.')[-1])).encode('utf8'),'FromUserName': self.my_account['UserName'],'ToUserName': uid,'LocalID': msg_id,'ClientMsgId': msg_id, }, }try:r = self.session.post(url, data=json.dumps(data))res = json.loads(r.text)if res['BaseResponse']['Ret'] == 0:return Trueelse:return Falseexcept Exception,e:return Falsedef send_img_msg_by_uid(self, fpath, uid):mid = self.upload_media(fpath, is_img=True)if mid is None:return Falseurl = self.base_uri + '/webwxsendmsgimg?fun=async&f=json'data = {'BaseRequest': self.base_request,'Msg': {'Type': 3,'MediaId': mid,'FromUserName': self.my_account['UserName'],'ToUserName': uid,'LocalID': str(time.time() * 1e7),'ClientMsgId': str(time.time() * 1e7), }, }if fpath[-4:] == '.gif':url = self.base_uri + '/webwxsendemoticon?fun=sys'data['Msg']['Type'] = 47data['Msg']['EmojiFlag'] = 2try:r = self.session.post(url, data=json.dumps(data))res = json.loads(r.text)if res['BaseResponse']['Ret'] == 0:return Trueelse:return Falseexcept Exception,e:return Falsedef get_user_id(self, name):if name == '':return Nonename = self.to_unicode(name)for contact in self.contact_list:if 'RemarkName' in contact and contact['RemarkName'] == name:return contact['UserName']elif 'NickName' in contact and contact['NickName'] == name:return contact['UserName']elif 'DisplayName' in contact and contact['DisplayName'] == name:return contact['UserName']for group in self.group_list:if 'RemarkName' in group and group['RemarkName'] == name:return group['UserName']if 'NickName' in group and group['NickName'] == name:return group['UserName']if 'DisplayName' in group and group['DisplayName'] == name:return group['UserName']return ''def send_msg(self, name, word, isfile=False):uid = self.get_user_id(name)if uid is not None:if isfile:with open(word, 'r') as f:result = Truefor line in f.readlines():line = line.replace('\n', '')print '-> ' + name + ': ' + lineif self.send_msg_by_uid(line, uid):passelse:result = Falsetime.sleep(1)return resultelse:word = self.to_unicode(word)if self.send_msg_by_uid(word, uid):return Trueelse:return Falseelse:if self.DEBUG:print '[ERROR] This user does not exist .'return True@staticmethoddef search_content(key, content, fmat='attr'):if fmat == 'attr':pm = re.search(key + '\s?=\s?"([^"<]+)"', content)if pm:return pm.group(1)elif fmat == 'xml':pm = re.search('<{0}>([^<]+)</{0}>'.format(key), content)if pm:return pm.group(1)return 'unknown'def run(self):try:self.get_uuid()self.gen_qr_code(os.path.join(self.temp_pwd,'wxqr.png'))print '[INFO] Please use WeChat to scan the QR code .'result = self.wait4login()if result != SUCCESS:print '[ERROR] Web WeChat login failed. failed code=%s' % (result,)self.status = 'loginout'returnif self.login():print '[INFO] Web WeChat login succeed .'else:print '[ERROR] Web WeChat login failed .'self.status = 'loginout'returnif self.init():print '[INFO] Web WeChat init succeed .'else:print '[INFO] Web WeChat init failed'self.status = 'loginout'returnself.status_notify()if self.get_contact():print '[INFO] Get %d contacts' % len(self.contact_list)print '[INFO] Start to process messages .'self.proc_msg()self.status = 'loginout'except Exception,e:print '[ERROR] Web WeChat run failed --> %s'%(e)self.status = 'loginout'def get_uuid(self):url = 'https://login.weixin.qq.com/jslogin'params = {'appid': 'wx782c26e4c19acffb','fun': 'new','lang': 'zh_CN','_': int(time.time()) * 1000 + random.randint(1, 999),}r = self.session.get(url, params=params)r.encoding = 'utf-8'data = r.textregx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'pm = re.search(regx, data)if pm:code = pm.group(1)self.uuid = pm.group(2)return code == '200'return Falsedef gen_qr_code(self, qr_file_path):string = 'https://login.weixin.qq.com/l/' + self.uuidqr = pyqrcode.create(string)if self.conf['qr'] == 'png':qr.png(qr_file_path, scale=8)show_image(qr_file_path)# img = Image.open(qr_file_path)# img.show()elif self.conf['qr'] == 'tty':print(qr.terminal(quiet_zone=1))def do_request(self, url):r = self.session.get(url)r.encoding = 'utf-8'data = r.textparam = re.search(r'window.code=(\d+);', data)code = param.group(1)return code, datadef wait4login(self):"""http comet:tip=1, 等待用户扫描二维码,201: scaned408: timeouttip=0, 等待用户确认登录,200: confirmed"""LOGIN_TEMPLATE = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s'tip = 1try_later_secs = 1MAX_RETRY_TIMES = 10code = UNKONWNretry_time = MAX_RETRY_TIMESwhile retry_time > 0:url = LOGIN_TEMPLATE % (tip, self.uuid, int(time.time()))code, data = self.do_request(url)if code == SCANED:print '[INFO] Please confirm to login .'tip = 0elif code == SUCCESS:  # 确认登录成功param = re.search(r'window.redirect_uri="(\S+?)";', data)redirect_uri = param.group(1) + '&fun=new'self.redirect_uri = redirect_uriself.base_uri = redirect_uri[:redirect_uri.rfind('/')]temp_host = self.base_uri[8:]self.base_host = temp_host[:temp_host.find("/")]return codeelif code == TIMEOUT:print '[ERROR] WeChat login timeout. retry in %s secs later...' % (try_later_secs,)tip = 1  # 重置retry_time -= 1time.sleep(try_later_secs)else:print ('[ERROR] WeChat login exception return_code=%s. retry in %s secs later...' %(code, try_later_secs))tip = 1retry_time -= 1time.sleep(try_later_secs)return codedef login(self):if len(self.redirect_uri) < 4:print '[ERROR] Login failed due to network problem, please try again.'return Falser = self.session.get(self.redirect_uri)r.encoding = 'utf-8'data = r.textdoc = xml.dom.minidom.parseString(data)root = doc.documentElementfor node in root.childNodes:if node.nodeName == 'skey':self.skey = node.childNodes[0].dataelif node.nodeName == 'wxsid':self.sid = node.childNodes[0].dataelif node.nodeName == 'wxuin':self.uin = node.childNodes[0].dataelif node.nodeName == 'pass_ticket':self.pass_ticket = node.childNodes[0].dataif '' in (self.skey, self.sid, self.uin, self.pass_ticket):return Falseself.base_request = {'Uin': self.uin,'Sid': self.sid,'Skey': self.skey,'DeviceID': self.device_id,}return Truedef init(self):url = self.base_uri + '/webwxinit?r=%i&lang=en_US&pass_ticket=%s' % (int(time.time()), self.pass_ticket)params = {'BaseRequest': self.base_request}r = self.session.post(url, data=json.dumps(params))r.encoding = 'utf-8'dic = json.loads(r.text)self.sync_key = dic['SyncKey']self.my_account = dic['User']self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val'])for keyVal in self.sync_key['List']])return dic['BaseResponse']['Ret'] == 0def status_notify(self):url = self.base_uri + '/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % self.pass_ticketself.base_request['Uin'] = int(self.base_request['Uin'])params = {'BaseRequest': self.base_request,"Code": 3,"FromUserName": self.my_account['UserName'],"ToUserName": self.my_account['UserName'],"ClientMsgId": int(time.time())}r = self.session.post(url, data=json.dumps(params))r.encoding = 'utf-8'dic = json.loads(r.text)return dic['BaseResponse']['Ret'] == 0def test_sync_check(self):for host1 in ['webpush.', 'webpush2.']:self.sync_host = host1+self.base_hosttry:retcode = self.sync_check()[0]except:retcode = -1if retcode == '0':return Truereturn Falsedef sync_check(self):params = {'r': int(time.time()),'sid': self.sid,'uin': self.uin,'skey': self.skey,'deviceid': self.device_id,'synckey': self.sync_key_str,'_': int(time.time()),}url = 'https://' + self.sync_host + '/cgi-bin/mmwebwx-bin/synccheck?' + urllib.urlencode(params)try:r = self.session.get(url, timeout=60)r.encoding = 'utf-8'data = r.textpm = re.search(r'window.synccheck=\{retcode:"(\d+)",selector:"(\d+)"\}', data)retcode = pm.group(1)selector = pm.group(2)return [retcode, selector]except:return [-1, -1]def sync(self):url = self.base_uri + '/webwxsync?sid=%s&skey=%s&lang=en_US&pass_ticket=%s' \% (self.sid, self.skey, self.pass_ticket)params = {'BaseRequest': self.base_request,'SyncKey': self.sync_key,'rr': ~int(time.time())}try:r = self.session.post(url, data=json.dumps(params), timeout=60)r.encoding = 'utf-8'dic = json.loads(r.text)if dic['BaseResponse']['Ret'] == 0:self.sync_key = dic['SyncKey']self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val'])for keyVal in self.sync_key['List']])return dicexcept:return Nonedef get_icon(self, uid, gid=None):"""获取联系人或者群聊成员头像:param uid: 联系人id:param gid: 群id,如果为非None获取群中成员头像,如果为None则获取联系人头像"""if gid is None:url = self.base_uri + '/webwxgeticon?username=%s&skey=%s' % (uid, self.skey)else:url = self.base_uri + '/webwxgeticon?username=%s&skey=%s&chatroomid=%s' % (uid, self.skey, self.encry_chat_room_id_list[gid])r = self.session.get(url)data = r.contentfn = 'icon_' + uid + '.jpg'with open(os.path.join(self.temp_pwd,fn), 'wb') as f:f.write(data)return fndef get_head_img(self, uid):"""获取群头像:param uid: 群uid"""url = self.base_uri + '/webwxgetheadimg?username=%s&skey=%s' % (uid, self.skey)r = self.session.get(url)data = r.contentfn = 'head_' + uid + '.jpg'with open(os.path.join(self.temp_pwd,fn), 'wb') as f:f.write(data)return fndef get_msg_img_url(self, msgid):return self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)def get_msg_img(self, msgid):"""获取图片消息,下载图片到本地:param msgid: 消息id:return: 保存的本地图片文件路径"""url = self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)r = self.session.get(url)data = r.contentfn = 'img_' + msgid + '.jpg'with open(os.path.join(self.temp_pwd,fn), 'wb') as f:f.write(data)return fndef get_voice_url(self, msgid):return self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)def get_voice(self, msgid):"""获取语音消息,下载语音到本地:param msgid: 语音消息id:return: 保存的本地语音文件路径"""url = self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)r = self.session.get(url)data = r.contentfn = 'voice_' + msgid + '.mp3'with open(os.path.join(self.temp_pwd,fn), 'wb') as f:f.write(data)return fndef get_video_url(self, msgid):return self.base_uri + '/webwxgetvideo?msgid=%s&skey=%s' % (msgid, self.skey)def get_video(self, msgid):"""获取视频消息,下载视频到本地:param msgid: 视频消息id:return: 保存的本地视频文件路径"""url = self.base_uri + '/webwxgetvideo?msgid=%s&skey=%s' % (msgid, self.skey)headers = {'Range': 'bytes=0-'}r = self.session.get(url, headers=headers)data = r.contentfn = 'video_' + msgid + '.mp4'with open(os.path.join(self.temp_pwd,fn), 'wb') as f:f.write(data)return fndef set_remarkname(self,uid,remarkname):#设置联系人的备注名url = self.base_uri + '/webwxoplog?lang=zh_CN&pass_ticket=%s' \% (self.pass_ticket)remarkname = self.to_unicode(remarkname)params = {'BaseRequest': self.base_request,'CmdId': 2,'RemarkName': remarkname,'UserName': uid}try:r = self.session.post(url, data=json.dumps(params), timeout=60)r.encoding = 'utf-8'dic = json.loads(r.text)return dic['BaseResponse']['ErrMsg']except:return None

这样我们就做好了一个微信消息处理模块,我们只要在需要的时候将其导入我们的主程序,就可以调用其相关的类和方法。

  • 接着我们要做的是去图灵机器人官网申请一个接口,也就是要有一个用于对别人消息做出自动回复的机器人。注册申请成功后,将其key记录下来。用记事本编写配置文件conf.ini,以供程序调用。
[main]
key=31fd87ea28a0e6cc774ef7913d4499c1

这里的key要换成你自己的。将这个文件放在和主程序相同的目录下。

  • 编写主函数如下:
#!/usr/bin/env python
# coding: utf-8from wxbot import *#导入类函数
import ConfigParser
import jsonclass TulingWXBot(WXBot):#图灵key的读入def __init__(self):WXBot.__init__(self)self.tuling_key = ""self.robot_switch = Truetry:cf = ConfigParser.ConfigParser()cf.read('conf.ini')self.tuling_key = cf.get('main', 'key')#对配置文件写入except Exception:passprint 'tuling_key:', self.tuling_keydef tuling_auto_reply(self, uid, msg):if self.tuling_key:url = "http://www.tuling123.com/openapi/api"user_id = uid.replace('@', '')[:30]body = {'key': self.tuling_key, 'info': msg.encode('utf8'), 'userid': user_id}r = requests.post(url, data=body)respond = json.loads(r.text)result = ''if respond['code'] == 100000:result = respond['text'].replace('<br>', '  ')elif respond['code'] == 200000:result = respond['url']else:result = respond['text'].replace('<br>', '  ')print '    ROBOT:', resultreturn resultelse:return u"有点忙,回聊哦。"def auto_switch(self, msg):msg_data = msg['content']['data']stop_cmd = [u'退下', u'走开', u'关闭', u'关掉', u'休息', u'滚开']start_cmd = [u'出来', u'启动', u'工作']if self.robot_switch:for i in stop_cmd:if i == msg_data:self.robot_switch = Falseself.send_msg_by_uid(u'[Robot]' + u'机器人已关闭!', msg['to_user_id'])else:for i in start_cmd:if i == msg_data:self.robot_switch = Trueself.send_msg_by_uid(u'[Robot]' + u'机器人已开启!', msg['to_user_id'])def handle_msg_all(self, msg):if not self.robot_switch and msg['msg_type_id'] != 1:returnif msg['msg_type_id'] == 1 and msg['content']['type'] == 0:  # reply to selfself.auto_switch(msg)elif msg['msg_type_id'] == 4 and msg['content']['type'] == 0:  # text message from contactself.send_msg_by_uid(self.tuling_auto_reply(msg['user']['id'], msg['content']['data']), msg['user']['id'])elif msg['msg_type_id'] == 3 and msg['content']['type'] == 0:  # group text messageif 'detail' in msg['content']:my_names = self.get_group_member_name(self.my_account['UserName'], msg['user']['id'])if my_names is None:my_names = {}if 'NickName' in self.my_account and self.my_account['NickName']:my_names['nickname2'] = self.my_account['NickName']if 'RemarkName' in self.my_account and self.my_account['RemarkName']:my_names['remark_name2'] = self.my_account['RemarkName']is_at_me = Falsefor detail in msg['content']['detail']:if detail['type'] == 'at':for k in my_names:if my_names[k] and my_names[k] == detail['value']:is_at_me = Truebreakif is_at_me:src_name = msg['content']['user']['name']reply = 'to ' + src_name + ': 'if msg['content']['type'] == 0:  # text messagereply += self.tuling_auto_reply(msg['content']['user']['id'], msg['content']['desc'])else:reply += u"对不起,读的书少,不认识你发的乱七八糟的东西。"self.send_msg_by_uid(reply, msg['user']['id'])def main():bot = TulingWXBot()bot.DEBUG = Truebot.conf['qr'] = 'png'bot.run()if __name__ == '__main__':main()

摁F5,轻松执行。

tip: 如果主程序调入不了ini文中中的key,可以直接吧key黏贴到主程序对应的位置,这样就不用那个key文件了。代码如下:

#!/usr/bin/env python
# coding: utf-8from wxbot import *#导入类函数
import ConfigParser
import json
import timeclass TulingWXBot(WXBot):#图灵key的读入def __init__(self):WXBot.__init__(self)self.tuling_key = "a49d20e989274afaa455fd18d747d0ba"self.robot_switch = Trueprint 'tuling_key:', self.tuling_keydef tuling_auto_reply(self, uid, msg):if self.tuling_key:url = "http://www.tuling123.com/openapi/api"user_id = uid.replace('@', '')[:30]body = {'key': self.tuling_key, 'info': msg.encode('utf8'), 'userid': user_id}r = requests.post(url, data=body)respond = json.loads(r.text)result = ''if respond['code'] == 100000:result = respond['text'].replace('<br>', '  ')result = result.replace(u'\xa0', u' ')elif respond['code'] == 200000:result = respond['url']elif respond['code'] == 302000:for k in respond['list']:result = result + u"【" + k['source'] + u"】 " +\k['article'] + "\t" + k['detailurl'] + "\n"else:result = respond['text'].replace('<br>', '  ')result = result.replace(u'\xa0', u' ')print '    ROBOT:', resultreturn resultelse:return u"有点忙,回聊哦。"def auto_switch(self, msg):msg_data = msg['content']['data']stop_cmd = [u'退下', u'走开', u'关闭', u'关掉', u'休息', u'滚开']start_cmd = [u'出来', u'启动', u'工作']if self.robot_switch:for i in stop_cmd:if i == msg_data:self.robot_switch = Falseself.send_msg_by_uid(u'[Robot]' + u'机器人已关闭!', msg['to_user_id'])else:for i in start_cmd:if i == msg_data:self.robot_switch = Trueself.send_msg_by_uid(u'[Robot]' + u'机器人已开启!', msg['to_user_id'])def handle_msg_all(self, msg):if not self.robot_switch and msg['msg_type_id'] != 1:returnif msg['msg_type_id'] == 1 and msg['content']['type'] == 0:  # reply to selfself.auto_switch(msg)elif msg['msg_type_id'] == 4 or msg['msg_type_id'] == 5 and msg['content']['type'] == 0:  # text message from contacttime.sleep(3)self.send_msg_by_uid(self.tuling_auto_reply(msg['user']['id'], msg['content']['data']), msg['user']['id'])elif msg['msg_type_id'] == 3 and msg['content']['type'] == 0:  # group text messageif 'detail' in msg['content']:my_names = self.get_group_member_name(self.my_account['UserName'], msg['user']['id'])if my_names is None:my_names = {}if 'NickName' in self.my_account and self.my_account['NickName']:my_names['nickname2'] = self.my_account['NickName']if 'RemarkName' in self.my_account and self.my_account['RemarkName']:my_names['remark_name2'] = self.my_account['RemarkName']is_at_me = Falsefor detail in msg['content']['detail']:if detail['type'] == 'at':for k in my_names:if my_names[k] and my_names[k] == detail['value']:is_at_me = Truebreakif is_at_me:src_name = msg['content']['user']['name']reply = 'to ' + src_name + ': 'if msg['content']['type'] == 0:  # text messagereply += self.tuling_auto_reply(msg['content']['user']['id'], msg['content']['desc'])else:reply += u"对不起,读的书少,不认识你发的乱七八糟的东西。"self.send_msg_by_uid(reply, msg['user']['id'])def main():bot = TulingWXBot()bot.DEBUG = Truebot.conf['qr'] = 'png'bot.run()if __name__ == '__main__':main()

wxBot 是用Python包装Web微信协议实现的微信机器人框架。

目前的消息支持情况:

  • [ ] 群消息

    • [x] 文本
    • [x] 图片
    • [x] 地理位置
    • [x] 个人名片
    • [x] 语音
    • [x] 动画
    • [ ] 语音电话
    • [ ] 红包
  • [ ] 联系人消息

    • [x] 文本
    • [x] 图片
    • [x] 地理位置
    • [x] 个人名片
    • [x] 语音
    • [x] 小视频
    • [x] 动画
    • [ ] 视频电话
    • [ ] 红包
    • [ ] 转账

Web微信协议参考资料:

挖掘微信Web版通信的全过程

微信协议简单调研笔记

qwx: WeChat Qt frontend 微信Qt前端

1 环境与依赖

此版本只能运行于Python 2环境 。

wxBot 用到了Python requests , pypng , Pillow* 以及 **pyqrcode 库。

使用之前需要所依赖的库:

pip install requests
pip install pyqrcode
pip install pypng
pip install Pillow

2 快速开发

利用 wxBot 最简单的方法就是继承WXBot类并实现 handle_msg_all 或者 schedule 函数,然后实例化子类并调用 run 方法 。

2.1 代码

以下的代码对所有来自好友的文本消息回复 hi , 并不断向好友 tb 发送 schedule

handle_msg_all 函数用于处理收到的每条消息,而 schedule 函数可以做一些任务性的工作(例如不断向好友推送信息或者一些定时任务)。

#!/usr/bin/env python
# coding: utf-8import time
from wxbot import *class MyWXBot(WXBot):def handle_msg_all(self, msg):if msg['msg_type_id'] == 4 and msg['content']['type'] == 0:self.send_msg_by_uid('hi', msg['user']['id'])def schedule(self):self.send_msg('tb', 'schedule')time.sleep(1)def main():bot = MyWXBot()bot.DEBUG = Truebot.run()if __name__ == '__main__':main()

2.2 运行

直接用 python 运行代码(如运行测试代码 test.py ):

python test.py

2.3 登录微信

程序运行之后,会在当前目录下生成二维码图片文件 qr.png 并自动打开,用微信扫描此二维码并按操作指示确认登录网页微信。

如果运行在Linux下,还可以通过设置 WXBot 对象的 conf['qr']tty 的方式直接在终端打印二维码(此方法只能在Linux终端下使用),效果如下:

3 接口
3.1 handle_msg_all

handle_msg_all 函数的参数 msg 是代表一条消息的字典。字段的内容为:

字段名 字段内容
msg_type_id 整数,消息类型,具体解释可以查看 消息类型表
msg_id 字符串,消息id
content 字典,消息内容,具体含有的字段请参考 消息类型表 ,一般含有 type(数据类型)与 data(数据内容)字段,typedata的对应关系可以参考 数据类型表
user 字典,消息来源,字典包含 name(发送者名称,如果是群则为群名称,如果为微信号,有备注则为备注名,否则为微信号或者群昵称)字段与 id(发送者id)字段,都是字符串

3.2 消息类型表

类型号 消息类型 content
0 初始化消息,内部数据 无意义,可以忽略
1 自己发送的消息 无意义,可以忽略
2 文件消息 字典,包含 typedata 字段
3 群消息 字典, 包含 user (字典,包含 idname字段,都是字符串,表示发送此消息的群用户)与 typedata 字段,红包消息只有 type 字段, 文本消息还有detail、desc字段, 参考 群文本消息
4 联系人消息 字典,包含 typedata 字段
5 公众号消息 字典,包含 typedata 字段
6 特殊账号消息 字典,包含 typedata 字段
99 未知账号消息 无意义,可以忽略

3.3 数据类型表

type 数据类型 data
0 文本 字符串,表示文本消息的具体内容
1 地理位置 字符串,表示地理位置
3 图片 字符串,图片数据的url,HTTP POST请求此url可以得到jpg文件格式的数据
4 语音 字符串,语音数据的url,HTTP POST请求此url可以得到mp3文件格式的数据
5 名片 字典,包含 nickname (昵称), alias (别名),province (省份),city (城市), gender (性别)字段
6 动画 字符串, 动画url, HTTP POST请求此url可以得到gif文件格式的数据
7 分享 字典,包含 type (类型),title (标题),desc (描述),url (链接),from (源网站)字段
8 视频 不可用
9 视频电话 不可用
10 撤回消息 不可用
11 空内容 空字符串
12 红包 不可用
99 未知类型 不可用

3.4 群文本消息

由于群文本消息中可能含有@信息,因此群文本消息的 content 字典除了含有 typedata 字段外,还含有 detaildesc 字段。

各字段内容为:

字段 内容
type 数据类型, 为0(文本)
data 字符串,消息内容,含有@信息
desc 字符串,删除了所有@信息
detail 数组,元素类型为含有 typevalue 字段的字典, type 为字符串 str (表示元素为普通字符串,此时value为消息内容) 或 at (表示元素为@信息, 此时value为所@的用户名)

3.5 WXBot对象属性

WXBot 对象在登录并初始化之后,含有以下的可用数据:

属性 描述
contact_list 当前用户的微信联系人列表
group_list 当前用户的微信群列表
public_list 当前用户关注的公众号列表
special_list 特殊账号列表
session WXBot 与WEB微信服务器端交互所用的 Requests Session 对象

3.6 WXBot对象方法

WXBot 对象还含有一些可以利用的方法

方法 描述
get_icon(id) 获取用户icon并保存到本地文件 img_[id].jpg , id 为用户id(Web微信数据)
get_head_img(id) 获取用户头像并保存到本地文件 img_[id].jpgid 为用户id(Web微信数据)
get_msg_img(msgid) 获取图像消息并保存到本地文件 img_[msgid].jpg , msgid 为消息id(Web微信数据)
get_voice(msgid) 获取语音消息并保存到本地文件 voice_[msgid].mp3 , msgid 为消息id(Web微信数据)
get_contact_name(uid) 获取微信id对应的名称,返回一个可能包含 remark_name (备注名), nickname (昵称), display_name (群名称)的字典
send_msg_by_uid(word, dst) 向好友发送消息,word 为消息字符串,dst 为好友用户id(Web微信数据)
send_msg(name, word, isfile) 向好友发送消息,name 为好友的备注名或者好友微信号, isfileFalseword 为消息,isfileTrueword 为文件路径(此时向好友发送文件里的每一行),此方法在有重名好友时会有问题,因此更推荐使用 send_msg_by_uid(word, dst)
is_contact(uid) 判断id为 uid 的账号是否是本帐号的好友,返回 True (是)或 False (不是)
is_public(uid) 判断id为 uid 的账号是否是本帐号所关注的公众号,返回 True (是)或 False (不是)

4 群聊机器人示例

bot.py图灵机器人 API 以及 wxBot 实现了一个自动回复机器人.

此机器人会回复来自联系人的消息,以及群里@此账号的消息。

并且本帐号可以通过发送 退下走开关闭关掉休息滚开 来关闭机器人的自动回复。

也可以通过发送 出来启动工作 来再次开启机器人的自动回复。

群聊时需要将对应的群保存到联系人列表。

群聊实现效果:

bot.py 的运行方法:

  • 要接入图灵机器人API时:

    1. 在图灵机器人官网注册账号,申请图灵key: 图灵key申请地址

    2. bot.py 文件所在目录下新建 conf.ini 文件,内容为:(key字段内容为申请到的图灵key)

    [main]
    key=1d2678900f734aa0a23734ace8aec5b1
    1. 运行 bot.py
    python bot.py
  • 不接入图灵机器人API时(此时机器人对联系人消息以及群里@自己的消息统一回复 知道了 ):

    1. 运行 bot.py
    python bot.py

5 帮助项目

欢迎对本项目提意见、贡献代码,参考: 如何帮助项目

微信接入机器人实现对别人消息和群at消息的自动回复相关推荐

  1. python自动回复机器人手机版_GitHub - HZQHZA/wxpy: Python 写 微信聊天 根据 自动回复 接入机器人 等等.......

    wxpy Python 写 微信聊天 根据 自动回复 接入机器人 等等.... wxpy: 用 Python 玩微信 #项目介绍 wxpy登录就给好友发消息 发图片   自动回复信息   添加好友自动 ...

  2. 【chrome插件】web版微信接入图灵机器人API实现自动回复

    小贱鸡自动回复API已经不可以用了,现在改良接入图灵机器人API 360chrome浏览器团队翻译了部分谷歌插件开发文档 地址:http://open.chrome.360.cn/extension_ ...

  3. 微信接入智能机器人回复消息

    微信接入智能机器人回复消息 1 寻找智能机器人API 此处我使用的是极速数据 七款不错的聊天机器人API推荐 ①登录注册之后,直接搜索机器人 ②申请数据 ③在个人中心,找到自己的appKey ④编写代 ...

  4. 企业微信接入群聊机器人详细步骤

    目录 一. 创建群机器人 二.机器人配置 三.机器人信息推送 四.线上使用 五.推送效果 一. 创建群机器人 先选择一个企业微信群 右键添加机器人 完善机器人的头像.名称即可 二.机器人配置 查看生成 ...

  5. 微信接入探秘(三)——加密消息的处理

    本文出处:http://blog.csdn.net/chaijunkun/article/details/53435972,转载请注明.由于本人不定期会整理相关博文,会对相应内容作出完善.因此强烈建议 ...

  6. Python使用微信接入图灵机器人

    1.wxpy库介绍 wxpy 在 itchat 的基础上,通过大量接口优化提升了模块的易用性,并进行丰富的功能扩展. 文档地址:https://wxpy.readthedocs.io 从 PYPI 官 ...

  7. JAVA实现 springMVC方式的微信接入、实现消息自动回复

    前段时间小忙了一阵,微信公众号的开发,从零开始看文档,踩了不少坑,也算是熬过来了,最近考虑做一些总结,方便以后再开发的时候回顾,也给正在做相关项目的同学做个参考. 思路 微信接入:用户消息和开发者需要 ...

  8. python 微信聊天机器人_python操作微信自动发消息的实现(微信聊天机器人)

    前言 最近在学习python,发现一个微信自动发消息的小demo感觉很有意思,试了一下,不成功,因为demo中用的是itchat这个库来操作微信,而这个库是通过微信网页版来操作微信的,现在微信网页版已 ...

  9. 微信公众平台接入机器人小黄鸡

    微信公众平台接口开放后,公司也想接入机器人,申请的免费小黄鸡key可以免费试用30天,但每天请求次数有限制,抓包模拟访问官网的在线试用.由于答案的不可预知,接入后回答的答案不太理想,用来过滤规则太多, ...

最新文章

  1. 线性代数:04 特征值与特征向量 -- 特征值与特征向量
  2. Python中scrapy下载保存图片
  3. Springboot项目中配置tomcta监控日志
  4. 【电子信息复试】考研复试常考问题——数据库
  5. 她在哭,但我没资格安慰她......​
  6. C/C++构造及析构顺序及变量的生命周期
  7. java房产源码_基于jsp的房屋交易管理系统-JavaEE实现房屋交易管理系统 - java项目源码...
  8. 我眼中的服务提供和服务消费
  9. 西瓜书+实战+吴恩达机器学习(一)机器学习基础(数据集划分、分类回归评估指标)
  10. Perceptual Losses for Real-Time Style Transfer and Super-Resolution
  11. lintcode:Number of Islands 岛屿的个数
  12. 手机号码正则表达式匹配
  13. Matlab常用函数表
  14. 利用ADO连接数据库时,Rs.recordcount总是返回-1,解决办法
  15. openwrt 怎么进入串口_OpenWrt路由器读取串口数据,建立tcp服务
  16. BigDecimal取整数
  17. 笔记本桌面窗口管理器占用内存过高怎么办?
  18. C语言之负数的左移/右移运算
  19. Chatbots 中对话式交互系统的分析与应用
  20. 人工智能、深度学习、机器学习常见面试题83~100

热门文章

  1. IP-guard V4 服务器迁移方法
  2. 苹果付费app共享公众号_【苹果iOS付费游戏应用帐号分享】新增一款40元iOS游戏应用共享帐号...
  3. java-不死神兔百钱百鸡
  4. Redis缓存及缓存粒度
  5. 南京沁恒推出的国产 M3 架构芯片与意法半导体 M3 芯片对比
  6. (Python)卫星RPC有理多项式模型读取与正反投影坐标计算原理与实现
  7. ETH数据集下载及相关问题
  8. 面试题:你的缺点是什么?(为难我?怎么可能)
  9. Ipoe和Pppoe,宽带认证技术
  10. 跨境物流运输方式有哪些