基于ATX的app自动化
常规做 App UI 自动化时,基本上都是采用 POM(Page Object Model)模式,加上关键字驱动、数据驱动,并实现测试数据分离。
需求是:
(1)实现基本业务流程的测试
(2)多设备同时运行
(3)自动拉取最新apk并自动安装
(4)持续集成
分析:
业务基本流程的覆盖:对于做过 Web 端 UI 自动化或者接口自动化的同学来说,实现 App UI 自动化其实不难,思路是相同的——POM 模式、关键字驱动、数据驱动、数据分离等。
1.先来自动拉取最新apk 并且实现自动化安装。
import os
import re
import shutil
from urllib.request import urlopen # 用于获取网页
import requests
from bs4 import BeautifulSoup  # used to parse the fetched repository pages

# Project root: two directory levels above this file.
path = os.path.normpath(os.path.join(os.path.join(os.path.dirname(os.path.dirname(__file__)))))


class automatic(object):
    """Locate the newest apk in the artifact repository and download it.

    The repository "browse" pages are plain HTML link listings; this class
    scrapes the ``<a href>`` values to find the newest version directory and,
    inside it, the newest build.
    """

    # Build stamp embedded in the artifact links. As published it only matches
    # stamps that start with "2021"; override this attribute for other years.
    BUILD_STAMP_PATTERN = re.compile(r'(?:2021)\d+\.?\d*')

    def __init__(self):
        self.url = 'http://10.124.106.120:28759/service/rest/repository/browse/maven-releases/tcl/release/android/tclplus/'

    @staticmethod
    def _get_url(url):
        """Return every ``<a>`` tag found on the page at ``url``."""
        # Close the HTTP response once the page has been parsed.
        with urlopen(url) as html:
            bsObj = BeautifulSoup(html, 'html.parser')
        return bsObj.find_all('a')

    @staticmethod
    def _version_key(version):
        """Sort key that orders versions numerically, so 1.10.0 > 1.9.0."""
        numbers = tuple(int(part) for part in re.findall(r'\d+', version))
        return numbers, version

    def get_latest_version(self):
        """Return ``(url_of_latest_version_dir, latest_version)``.

        Raises:
            ValueError: if the listing contains no usable version directory.
        """
        versions = []
        for anchor in self._get_url(self.url):
            href = anchor.get('href')
            # Anchors without href are skipped. Any link containing "2.0" is
            # excluded, exactly as before.
            # NOTE(review): the substring test also drops e.g. "1.2.0" -
            # confirm that this is intended.
            if not href or '2.0' in href:
                continue
            name = str(href).split("/")[0]
            # Drop the parent-directory link and absolute URLs ("http:").
            if name in ("", ".", "..") or name.endswith(":"):
                continue
            versions.append(name)
        if not versions:
            raise ValueError(f"no version directory found at {self.url}")
        # Numeric comparison: plain string max() would rank "1.9" above "1.10".
        latest_version = max(versions, key=self._version_key)
        print(f"最大版本号为:{latest_version}")
        result_url = self.url + str(latest_version)
        return result_url, latest_version

    def download_apk(self):
        """Download the newest build of the latest version; return its file name.

        The file is written to ``<project root>/APK/``.
        NOTE(review): ``local_install`` installs from ``<root>/apk/`` (lower
        case); on a case-sensitive filesystem these are different directories.

        Raises:
            ValueError: if no link carries a build stamp.
            requests.HTTPError: if the download does not return a 2xx status.
        """
        # Kept for backward compatibility: the name was also published as a
        # module-level global.
        global apk_name
        urls, number = self.get_latest_version()
        hrefs = [anchor.get('href') for anchor in self._get_url(urls)]
        hrefs = [href for href in hrefs if href]

        # First build stamp of every link that has one.
        matches = (self.BUILD_STAMP_PATTERN.search(href) for href in hrefs)
        stamps = [match.group(0) for match in matches if match]
        if not stamps:
            raise ValueError(f"no build matching {self.BUILD_STAMP_PATTERN.pattern!r} found at {urls}")
        newest = max(stamps)

        # Several links can carry the same stamp. Prefer the one ending in
        # ".apk" so a sibling file cannot overwrite the package; otherwise the
        # last matching link wins, as before.
        candidates = [href for href in hrefs if newest in href]
        apk_links = [href for href in candidates if href.lower().endswith('.apk')]
        download_url = (apk_links or candidates)[-1]

        response = requests.get(download_url, timeout=60)
        response.raise_for_status()  # never save an error page as the apk

        apk_name = f"{number}-{newest}-jiagu-xtest-release.apk"
        apk_dir = os.path.join(path, 'APK')
        os.makedirs(apk_dir, exist_ok=True)
        with open(os.path.join(apk_dir, apk_name), 'wb') as apk_file:
            apk_file.write(response.content)
        return apk_name

    def del_file(self, path_data):
        """Delete everything inside ``path_data`` but keep the directory.

        ``LoginPage.local_install`` calls this before downloading a fresh apk.
        A missing directory is ignored.
        """
        if not os.path.isdir(path_data):
            return
        for entry in os.listdir(path_data):
            target = os.path.join(path_data, entry)
            if os.path.isdir(target) and not os.path.islink(target):
                shutil.rmtree(target)
            else:
                os.remove(target)
