# coding:utf-8
import os
import threading
import time
# from com.android.monkeyrunner import MonkeyDevice as md
from com.android.monkeyrunner import MonkeyImage as mi
from com.android.monkeyrunner import MonkeyRunner as mr
# from com.android.monkeyrunner.easy import By
# from com.android.monkeyrunner.easy import EasyMonkeyDevice as emd
# Utility class. Main methods:
# getdeviceSerial: reads the saved list of attached devices and returns it as devices[]
# viewServerIsOpen: checks whether the view server is usable
class Tools(object):
    """Helpers for talking to attached Android devices through ``adb``.

    An instance is bound to one device serial, which ``viewServerIsOpen``
    passes to ``adb -s``. ``getdeviceSerial`` is a classmethod and does not
    need an instance.
    """

    def __init__(self, deviceSerial):
        # Serial handed to ``adb -s`` for per-device commands.
        self.deviceSerial = deviceSerial

    # ************************************************************************
    # Check whether the phone's view server is open; if it is,
    # EasyMonkeyDevice can be used.
    # ************************************************************************
    def viewServerIsOpen(self):
        """Check the view server and try to start it when it is not reported on.

        Runs ``service call window 3`` first; when that does not yield 1,
        runs ``service call window 1 i32 4939`` and checks that instead.

        Returns:
            True if either ``os.system`` call returned 1, otherwise False.
        """
        # NOTE(review): os.system() returns the command's exit status, not the
        # Parcel value that ``service call`` prints -- confirm that comparing
        # it with 1 really reflects the view-server state.
        command = "adb -s " + self.deviceSerial
        if os.system(command + " shell service call window 3") == 1:
            return True
        # Not reported as running: ask the window service to start it.
        return os.system(command + " shell service call window 1 i32 4939") == 1

    @staticmethod
    def _parseDeviceLines(lines):
        """Return the serials of ready devices found in ``adb devices`` output.

        Args:
            lines: iterable of text lines as produced by ``adb devices``.

        Returns:
            List of serial strings, in listing order, for every line whose
            second whitespace-separated field is exactly ``device``.
        """
        serials = []
        for line in lines:
            fields = line.split()
            # The banner line, blank lines and entries in any other state
            # (e.g. "offline", "unauthorized") do not match and are skipped.
            if len(fields) >= 2 and fields[1] == "device":
                serials.append(fields[0])
        return serials

    # ************************************************************************
    # Get all devices; returns a list with every device serial.
    # ************************************************************************
    @classmethod
    def getdeviceSerial(cls, devicesPath="D:/PythonProject/MonkeyRunner"):
        """List the serials of all attached, ready devices.

        Redirects ``adb devices`` into ``<devicesPath>/a3s/devices.text`` and
        parses that file.

        Args:
            devicesPath: base directory that contains the ``a3s`` folder.
                Defaults to the path the script has always used.

        Returns:
            List of device serial strings (empty when none are ready).
        """
        listing = devicesPath + "/a3s/devices.text"
        os.system("adb devices > " + listing)
        f = open(listing, "r")
        # try/finally (rather than ``with``) so the handle is always closed
        # while staying loadable on old interpreters -- presumably the Jython
        # bundled with monkeyrunner; TODO confirm.
        try:
            content = f.readlines()
        finally:
            f.close()
        return cls._parseDeviceLines(content)
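# ************************************************************************
# After a settings step, compare a screenshot with the reference image:
# print "all right" when they match; otherwise print an error and save
# the screenshot.
# ************************************************************************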
def compareImages(device, imageNum, deviceNumber):
    """Compare the current screen with the reference image for one step.

    Prints ``stepN is all right`` when the cropped screenshot matches the
    cropped reference image; otherwise saves the screenshot as
    ``<deviceNumber>_supersu_N.png`` in the images folder and prints
    ``stepN is wrong!!``.

    Args:
        device: connected device object providing ``takeSnapshot()``.
        imageNum: step number; 1 and 2 select their own reference image,
            any other value is treated as step 3.
        deviceNumber: device serial string, used as the file-name prefix of
            the saved screenshot.
    """
    imagePath = r"D:\PythonProject\MonkeyRunner\a3s\images"

    # Step number and the second element of the reference crop rectangle.
    # NOTE(review): steps 2 and 3 crop the reference at 50 while the
    # screenshot is always cropped at 300 -- confirm this is intended.
    if imageNum == 1:
        step, refTop = 1, 300
    elif imageNum == 2:
        step, refTop = 2, 50
    else:
        step, refTop = 3, 50

    fileName = "supersu_" + str(step) + ".png"
    # Load only the reference image this step needs.
    reference = mr.loadImageFromFile(imagePath + "\\standard\\" + fileName)

    # Keep the full frame so that, on mismatch, the saved file is the very
    # screenshot that failed the comparison rather than a later capture.
    snapshot = device.takeSnapshot()
    image = snapshot.getSubImage((0, 300, 720, 750))

    if mi.sameAs(image, reference.getSubImage((0, refTop, 720, 750))):
        print("step" + str(step) + " is all right")
    else:
        snapshot.writeToFile(imagePath + "\\" + deviceNumber + "_" + fileName)
        print("step" + str(step) + " is wrong!!")
# Method 1: install the apps through MonkeyDevice.installPackage
def installApp(device):
    """Install the required APKs on ``device`` and report each result.

    Calls ``device.installPackage`` once per APK, in a fixed order, and
    prints ``<label>--ok`` when the call returns a true value or
    ``<label>--failed`` otherwise. A failed install does not stop the
    remaining ones.

    Args:
        device: connected device object providing ``installPackage(path)``.
    """
    print("begin install app by monkeyrunner")
    # (APK path, label used in the progress message), in install order.
    packages = (
        ("D:\\JKT\\apks\\JKT_V1.2.24-release.apk", "安装jkt"),
        ("D:\\JKT\\apks\\Weike_V2.1.16-release.apk", "安装weike"),
        ("D:\\JKT\\apks\\Weike_V2.1.10-debug-androidTest.apk", "安装test"),
        ("D:\\JKT\\apks\\Wechat_V6.6.5.apk", "wechat"),
    )
    for apkPath, label in packages:
        # Report the real outcome instead of printing "ok" unconditionally.
        if device.installPackage(apkPath):
            print(label + "--ok")
        else:
            print(label + "--failed")
    print("install app end ")
# 设置supersu
def setUpSperSU(device, deviceNumber):
    """Drive the SuperSU settings screens on ``device`` and verify each step.

    Launches the SuperSU main activity, performs a fixed sequence of taps
    and drags at hard-coded screen coordinates, and calls ``compareImages``
    after each of the three configuration steps.

    Args:
        device: connected device object providing ``startActivity``,
            ``touch`` and ``drag``.
        deviceNumber: device serial string, forwarded to ``compareImages``.
    """
    print("***********************supersu begining**************************")
    device.startActivity(
        component="eu.chainfire.supersu/eu.chainfire.supersu.MainActivity-Material")
    mr.sleep(3)

    # Step 1: tap settings, re-authentication, default action, then "grant",
    # pausing one second after each tap.
    for tapX, tapY in ((600, 210), (500, 900), (200, 1100), (200, 650)):
        device.touch(tapX, tapY, "DOWN_AND_UP")
        mr.sleep(1)
    compareImages(device, 1, deviceNumber)

    # Steps 2 and 3: scroll with three drags, tap the option, then verify.
    for optionY, step in ((900, 2), (600, 3)):
        for _ in range(3):
            device.drag((500, 1000), (500, 400), 2, 10)
            mr.sleep(0.1)
        device.touch(500, optionY, "DOWN_AND_UP")
        mr.sleep(1)
        compareImages(device, step, deviceNumber)

    print("***********************supersu end**************************")
class myThread(threading.Thread):
    """Thread that provisions a single device.

    The device connection is opened in the constructor, i.e. in the thread
    that creates the object; ``run`` then checks the view server, stops
    SuperSU, installs the apps and configures SuperSU on that device.
    """

    def __init__(self, device):
        """Store the serial and connect to the device.

        Args:
            device: serial string of the device this thread handles.
        """
        threading.Thread.__init__(self)
        self.deviceSerial = device
        # First argument 1.0 is the timeout passed to waitForConnection.
        self.device = mr.waitForConnection(1.0, device)

    def run(self):
        """Provision the device bound to this thread."""
        # Guard kept from the original: the work only happens when this
        # module is executed as a script.
        if __name__ != '__main__':
            return
        print(self.deviceSerial)
        # Check whether EasyMonkeyDevice could be used.
        tool = Tools(self.deviceSerial)
        if tool.viewServerIsOpen():
            print("viewServer is opened!")
        else:
            print("Can't open viewServer ")
        # Use this thread's own serial. The module-level loop variable the
        # code used before holds whichever device was enumerated last by the
        # time the threads run, so every thread targeted the same device.
        os.system("adb -s " + self.deviceSerial +
                  " shell am force-stop eu.chainfire.supersu")
        installApp(self.device)
        setUpSperSU(self.device, self.deviceSerial)
# Script entry point: start one worker thread per attached device.
if __name__ == '__main__':
    try:
        # Serials of all attached devices.
        devices = Tools.getdeviceSerial()
        # One worker per device. The loop variable keeps the name ``temp``
        # because other code in this module may read it as a global.
        tt = []
        for temp in devices:
            tt.append(myThread(temp))
        for t in tt:
            t.start()
            # Stagger the launches by 5 s (assumed to apply after each
            # start -- TODO confirm).
            time.sleep(5)
        for t in tt:
            t.join()
    except Exception:
        # Only failures raised in this main thread (device discovery, thread
        # construction/start) reach here. Keep the original message but also
        # show the cause instead of discarding it.
        import traceback
        print("线程运行失败")
        traceback.print_exc()