主要流程是:发起请求取到最新版本号,再获取该版本目录下的链接地址;拿到下载地址之后,把响应的 file.content 以二进制方式写入文件,保存为 apk 即可。
2.第二步,多设备连接并自动安装
Decorator.py
import time
from functools import wraps

from Common.BasePage import BasePage
from Common.ReportPath import ReportPath
from Common.Log import Log

# Marker prefixed to exception messages that carry a screenshot file name.
flag = 'IMAGE:'
log = Log()


def _screenshot(name):
    """Save a screenshot of the current device into the report directory.

    Args:
        name: prefix of the file name (the decorated function's qualname).

    Returns:
        The screenshot file name, ``<name>-<YYYYmmddHHMMSS>.PNG``.
    """
    date_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    screenshot = name + '-' + date_time + '.PNG'
    path = ReportPath().get_path() + '/' + screenshot
    driver, _sess = BasePage().get_driver()
    driver.screenshot(path)
    return screenshot


def _reraise_with_image(exc_type, error, func_name):
    """Re-raise ``error`` as ``exc_type`` with an ``IMAGE:`` marked message.

    If the message already carries the marker (an inner decorated step took
    the screenshot), it is passed through; otherwise a screenshot is taken now.
    """
    if flag in str(error):
        raise exc_type(error) from error
    raise exc_type(flag + _screenshot(func_name)) from error


def _logged(func, enter_fmt, failure_fmt, success_fmt=None, assertion_status='Error'):
    """Build the logging wrapper shared by every decorator in this module.

    Args:
        func: function to wrap.
        enter_fmt: log format used before the call (one ``%s``: qualname).
        failure_fmt: log format used on failure (qualname, kind, status).
        success_fmt: log format used on success (qualname, 'Success'), or
            ``None`` to log nothing on success.
        assertion_status: status word logged for an ``AssertionError``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            log.i(enter_fmt, func.__qualname__)
            ret = func(*args, **kwargs)
            if success_fmt is not None:
                log.i(success_fmt, func.__qualname__, 'Success')
            return ret
        except AssertionError as e:
            log.e('AssertionError, %s', e)
            log.e(failure_fmt, func.__qualname__, 'AssertionError', assertion_status)
            _reraise_with_image(AssertionError, e, func.__qualname__)
        except Exception as e:
            log.e('Exception, %s', e)
            log.e(failure_fmt, func.__qualname__, 'Exception', 'Error')
            _reraise_with_image(Exception, e, func.__qualname__)

    return wrapper


def teststep(func):
    """Decorate a single step: tab-indented log lines, silent on success."""
    return _logged(func, '\t--> %s', '\t<-- %s, %s, %s')


def teststeps(func):
    """Decorate a group of steps: logs entry, success and failure."""
    return _logged(func, ' --> %s', ' <-- %s, %s, %s', success_fmt=' <-- %s, %s')


def _wrapper(func):
    """Case-level wrapper: an ``AssertionError`` is logged as 'Fail'."""
    return _logged(func, '--> %s', '<-- %s, %s, %s\n',
                   success_fmt='<-- %s, %s\n', assertion_status='Fail')


def testcase(func):
    """Decorate a test case method."""
    return _wrapper(func)


def setup(func):
    """Decorate a ``setUp`` method."""
    return _wrapper(func)


def teardown(func):
    """Decorate a ``tearDown`` method."""
    return _wrapper(func)


def setupclass(func):
    """Decorate a ``setUpClass`` method."""
    return _wrapper(func)


def teardownclass(func):
    """Decorate a ``tearDownClass`` method."""
    return _wrapper(func)
基础定位方法封装
基础定位方法封装
BasePage.py
import time
import uiautomator2 as u2
from Common.chromedriver import ChromeDriver
from Common.Ports import Ports
from Common.ReportPath import ReportPath


class BasePage(object):
    """Base page object: thin wrappers around a shared uiautomator2 device.

    ``set_driver`` stores the device (``d``) and the app session (``sess``)
    on the class, so every page object shares the same connection.

    Note: the commented-out swipe helpers that followed this class in the
    original listing (``_swipe``, ``swipe_up/down/left/right``,
    ``_find_element_by_swipe`` and ``find_element_by_swipe_*``) were dead code
    and have been removed.
    """

    # Package under test; also the prefix of every resource id.
    package = "com.tcl.tclplus"

    @classmethod
    def set_driver(cls, dri):
        """Connect to device ``dri`` and open a session on the app under test."""
        cls.d = u2.connect(dri)
        cls.sess = cls.d.session(cls.package)

    def get_driver(self):
        """Return ``(device, session)`` as stored by ``set_driver``."""
        return self.d, self.sess

    @classmethod
    def _res_id(cls, page):
        """Return the full resource id for the short id ``page``."""
        return cls.package + ":id/" + page

    def ClickText(self, text):
        """Click the element whose text equals ``text``."""
        self.d(text=text).click()

    def ClickResId(self, page):
        """Click the element with resource id ``page`` (short form)."""
        self.d(resourceId=self._res_id(page)).click()

    def ExistsResId(self, page):
        """Return True if an element with resource id ``page`` exists."""
        return self.d(resourceId=self._res_id(page)).exists()

    def ExistsText(self, text):
        """Return True if an element with text ``text`` exists."""
        # Call exists() so a plain bool is returned, matching ExistsResId
        # (the original returned the un-called ``exists`` property object).
        return self.d(text=text).exists()

    def ClickResIdSendKeys(self, *args):
        """Click resource id ``args[0]``, then type ``args[1]``."""
        self.ClickResId(args[0])
        self.d.send_keys(args[1])

    def GetTextResID(self, page):
        """Return the text of the element with resource id ``page``."""
        return self.d(resourceId=self._res_id(page)).get_text()

    def GetTextXpath(self, text):
        """Return the text of the element found by XPath ``@text`` match."""
        return self.d.xpath(f'//*[@text="{text}"]').get_text()

    def ClickResIdText(self, page, text):
        """Click the element matching both resource id ``page`` and ``text``."""
        self.d(resourceId=self._res_id(page), text=text).click()

    def SwipeOrExist(self, taper, text, up="up", max_swipes=30):
        """Swipe repeatedly until the target shows up, then click it.

        Args:
            taper: True to look for an element by text; False to treat
                ``text`` as a short resource id.
            text: element text, or short resource id when ``taper`` is False.
            up: swipe direction passed to ``swipe_ext`` ("up" or "down").
            max_swipes: maximum number of swipes before giving up. The
                original looped forever when the element never appeared.

        Raises:
            RuntimeError: if the target is still missing after ``max_swipes``.
        """
        for _ in range(max_swipes):
            if taper:
                self.d.swipe_ext(up, 1)
                if self.d(text=text).exists:
                    self.ClickText(text)
                    return
            else:
                self.d.swipe_ext(up, 0.5)
                if self.ExistsResId(text):
                    self.ClickResId(text)
                    return
        raise RuntimeError('%r not found after %d swipes' % (text, max_swipes))

    @classmethod
    def back(cls):
        """Press the back key, sleeping one second before and after.

        Pressing back before the page has finished loading may fail, so make
        sure the page is loaded before calling this.
        """
        time.sleep(1)
        cls.d.press('back')
        time.sleep(1)

    @classmethod
    def identify(cls):
        """Call the device's ``open_identify``."""
        cls.d.open_identify()

    def set_chromedriver(self, device_ip=None, package=None, activity=None, process=None):
        """Return a selenium driver attached to the running app's WebView."""
        port = Ports().get_ports(1)[0]
        driver = ChromeDriver(self.d, port).driver(
            device_ip=device_ip, package=package, attach=True,
            activity=activity, process=process)
        return driver

    @classmethod
    def watch_device(cls, watch_list):
        """Register watchers that click an element as soon as it appears.

        :param watch_list: texts to click, e.g. ['允许', 'yes', '跳过']
        """
        cls.d.watchers.watched = False
        for i in watch_list:
            cls.d.watcher(i).when(text=i).click(text=i)
        print('Starting watcher,parameter is %s' % watch_list)
        cls.d.watchers.watched = True

    @classmethod
    def unwatch_device(cls):
        """Stop all watchers."""
        print('Stop all watcher')
        cls.d.watchers.watched = False

    @classmethod
    def get_toast_message(cls):
        """Return the toast from ``toast.get_message(3, 3)``, then reset it."""
        message = cls.d.toast.get_message(3, 3)
        cls.d.toast.reset()
        return message

    @classmethod
    def set_fastinput_ime(cls):
        """Switch the device to the fast-input IME."""
        cls.d.set_fastinput_ime(True)

    @classmethod
    def set_original_ime(cls):
        """Switch the device back to its original IME."""
        cls.d.set_fastinput_ime(False)

    @classmethod
    def screenshot(cls):
        """Take a screenshot and print ``IMAGE:<file>`` so the report shows it."""
        date_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
        screenshot_name = cls.__qualname__ + '-' + date_time + '.PNG'
        path = ReportPath().get_path() + '/' + screenshot_name
        cls.d.screenshot(path)
        print('IMAGE:' + screenshot_name)

    @staticmethod
    def find_message(elements, text):
        """Return True if any element's text contains ``text``.

        Elements are scanned from the last to the first, as before.
        """
        for index in range(elements.count - 1, -1, -1):
            if text in elements[index].info['text']:
                return True
        return False

    def _get_window_size(self):
        """Return the first two values of ``window_size()`` as ``(x, y)``."""
        window = self.d.window_size()
        return window[0], window[1]

    @staticmethod
    def _get_element_size(element):
        """Return ``(x_left, y_up, x_center, y_center, x_right, y_down)``.

        Values are taken from ``element.info['bounds']``.
        """
        rect = element.info['bounds']
        x_center = (rect['left'] + rect['right']) / 2
        y_center = (rect['bottom'] + rect['top']) / 2
        return rect['left'], rect['top'], x_center, y_center, rect['right'], rect['bottom']
chromedriver.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
#
# extension for https://sites.google.com/a/chromium.org/chromedriver/
# Experimental, maybe change in the future
# Created by <hzsunshx> 2017-01-20from __future__ import absolute_importimport atexit
import six
from selenium import webdriver
import psutil as pt
import osif six.PY3:import subprocessfrom urllib.error import URLError
else:from urllib2 import URLErrorimport subprocess32 as subprocessdef getPidByName(Str):pids = pt.process_iter()pidList = []for pid in pids:if pid.name() == Str:pidList.append(int(pid.pid))return pidListclass ChromeDriver(object):def __init__(self, d, port):self._d = dself._port = portdef _launch_webdriver(self):# print("start chromedriver instance")p = subprocess.Popen(['chromedriver', '--port=' + str(self._port)])try:p.wait(timeout=2.0)return Falseexcept subprocess.TimeoutExpired:return Truedef driver(self, device_ip=None, package=None, attach=True, activity=None, process=None):"""Args:- package(string): default current running app- attach(bool): default true, Attach to an already-running app instead of launching the app with a clear data directory- activity(string): Name of the Activity hosting the WebView.- process(string): Process name of the Activity hosting the WebView (as given by ps).If not given, the process name is assumed to be the same as androidPackage.Returns:selenium driver"""app = self._d.current_app()capabilities = {'chromeOptions': {'androidDeviceSerial': device_ip or self._d.serial,'androidPackage': package or app['package'],'androidUseRunningApp': attach,'androidProcess': process or app['package'],'androidActivity': activity or app['activity'],}}try:dr = webdriver.Remote('http://localhost:%d' % self._port, capabilities)except URLError:self._launch_webdriver()dr = webdriver.Remote('http://localhost:%d' % self._port, capabilities)# always quit driver when doneatexit.register(dr.quit)return dr@staticmethoddef kill():# # for windows# pid = getPidByName('chromedriver.exe')# for i in pid:# os.popen('taskkill /PID %d /F' % i)# # for macpid = getPidByName('chromedriver')for i in pid:os.popen('kill -9 %d' % i)print('All chromedriver pid killed')# if __name__ == '__main__':import uiautomator2 as u2# d = u2.connect()# driver = ChromeDriver(d).driver()# elem = driver.find_element_by_link_text(u"登录")# elem.click()# driver.quit()# ChromeDriver.kill()
业务方法封装,包含自动安装apk
业务方法封装
LoginPage.py
# -*- coding: utf-8 -*-
import random
import string
import threading
from Common.osDriver import *
import os
import subprocess
import time

from Common.Decorator import *

# Project root: two directory levels above this file.
filepath = os.path.normpath(os.path.join(os.path.join(os.path.dirname(os.path.dirname(__file__)))))


class LoginPage(BasePage):
    """Business-level page object; currently only handles apk installation.

    Note: the original listing carried a large commented-out region here
    (``loginApps``, ``Eor``, ``Type_exit``, ``openIndex``, ``ClearAndLogin``,
    ``update_nickname``, ``tv_sms``, ``add_invoice``, ``update_invoice``,
    ``add_address_phone``, ``binding``). It was dead code and has been
    removed; recover it from version control if needed.
    """

    @teststep
    def local_install(self):
        """Download the newest apk and install it on the current device.

        The ``apk`` directory is emptied first, then the latest build is
        downloaded and installed with a brand-specific strategy:

        * HUAWEI - ``app_install`` while a helper thread clicks the
          installer prompts.
        * Xiaomi - ``adb install -r -d``.

        Afterwards the app data is cleared and the app is started. Devices of
        any other brand are logged and skipped.

        Raises:
            subprocess.CalledProcessError: if ``adb install`` fails.
        """
        package = "com.tcl.tclplus"
        # One apk directory under the project root for cleaning and installing;
        # the original mixed a cwd-relative '../apk', ``path`` and ``filepath``.
        # NOTE(review): the downloader as published writes to '<root>/APK/' -
        # confirm the directory name on case-sensitive filesystems.
        apk_dir = os.path.join(filepath, 'apk')

        # Remove every old file first, then fetch the newest apk.
        automatic().del_file(apk_dir)
        apk_name = automatic().download_apk()
        apk_file = os.path.join(apk_dir, apk_name)

        brand = self.d.device_info["brand"]
        if brand == "HUAWEI":
            prompt_clicker = install_thead(self.d)
            prompt_clicker.start()                # clicks prompts in the background
            prompt_clicker.app_install(apk_file)  # install runs in this thread
            prompt_clicker.join()                 # wait for the helper thread
            time.sleep(5)
        elif brand == "Xiaomi":
            # Use the connected device's own serial instead of a hard-coded
            # one, and an argument list instead of a shell string.
            subprocess.run(
                ["adb", "-s", self.d.serial, "install", "-r", "-d", apk_file],
                check=True)
            time.sleep(2)
        else:
            log.i('local_install: no install strategy for brand %s, skipped', brand)
            return

        # Equivalent to `adb shell pm clear <package>`, without a serial.
        self.d.app_clear(package)
        self.d.app_start(package)


class install_thead(threading.Thread):
    """Helper thread that clicks the system installer prompts."""

    def __init__(self, devices):
        """
        Args:
            devices: the connected device object (``self.d`` of a page).
        """
        super().__init__()
        self.drivers = devices

    def usb_install(self):
        """Try three times to click '继续安装' and '打开'."""
        for _ in range(3):
            for label in ('继续安装', '打开'):
                try:
                    self.drivers(text=label).click()
                except Exception:
                    # The prompt is simply not on screen; keep trying.
                    continue

    def run(self):
        """Thread entry point: click installer prompts."""
        self.usb_install()

    def app_install(self, app_file_path):
        """Install ``app_file_path`` on the device (runs in the caller's thread)."""
        self.drivers.app_install(app_file_path)
这里根据每个设备 self.d 的 device_info 中的 brand 来区分,执行不同的安装方法;安装时另起一个线程,用来点掉安装过程中出现的警告提示等。其他品牌的设备自行添加分支即可。
以上就完成了 基础的定位方法封装、业务定位封装、多设备启动、多设备安装。
3.同时运行
CaseStrategy.py
import os
import unittestclass CaseStrategy:def __init__(self):self.suite_path = 'TestSuite_'self.case_path = 'TestCase'self.case_pattern = 'test*.py'def _collect_cases(self, cases, top_dir=None):suites = unittest.defaultTestLoader.discover(self.case_path,pattern=self.case_pattern, top_level_dir=top_dir)for suite in suites:for case in suite:print(f"需要执行的测试用例为:{case}")cases.addTest(case)def collect_cases(self, suite=False):"""collect casescollect cases from the giving path by case_path via the giving pattern by case_patternreturn: all cases that collected by the giving path and pattern, it is a unittest.TestSuite()"""cases = unittest.TestSuite()if suite:test_suites = []for file in os.listdir('.'):if self.suite_path in file:if os.path.isdir(file):test_suites.append(file)for test_suite in test_suites:self._collect_cases(cases, top_dir=test_suite)else:self._collect_cases(cases, top_dir=None)return cases
devices.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
多进程check_alive
Mac下需要配置 `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`到环境变量,不然python会挂掉
'''
from Common.ReadConfig import ReadConfig
from Common.ATX_Server import ATX_Server
import uiautomator2 as u2
import subprocess
import re
from multiprocessing import Pool


def _alive_devices(candidates):
    """Run ``check_alive`` on every candidate in parallel.

    Args:
        candidates: device descriptors accepted by ``check_alive``.

    Returns:
        The ``device_info`` of every candidate that is alive. An empty input
        yields ``[]`` without creating a pool (``Pool(processes=0)`` raises).
    """
    candidates = list(candidates)
    if not candidates:
        return []
    # The context manager releases the workers even if a check raises.
    with Pool(processes=len(candidates)) as pool:
        results = pool.map(check_alive, candidates)
    return [info for info in results if info]


def get_devices():
    """Return the alive devices among the IP list in the config file."""
    devices_ip = ReadConfig().get_devices_ip()
    print('Connect devices from config devices IP list %s' % devices_ip)
    return _alive_devices(devices_ip)


def get_online_devices():
    """Return the alive devices among those online on the ATX-Server.

    Raises:
        Exception: if the server reports no online device.
    """
    devices = ATX_Server(ReadConfig().get_server_url()).online_devices()
    print('There has %s online devices on ATX-Server' % len(devices))
    if not devices:
        raise Exception('ATX-Server has no online device!!! ')
    return _alive_devices(devices)


def connect_devices():
    """Return the alive devices among those listed by ``adb devices``."""
    output = subprocess.check_output(['adb', 'devices'])
    pattern = re.compile(r'(?P<serial>[^\s]+)\t(?P<status>device|offline)')
    matches = pattern.findall(output.decode())
    # Keep only serials in the "device" state; "offline" ones are dropped.
    valid_serials = [m[0] for m in matches if m[1] == 'device']
    if not valid_serials:
        print("No available android devices detected.")
        return []
    print('There has %s devices connected on PC: ' % len(valid_serials))
    return _alive_devices(valid_serials)


def check_alive(device):
    """Connect to one device and report whether it is usable.

    Args:
        device: either a dict with ``'ip'`` and ``'udid'`` keys, or a plain
            address/serial string.

    Returns:
        The device's ``device_info`` when both ``agent_alive`` and ``alive``
        hold, otherwise ``None``.
    """
    if isinstance(device, dict):
        address, label = device['ip'], device['udid']
    else:
        address = label = device

    d = u2.connect(address)
    if not d.agent_alive:
        print('The device atx_agent %s is not alive,please checkout!' % label)
        return None

    d.healthcheck()
    if d.alive:
        print('%s is alive' % label)
        return d.device_info
    print('%s is not alive' % label)
    return None
driver.py
# Device checks use multiprocessing; on macOS export
# OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES before running.
from multiprocessing import Pool

from Common.Devices import *
from Common.RunCases import RunCases
from Common.ReportPath import ReportPath
from Common.BasePage import BasePage
from Common.Log import Log
from Common.ReadConfig import ReadConfig
from Common.chromedriver import ChromeDriver
from Common.Test_data import generate_test_data
from Common.Report import create_statistics_report


class Drivers:
    """Run the collected cases on every available device, one process each."""

    @staticmethod
    def _run_cases(run, cases):
        """Worker body: prepare logging, report path and driver, then run.

        Args:
            run: the ``RunCases`` object of one device.
            cases: the test suite to execute.
        """
        log = Log()
        log.set_logger(run.get_device()['model'], run.get_path() + '/' + 'client.log')
        log.i('udid: %s', run.get_device()['udid'])

        # set cls.path, it must be called before operating on any page
        path = ReportPath()
        path.set_path(run.get_path())

        # set cls.driver, it must be called before operating on any page
        base_page = BasePage()
        device = run.get_device()
        base_page.set_driver(device['ip'] if 'ip' in device else device['serial'])

        try:
            base_page.set_fastinput_ime()
            try:
                run.run(cases)
            finally:
                # Restore the original IME even when the run fails.
                base_page.set_original_ime()
            base_page.identify()
        except AssertionError as e:
            log.e('AssertionError, %s', e)

    def run(self, cases):
        """Find devices with the configured method and run ``cases`` on them.

        Raises:
            Exception: if the configured method is not SERVER, IP or USB.
        """
        # pick the android devices according to the configured method
        method = ReadConfig().get_method().strip()
        if method == 'SERVER':
            print('Checking available online devices from ATX-Server...')
            devices = get_online_devices()
            print('\nThere has %s online devices in ATX-Server' % len(devices))
        elif method == 'IP':
            print('Checking available IP devices from config... ')
            devices = get_devices()
            print('\nThere has %s devices alive in config IP list' % len(devices))
        elif method == 'USB':
            print('Checking available USB devices connected on PC... ')
            devices = connect_devices()
            print('\nThere has %s USB devices alive ' % len(devices))
        else:
            raise Exception('Config.ini method illegal:method =%s' % method)

        if not devices:
            print('There is no device found,test over.')
            return

        # generate test data (data.json)
        generate_test_data(devices)

        print('Starting Run test >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        runs = [RunCases(device) for device in devices]

        # run on every device, one worker process per device
        pool = Pool(processes=len(runs))
        pending = [pool.apply_async(self._run_cases, args=(run, cases,)) for run in runs]
        print('Waiting for all runs done........ ')
        pool.close()
        pool.join()

        # apply_async swallows worker exceptions unless get() is called;
        # report them instead of silently losing a device's run.
        for run, outcome in zip(runs, pending):
            try:
                outcome.get()
            except Exception as e:
                print('Run on %s failed: %s' % (run.get_device()['udid'], e))
        print('All runs done........ ')
        ChromeDriver.kill()

        # Generate the statistics report: all devices shown in one HTML file
        create_statistics_report(runs)
4.持续集成,就是Jenkins ,这里不做多说。
基于ATX的app自动化相关推荐
- pythonapp自动化_基于python的App UI自动化环境搭建
Android端Ui 自动化环境搭建 一,安装JDK.SDK 二,添加环境变量 Widows: 1.系统变量→新建 JAVA_HOME 变量 E:\Java\jdk1.7.0 jdk安装目录 2.系统 ...
- 十分钟弄懂最快的APP自动化工具uiautomator2
相信很多使用appium做过APP自动化的人都深有感触: appium运行慢.时间长 uiautomatorviewer定位元素时得关掉appium server 在低版本的appium上获取toas ...
- [Appium] App自动化-元素定位
[Appium] App自动化-元素定位及工具 一.元素定位工具简介 Web自动化是通过浏览器自带的F12键进行元素定位,但是App自动化支持三大定位工具(UIAutomatorView/Appium ...
- 4行代码,让app自动化框架支持 webview 混合应用操作
移动端 app 自动化框架很多,但是有一些框架因为不支持混合应用测试,一直没有完全流行.比较典型的是经典的 Python 框架 uiautomator2, 这个框架简单好用,没有 appium 那样复 ...
- appium手机APP自动化定位元素
目录 1 什么事手机app自动化,作用和特点 2 自动化原理: 3 定位元素 根据ID 根据CLASS NAME 根据ACCESSIBILITY ID Xpath 1 什么事手机app自动化,作用和特 ...
- [facebook-wda]搭建iOS App自动化环境
搭建iOS App自动化环境 一.测试结构介绍 手机端的WDA Runner(WebDriverAgent)类似于appium测试框架中的 UIAutomator Server,将从客户端接收到的控制 ...
- APP自动化测试之录制脚本:3.运行录制的脚本
APP自动化测试之录制脚本:3.运行录制的脚本 1.前提 基于win10专业版64位系统+jdk1.8+python3+pycharm+android SDK+appium+unittest.运行录制 ...
- 【Appium】基于 Appium 的 iOS 自动化
基于 Appium 的 iOS 自动化 一.Appium 环境搭建(macOS 10.15) 1.安装 Appium-server 2.替换并配置 WebDriverAgent 3.安装 [Appiu ...
- 手机APP自动化 Appium教程
Appium原理与安装 Appium 是一个移动 App (手机应用)自动化工具. 手机APP 自动化有什么用? 自动化完成一些重复性的任务 比如微信客服机器人 爬虫 自动化测试 Appium 自动化 ...
最新文章
- C++程序中常用的sprintf
- 性能调优-SQL TRACE
- java query api_ElasticSearch(十二) Bool Query JAVA API
- anaconda学习python_python深度学习笔记1-Anaconda软件安装
- 重装Nodejs后,webstorm代码报错问题
- PHP cURL应用实现模拟登录与采集使用方法详解
- Spring boot - 整合 Redis缓存(上)
- Hello World!!!
- 03 unix 设计哲学和流重定向
- 从入门到入土:[SEED-Lab]MD5碰撞试验|MD5collgen实验|linux|Ubuntu|MD5 Collision Attack Lab|详细讲解
- 寒假训练营第四次作业
- 读《学术研究,你的成功之道》读书笔记分享给各位
- shell脚本 文件拷贝
- 8 种流行的计算机视觉应用
- 奇异矩阵和非奇异矩阵 行列式矩阵简单理解 代数意义 几何意义 行列式的定义: 二阶行列式的几何意义: 三阶行列式的几何意义: 行列式化为对角形的几何解释: 二阶行列式乘积项的几何意
- 「数据架构」数据模型,数据字典,数据库模式 和ERD的比较
- 三消游戏算法图文详解
- 凛冬的寒风,吹开了电动车的遮羞布
- zabbix 自动发现/自定义宏
- 总结了Mybatis,原来知识点也没多少嘛
热门文章
- WIN2003安装SQL2000时验证CDKEY
- js 延迟几秒执行ifarme_延时加载JavaScript代码提高速度_javascript技巧 -
- 苹果6如何截屏_iPhone截屏操作也分三六九等?
- kuangbin J - Simpsons’ Hidden Talents
- Linux下部署tomcat,启动时8005端口无法启动
- 题解 洛谷 P4169 [Violet]天使玩偶/SJY摆棋子【CDQ分治】
- 使用ExpandableListView中的一些边边角角
- Java 使用IE浏览器下载文件,文件名乱码问题
- python解析返回值类型为xml的数据接口
- 团队活动 激励还是鸡肋?