常规做APPUI自动化时,基本上都是采取的pom模式加上关键字驱动、数据驱动、实现测试数据分离。

需求是:

(1)实现基本业务流程的测试
(2)多设备同时运行
(3)自动拉取最新apk并自动安装
(4)持续集成

分析:

业务的基本流程覆盖,相信做过接口web端的自动化或者接口自动化的同学来说,实现app ui自动化其实不难,也是相同的思路pom模式、关键字驱动、数据驱动、数据分离等。

1.先来自动拉取最新apk 并且实现自动化安装。

import os
import re
import shutil
from urllib.request import urlopen  # 用于获取网页
import requests
from bs4 import BeautifulSoup  # 用于解析网页path = os.path.normpath(os.path.join(os.path.join(os.path.dirname(os.path.dirname(__file__)))))class automatic(object):def __init__(self):self.url = 'http://10.124.106.120:28759/service/rest/repository/browse/maven-releases/tcl/release/android/tclplus/'@staticmethoddef _get_url(url):"""根据传入的url来获取超链接"""html = urlopen(url)bsObj = BeautifulSoup(html, 'html.parser')t1 = bsObj.find_all('a')return t1# 获取线上最新版本号与版本时间def get_latest_version(self):"""获取最新版本号"""t1 = self._get_url(self.url)time_version_dict = []for t2 in t1:t3 = t2.get('href')if '2.0' not in t3:# 将2.0以上的版本全部过滤掉,然后加入到列表里面time_version_dict.append(str(t3).split("/")[0].strip("..").strip("http:"))# 找出最大的版本号latest_version = max(time_version_dict)print(f"最大版本号为:{latest_version}")result_url = self.url + str(latest_version)return result_url, latest_versiondef download_apk(self):"""下载apk"""global apk_nameurls, number = self.get_latest_version()t1 = self._get_url(urls)time_version_dict = []url_list = []for t2 in t1:t3 = t2.get('href')url_list.append(t3)# 利用正则表达式找到2021开头的版本号pattern = re.compile(r'(?:2021)\d+\.?\d*')time_version_dict.append(pattern.findall(t3))Ak = max(time_version_dict)[0]for url in url_list:if str(Ak) in url:file = requests.get(url, timeout=60)apk_name = f"{number}-{max(time_version_dict)[0]}-jiagu-xtest-release.apk"with open(path + '/APK/' + apk_name, 'wb') as zip_file:zip_file.write(file.content)return apk_name

主要就是发起请求,取到最新版本号,然后获取链接地址,拿到下载地址之后,来写file.content,保存apk 即可。

2.第二步,多设备链接并自动安装
Decorator.py

import timefrom functools import wraps
from Common.BasePage import BasePage
from Common.ReportPath import ReportPath
from Common.Log import Logflag = 'IMAGE:'
log = Log()def _screenshot(name):date_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))screenshot = name + '-' + date_time + '.PNG'path = ReportPath().get_path() + '/' + screenshotdriver, sess = BasePage().get_driver()driver.screenshot(path)return screenshotdef teststep(func):@wraps(func)def wrapper(*args, **kwargs):try:log.i('\t--> %s', func.__qualname__)ret = func(*args, **kwargs)return retexcept AssertionError as e:log.e('AssertionError, %s', e)log.e('\t<-- %s, %s, %s', func.__qualname__, 'AssertionError', 'Error')if flag in str(e):raise AssertionError(e)else:raise AssertionError(flag + _screenshot(func.__qualname__))except Exception as e:log.e('Exception, %s', e)log.e('\t<-- %s, %s, %s', func.__qualname__, 'Exception', 'Error')if flag in str(e):raise Exception(e)else:raise Exception(flag + _screenshot(func.__qualname__))return wrapperdef teststeps(func):@wraps(func)def wrapper(*args, **kwargs):try:log.i('  --> %s', func.__qualname__)ret = func(*args, **kwargs)log.i('  <-- %s, %s', func.__qualname__, 'Success')return retexcept AssertionError as e:log.e('AssertionError, %s', e)log.e('  <-- %s, %s, %s', func.__qualname__, 'AssertionError', 'Error')if flag in str(e):raise AssertionError(e)else:raise AssertionError(flag + _screenshot(func.__qualname__))except Exception as e:log.e('Exception, %s', e)log.e('  <-- %s, %s, %s', func.__qualname__, 'Exception', 'Error')if flag in str(e):raise Exception(e)else:raise Exception(flag + _screenshot(func.__qualname__))return wrapperdef _wrapper(func):@wraps(func)def wrapper(*args, **kwargs):try:log.i('--> %s', func.__qualname__)ret = func(*args, **kwargs)log.i('<-- %s, %s\n', func.__qualname__, 'Success')return retexcept AssertionError as e:log.e('AssertionError, %s', e)log.e('<-- %s, %s, %s\n', func.__qualname__, 'AssertionError', 'Fail')if flag in str(e):raise AssertionError(e)else:raise AssertionError(flag + _screenshot(func.__qualname__))except Exception as e:log.e('Exception, %s', e)log.e('<-- %s, %s, %s\n', func.__qualname__, 'Exception', 'Error')if flag in str(e):raise Exception(e)else:raise Exception(flag + _screenshot(func.__qualname__))return wrapperdef testcase(func):return _wrapper(func)def setup(func):return _wrapper(func)def teardown(func):return _wrapper(func)def setupclass(func):return _wrapper(func)def teardownclass(func):return _wrapper(func)

基础定位方法封装

基础定位方法封装

BasePage.py

import time
import uiautomator2 as u2
from Common.chromedriver import ChromeDriver
from Common.Ports import Ports
from Common.ReportPath import ReportPathclass BasePage(object):@classmethoddef set_driver(cls, dri):cls.d = u2.connect(dri)cls.sess = cls.d.session("com.tcl.tclplus")def get_driver(self):return self.d, self.sessdef ClickText(self, text):"""点击文本方法操作"""self.d(text=text).click()def ClickResId(self, page):"""根据resourceId 进行点击"""self.d(resourceId="com.tcl.tclplus:id/" + page).click()def ExistsResId(self, page):"""判断resid 是否存在"""return self.d(resourceId="com.tcl.tclplus:id/" + page).exists()def ExistsText(self, text):"""判断文本 是否存在"""return self.d(text=text).existsdef ClickResIdSendKeys(self, *args):"""通过ResId 定位并且 输入内容"""self.ClickResId(args[0])self.d.send_keys(args[1])def GetTextResID(self, page):"""根据RESid 来获取文本"""return self.d(resourceId="com.tcl.tclplus:id/" + page).get_text()def GetTextXpath(self, text):"""获取文本的xpath"""return self.d.xpath(f'//*[@text="{text}"]').get_text()def ClickResIdText(self, page, text):"""根据resIDhe文本进行定位"""self.d(resourceId="com.tcl.tclplus:id/" + page, text=text).click()def SwipeOrExist(self, taper, text, up="up"):"""向上滑动,判断元素是否存在,存在即可点击根据taper为True或者false 来执行文本判断和resID判断滑动语句写到循环里面,每次执行循环,然后进行判断是否存在根据参数 up 或者down 来决定滑动的方向。"""while True:if taper:self.d.swipe_ext(up, 1)if not self.d(text=text).exists:continueelse:self.ClickText(text)breakelse:self.d.swipe_ext(up, 0.5)if not self.ExistsResId(text):continueelse:self.ClickResId(text)break@classmethoddef back(cls):'''点击返回页面没有加载完的时候,会出现返回失败的情况,使用前确认页面加载完成'''time.sleep(1)cls.d.press('back')time.sleep(1)@classmethoddef identify(cls):cls.d.open_identify()def set_chromedriver(self, device_ip=None, package=None, activity=None, process=None):driver = ChromeDriver(self.d, Ports().get_ports(1)[0]). \driver(device_ip=device_ip, package=package, attach=True, activity=activity, process=process)return driver@classmethoddef watch_device(cls, watch_list):'''如果存在元素则自动点击:param watch_list: exp: watch_list=['允许','yes','跳过']'''cls.d.watchers.watched = Falsefor i in watch_list:cls.d.watcher(i).when(text=i).click(text=i)# cls.d.watcher("允许").when(text="允许").click(text="允许")print('Starting watcher,parameter is %s' % watch_list)cls.d.watchers.watched = True@classmethoddef unwatch_device(cls):'''关闭watcher '''print('Stop all watcher')cls.d.watchers.watched = False@classmethoddef get_toast_message(cls):message = cls.d.toast.get_message(3, 3)cls.d.toast.reset()return message@classmethoddef set_fastinput_ime(cls):cls.d.set_fastinput_ime(True)@classmethoddef set_original_ime(cls):cls.d.set_fastinput_ime(False)@classmethoddef screenshot(cls):"""截图并打印特定格式的输出,保证用例显示截图"""date_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))screenshot_name = cls.__qualname__ + '-' + date_time + '.PNG'path = ReportPath().get_path() + '/' + screenshot_namecls.d.screenshot(path)print('IMAGE:' + screenshot_name)@staticmethoddef find_message(elements, text):"""查找元素列表中是否存在 text"""count = elements.countwhile count > 0:count = count - 1message = elements[count].info['text']if text in message:return Trueelif count == 0:return Falseelse:return Falsedef _get_window_size(self):window = self.d.window_size()x = window[0]y = window[1]return x, y@staticmethoddef _get_element_size(element):# rect = element.info['visibleBounds']rect = element.info['bounds']# print(rect)x_center = (rect['left'] + rect['right']) / 2y_center = (rect['bottom'] + rect['top']) / 2x_left = rect['left']y_up = rect['top']x_right = rect['right']y_down = rect['bottom']return x_left, y_up, x_center, y_center, x_right, y_down# def _swipe(self, fromX, fromY, toX, toY, steps):#     self.d.swipe(fromX, fromY, toX, toY, steps)## def swipe_up(self, element=None, steps=0.2):#     """#     swipe up#     :param element: UI element, if None while swipe window of phone#     :param steps: steps of swipe for Android, The lower the faster#     :return: None#     """#     if element:#         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)#         fromX = x_center#         fromY = y_center#         toX = x_center#         toY = y_up#     else:#         x, y = self._get_window_size()#         fromX = 0.5 * x#         fromY = 0.5 * y#         toX = 0.5 * x#         toY = 0.25 * y##     self._swipe(fromX, fromY, toX, toY, steps)## def swipe_down(self, element=None, steps=0.2):#     """#     swipe down#     :param element: UI element, if None while swipe window of phone#     :param steps: steps of swipe for Android, The lower the faster#     :return: None#     """#     if element:#         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)##         fromX = x_center#         fromY = y_center#         toX = x_center#         toY = y_down#     else:#         x, y = self._get_window_size()#         fromX = 0.5 * x#         fromY = 0.5 * y#         toX = 0.5 * x#         toY = 0.75 * y##     self._swipe(fromX, fromY, toX, toY, steps)## def swipe_left(self, element=None, steps=0.2):#     """#     swipe left#     :param element: UI element, if None while swipe window of phone#     :param steps: steps of swipe for Android, The lower the faster#     :return: None#     """#     if element:#         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)#         fromX = x_center#         fromY = y_center#         toX = x_left#         toY = y_center#     else:#         x, y = self._get_window_size()#         fromX = 0.5 * x#         fromY = 0.5 * y#         toX = 0.25 * x#         toY = 0.5 * y#     self._swipe(fromX, fromY, toX, toY, steps)## def swipe_right(self, element=None, steps=0.2):#     """#     swipe right#     :param element: UI element, if None while swipe window of phone#     :param steps: steps of swipe for Android, The lower the faster#     :return: None#     """#     if element:#         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)#         fromX = x_center#         fromY = y_center#         toX = x_right#         toY = y_center#     else:#         x, y = self._get_window_size()#         fromX = 0.5 * x#         fromY = 0.5 * y#         toX = 0.75 * x#         toY = 0.5 * y#     self._swipe(fromX, fromY, toX, toY, steps)## def _find_element_by_swipe(self, direction, value, element=None, steps=0.2, max_swipe=6):#     """#     :param direction: swip direction exp: right left up down#     :param value: The value of the UI element location strategy. exp: d(text='Logina')#     :param element: UI element, if None while swipe window of phone#     :param steps: steps of swipe for Android, The lower the faster#     :param max_swipe: the max times of swipe#     :return: UI element#     """#     times = max_swipe#     for i in range(times):#         try:#             if value.exists:#                 return value#             else:#                 raise UiObjectNotFoundError#         except UiObjectNotFoundError:#             if direction == 'up':#                 self.swipe_up(element=element, steps=steps)#             elif direction == 'down':#                 self.swipe_down(element=element, steps=steps)#             elif direction == 'left':#                 self.swipe_left(element=element, steps=steps)#             elif direction == 'right':#                 self.swipe_right(element=element, steps=steps)#             if i == times - 1:#                 raise UiObjectNotFoundError## def find_element_by_swipe_up(self, value, element=None, steps=0.2, max_swipe=6):#     return self._find_element_by_swipe('up', value,#                                        element=element, steps=steps, max_swipe=max_swipe)## def find_element_by_swipe_down(self, value, element=None, steps=0.2, max_swipe=6):#     return self._find_element_by_swipe('down', value,#                                        element=element, steps=steps, max_swipe=max_swipe)## def find_element_by_swipe_left(self, value, element=None, steps=0.2, max_swipe=6):#     return self._find_element_by_swipe('left', value,#                                        element=element, steps=steps, max_swipe=max_swipe)## def find_element_by_swipe_right(self, value, element=None, steps=0.2, max_swipe=6):#     return self._find_element_by_swipe('right', value,#                                        element=element, steps=steps, max_swipe=max_swipe)

chromedriver.py

# !/usr/bin/env python
# -*- coding: utf-8 -*-
#
# extension for https://sites.google.com/a/chromium.org/chromedriver/
# Experimental, maybe change in the future
# Created by <hzsunshx> 2017-01-20from __future__ import absolute_importimport atexit
import six
from selenium import webdriver
import psutil as pt
import osif six.PY3:import subprocessfrom urllib.error import URLError
else:from urllib2 import URLErrorimport subprocess32 as subprocessdef getPidByName(Str):pids = pt.process_iter()pidList = []for pid in pids:if pid.name() == Str:pidList.append(int(pid.pid))return pidListclass ChromeDriver(object):def __init__(self, d, port):self._d = dself._port = portdef _launch_webdriver(self):# print("start chromedriver instance")p = subprocess.Popen(['chromedriver', '--port=' + str(self._port)])try:p.wait(timeout=2.0)return Falseexcept subprocess.TimeoutExpired:return Truedef driver(self, device_ip=None, package=None, attach=True, activity=None, process=None):"""Args:- package(string): default current running app- attach(bool): default true, Attach to an already-running app instead of launching the app with a clear data directory- activity(string): Name of the Activity hosting the WebView.- process(string): Process name of the Activity hosting the WebView (as given by ps).If not given, the process name is assumed to be the same as androidPackage.Returns:selenium driver"""app = self._d.current_app()capabilities = {'chromeOptions': {'androidDeviceSerial': device_ip or self._d.serial,'androidPackage': package or app['package'],'androidUseRunningApp': attach,'androidProcess': process or app['package'],'androidActivity': activity or app['activity'],}}try:dr = webdriver.Remote('http://localhost:%d' % self._port, capabilities)except URLError:self._launch_webdriver()dr = webdriver.Remote('http://localhost:%d' % self._port, capabilities)# always quit driver when doneatexit.register(dr.quit)return dr@staticmethoddef kill():# # for windows# pid = getPidByName('chromedriver.exe')# for i in pid:#     os.popen('taskkill /PID %d /F' % i)# # for macpid = getPidByName('chromedriver')for i in pid:os.popen('kill -9 %d' % i)print('All chromedriver pid killed')# if __name__ == '__main__':import uiautomator2 as u2# d = u2.connect()# driver = ChromeDriver(d).driver()# elem = driver.find_element_by_link_text(u"登录")# elem.click()# driver.quit()# ChromeDriver.kill()

业务方法封装,包含自动安装apk

业务方法封装

LoginPage.py

# -*- coding: utf-8 -*-
import random
import string
import threading
from Common.osDriver import *
from Common.Decorator import *filepath = os.path.normpath(os.path.join(os.path.join(os.path.dirname(os.path.dirname(__file__)))))class LoginPage(BasePage):# @teststep# def loginApps(self):#     self.ClickText("立即登录")#     time.sleep(1)#     if self.ExistsText("确认"):#         # if self.ExistsResId("bt_confirm"):#         log.i("存在确认,先输入账号")#         if self.d.device_info["brand"] == "Xiaomi":  # 根据设备型号指定账号#             CountUser = '13720140005'#             log.i(f"取到的账号为{CountUser}")#             self.ClickResIdSendKeys("ll_phone", CountUser)#             self.ClickText("确认")#         elif self.d.device_info["brand"] == "HUAWEI":  # 根据设备型号指定账号#             CountUser = '13720140008'#             log.i(f"取到的账号为{CountUser}")#             self.ClickResIdSendKeys("ll_phone", CountUser)#             self.ClickText("确认")# #     time.sleep(3)#     if self.ExistsText("账号密码登录"):#         self.ClickText("账号密码登录")#     self.ClickResIdSendKeys("et_pwd", '123456wqw')#     if not self.ExistsResId("cb_login"):#         self.d.press("back")#     while True:#         self.ClickResId("cb_login")#         self.ClickText("登录")#         time.sleep(5)#         message = self.sess.toast.get_message(wait_timeout=0.5)  # 获取页面toast#         print(f'获取到的toast为:{message}')#         if message == u"请阅读并同意协议":#             continue#         else:#             break# # @teststep# def Eor(self):#     """#     判断是否存在 存在即点击#     """#     if self.ExistsText("允许"):#         self.ClickText("允许")#     if self.ExistsText("始终允许"):#         self.ClickText("始终允许")# # def Type_exit(self, types):#     """#     判断返回 是否为 左 或者 右#     左:确认离开  支付的状态#     右:确认离开  没有提交订单 只是点了立即购买#     @object:#     """#     if types == 'left':#         if self.ExistsResId("tv_left"):#             self.ClickResId("tv_left")#     elif types == 'right':# #         if self.ExistsResId("tv_right"):#             self.ClickText("确认离开")#     elif types == 'give':#         ss2 = self.d(resourceId="com.tencent.mm:id/ffp").exists()#         if ss2:#             self.d(text='放弃').click()#             self.back()#             self.d(text='确认离开').click()#     elif types == 'li':#         if self.ExistsResId("tv_right"):#             self.ClickText("确认")# #     else:#         pass# # def openIndex(self, ides=''):#     """#     一直点返回按钮,当出现 提示语 就结束#     """# #     while True:#         self.back()#         time.sleep(1)#         self.Type_exit(ides)#         message = self.sess.toast.get_message(wait_timeout=0.5)  # 获取页面toast#         if message == u"再按一次退出程序":#             break# # @teststep# def ClearAndLogin(self):#     """#     执行清除缓存 ,然后登录#     """#     # dev_list = []#     # for d in adbutils.adb.device_list():#     #     dev_list.append(d)#     # for i in dev_list:#     #     s = str(i).split("=")[1].split(")")[0]#     #     print(f"""取到的设备id为:{s}""")#     #     os.system(f"adb shell -s {s} pm clear com.tcl.tclplus")#     # time.sleep(5)#     pass# # @teststep# def update_nickname(self, *args):#     """修改昵称"""#     if self.d(text="保存").exists():#         pass#     else:#         self.d.xpath(#             '//*[@resource-id="com.tcl.tclplus:id/fl_head_view"]/android.widget.ImageView[5]').click()#     try:#         # self.ClickResIdSendKeys("et_nickname", args[0])#         self.ClickResId("et_nickname")#         time.sleep(1)#         # 点旁边的“x”情况昵称#         self.ClickResId("iv_clear")#         self.d.send_keys(args[0])#         time.sleep(1)#         self.ClickText("保存")#         time.sleep(1)#         args[1](args[0], self.GetTextResID("tv_nickName"))#         log.i("修改昵称成功")#     except Exception as e:#         raise log.i("修改昵称失败")# # @teststep# def tv_sms(self, *args):#     """"""#     try:#         self.ClickResId(args[1])#         time.sleep(2)#         args[0](args[2], self.GetTextResID("toolbar_title"))#         log.i(f"成功进入{args[2]}页面")#     except Exception as e:#         raise log.i(f'进入{args[2]}页面失败')#     finally:#         self.d.press("back")# # @teststep# def add_invoice(self, Invoice, types=''):#     """#     新增发票#     """#     a = random.sample(string.ascii_letters, 4)#     data = ''.join([str(x) for x in a])#     time.sleep(1)#     self.ClickText("添加发票抬头")#     if Invoice == '企业':#         if types == '电子发票':#             self.ClickText("企业")#             self.ClickResIdSendKeys("et_invoice_header", data)#             self.ClickResIdSendKeys("et_tax_no", "42112588888232" + str(random.randint(1, 88)))#             self.d.press("back")#             self.ClickText("完成")#         else:#             self.ClickText("企业")#             self.ClickResId("rb_VAT_invoice")#             time.sleep(2)#             self.ClickResIdSendKeys("et_invoice_header", data)#             self.ClickResIdSendKeys("et_tax_no", "42112588888232" + str(random.randint(1, 88)))#             self.ClickResIdSendKeys("et_bank_name", "建设银行坂田支行")#             self.ClickResIdSendKeys("et_bank_no", "6217007200031609020")#             self.d.press("back")#             self.ClickResIdSendKeys("et_address", "南山区")#             self.d.press("back")#             self.ClickResIdSendKeys("et_mobile", "07554243060")#             self.d.press("back")#             self.ClickText("完成")#     else:#         self.ClickResIdSendKeys("et_invoice_header", data)#         self.ClickResIdSendKeys("et_phone_no", "13599837022")#         self.ClickText("完成")# # @teststep# def update_invoice(self, ys):#     """修改发票抬头"""#     ids = 1#     # 第一次进入 需要点编辑#     self.d.xpath(#         f'//*[@resource-id="com.tcl.tclplus:id/recyclerview"]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()#     while True:#         ids += 1#         time.sleep(2)#         if self.d(text=ys).exists():#             break#         else:#             time.sleep(2)#             self.d.press('back')#             time.sleep(1)#             self.d.xpath(#                 f'//*[@resource-id="com.tcl.tclplus:id/recyclerview"]/android.view.ViewGroup[{ids}]/android.widget.ImageView[1]').click()#             continue# # @teststep# def add_address_phone(self):#     """#      新增地址时,填写信息#     """#     self.ClickResIdSendKeys("et_name", "李白")#     self.ClickResIdSendKeys("et_phone", "13699837021")#     self.ClickResId("tv_area")#     for i in ["安徽省", "安庆市", "大观区", "山口乡"]: self.ClickText(i)#     self.ClickResId("et_addr")#     self.d.send_keys("安徽省安庆市")#     self.ClickText("公司")#     self.d.press('back')#     self.ClickText("保存")# # @teststep# def binding(self, *args):#     """#     绑定qq 或者微信#     custom_qq,custom_wechat#     """#     # 如果类型是qq或者微信,先点击取消绑定,然后进行绑定#     self.d.xpath(#         f'//*[@resource-id="com.tcl.tclplus:id/{args[0]}"]/android.widget.FrameLayout['#         '1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]').click()#     time.sleep(2)#     if self.d(text="继续解绑").exists():#         self.ClickText("继续解绑")#     else:#         time.sleep(2)#         if args[0] == "custom_qq":#             time.sleep(2)#             self.ClickText("授权登录")#         time.sleep(5)#         self.ClickText("确认绑定")#         time.sleep(2)#         args[1]("绑定成功", self.sess.toast.get_message())@teststepdef local_install(self):"""下载apk 到apk路径下,然后进行区分安装"""# 先删除该文件夹下所有的文件:然后再去下载apkautomatic().del_file('../apk')# 清空文件夹下文件之后,再去下载最新apkapk_name = automatic().download_apk()if self.d.device_info["brand"] == "HUAWEI":  # 根据设备型号来执行不同的安装方式threaded1 = install_thead(self.d)threaded1.start()threaded1.app_install(path + '/apk/' + apk_name)threaded1.join()  # 等待所有线程终止time.sleep(5)os.system(f"adb -s 8KE0220413004952 shell pm clear com.tcl.tclplus")self.d.app_start("com.tcl.tclplus")elif self.d.device_info["brand"] == "Xiaomi":  # 根据设备型号来执行不同的安装方式os.system(f"adb -s d083a03e install -r -d " + filepath + "/apk/" + apk_name)time.sleep(2)os.system(f"adb -s d083a03e shell pm clear com.tcl.tclplus")self.d.app_start("com.tcl.tclplus")class install_thead(threading.Thread):def __init__(self, devices):  # 需要传入self.d 设备的信息self.drivers = devicesthreading.Thread.__init__(self, )def usb_install(self):for i in range(3):try:self.drivers(text='继续安装').click()except:passtry:self.drivers(text='打开').click()except:passdef run(self):  # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数self.usb_install()def app_install(self, app_file_path):self.drivers.app_install(app_file_path)

这里根据每个设备的self.d 的devices_info中的brand 来区分 进行不同的安装方法,采用了多线程运行,来去掉安装过程中存在警告提示等。自行添加即可。

以上就完成了 基础的定位方法封装、业务定位封装、多设备启动、多设备安装。

3.同时运行
CaseStrategy.py

import os
import unittestclass CaseStrategy:def __init__(self):self.suite_path = 'TestSuite_'self.case_path = 'TestCase'self.case_pattern = 'test*.py'def _collect_cases(self, cases, top_dir=None):suites = unittest.defaultTestLoader.discover(self.case_path,pattern=self.case_pattern, top_level_dir=top_dir)for suite in suites:for case in suite:print(f"需要执行的测试用例为:{case}")cases.addTest(case)def collect_cases(self, suite=False):"""collect casescollect cases from the giving path by case_path via the giving pattern by case_patternreturn: all cases that collected by the giving path and pattern, it is a unittest.TestSuite()"""cases = unittest.TestSuite()if suite:test_suites = []for file in os.listdir('.'):if self.suite_path in file:if os.path.isdir(file):test_suites.append(file)for test_suite in test_suites:self._collect_cases(cases, top_dir=test_suite)else:self._collect_cases(cases, top_dir=None)return cases

devices.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
多进程check_alive
Mac下需要配置  `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`到环境变量,不然python会挂掉
'''
from Common.ReadConfig import ReadConfig
from Common.ATX_Server import ATX_Server
import uiautomator2 as u2
import subprocess
import refrom multiprocessing import Pooldef get_devices():'''get the devices from Pubilc/config.ini devices listreturn alive devices'''devices_ip = ReadConfig().get_devices_ip()print('Connect devices from config devices IP list %s' % devices_ip)pool = Pool(processes=len(devices_ip))tmp_list = []for run in devices_ip:tmp_list.append(pool.apply_async(check_alive, args=(run,)))pool.close()pool.join()devices_list = []for i in tmp_list:if i.get():devices_list.append(i.get())return devices_listdef get_online_devices():'''get the devices from ATX-Serverreturn alive devices'''devices = ATX_Server(ReadConfig().get_server_url()).online_devices()print('There has %s online devices on ATX-Server' % len(devices))if devices:pool = Pool(processes=len(devices))tmp_list = []for run in devices:tmp_list.append(pool.apply_async(check_alive, args=(run,)))pool.close()pool.join()devices_list = []for i in tmp_list:if i.get():devices_list.append(i.get())return devices_listelse:raise Exception('ATX-Server has no online device!!! ')def connect_devices():'''get the devices USB connected on PCreturn alive devices'''output = subprocess.check_output(['adb', 'devices'])pattern = re.compile(r'(?P<serial>[^\s]+)\t(?P<status>device|offline)')matches = pattern.findall(output.decode())valid_serials = [m[0] for m in matches if m[1] == 'device']if valid_serials:print('There has %s devices connected on PC: ' % len(valid_serials))pool = Pool(processes=len(valid_serials))tmp_list = []for run in valid_serials:tmp_list.append(pool.apply_async(check_alive, args=(run,)))pool.close()pool.join()devices_list = []for i in tmp_list:if i.get():devices_list.append(i.get())return devices_listif len(valid_serials) == 0:print("No available android devices detected.")return []def check_alive(device):if isinstance(device, dict):d = u2.connect(device['ip'])if d.agent_alive:d.healthcheck()if d.alive:print('%s is alive' % device['udid'])return d.device_infoelse:print('%s is not alive' % device['udid'])return Noneelse:print('The device atx_agent %s  is not alive,please checkout!' % device['udid'])return Noneelse:d = u2.connect(device)if d.agent_alive:d.healthcheck()if d.alive:print('%s is alive' % device)return d.device_infoelse:print('%s is not alive' % device)return Noneelse:print('The device atx_agent %s  is not alive,please checkout!' % device)return None# if __name__ == '__main__':# devices_ip = get_devices()# devices = connect_devices()# devices = get_online_devices()# print(devices_ip)## pool = Pool(processes=len(devices_ip))# tmp_list = []# for run in devices_ip:#     tmp_list.append(pool.apply_async(check_alive, args=(run,)))#     # alive_list.append(tmp)# pool.close()# pool.join()# print('All runs done........ ')# print(tmp_list)# for i in tmp_list:#     print(i.get())# print(get_devices())# print(get_online_devices())# print(connect_devices())

driver.py

from Common.Devices import *    # 多进程 check_alive ,Mac下需要配置  `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`到环境变量from Common.RunCases import RunCases
from Common.ReportPath import ReportPath
from Common.BasePage import BasePage
from Common.Log import Log
from Common.ReadConfig import ReadConfig
from Common.chromedriver import ChromeDriver
from Common.Test_data import generate_test_data
from Common.Report import create_statistics_reportclass Drivers:@staticmethoddef _run_cases(run, cases):log = Log()log.set_logger(run.get_device()['model'], run.get_path() + '/' + 'client.log')log.i('udid: %s', run.get_device()['udid'])# set cls.path, it must be call before operate on any pagepath = ReportPath()path.set_path(run.get_path())# set cls.driver, it must be call before operate on any pagebase_page = BasePage()if 'ip' in run.get_device():base_page.set_driver(run.get_device()['ip'])else:base_page.set_driver(run.get_device()['serial'])try:# run casesbase_page.set_fastinput_ime()run.run(cases)base_page.set_original_ime()base_page.identify()except AssertionError as e:log.e('AssertionError, %s', e)def run(self, cases):# 根据method 获取android设备method = ReadConfig().get_method().strip()if method == 'SERVER':# get ATX-Server Online devices# devices = ATX_Server(ReadConfig().get_server_url()).online_devices()print('Checking available online devices from ATX-Server...')devices = get_online_devices()print('\nThere has %s online devices in ATX-Server' % len(devices))elif method == 'IP':# get  devices from config devices listprint('Checking available IP devices from config... ')devices = get_devices()print('\nThere has %s  devices alive in config IP list' % len(devices))elif method == 'USB':# get  devices connected PC with USBprint('Checking available USB devices connected on PC... ')devices = connect_devices()print('\nThere has %s  USB devices alive ' % len(devices))else:raise Exception('Config.ini method illegal:method =%s' % method)if not devices:print('There is no device found,test over.')return# generate test data data.json 准备测试数据generate_test_data(devices)print('Starting Run test >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')runs = []for i in range(len(devices)):runs.append(RunCases(devices[i]))# run on every device 开始执行测试pool = Pool(processes=len(runs))for run in runs:pool.apply_async(self._run_cases,args=(run, cases,))print('Waiting for all runs done........ ')pool.close()pool.join()print('All runs done........ ')ChromeDriver.kill()#  Generate statistics report  生成统计测试报告 将所有设备的报告在一个HTML中展示create_statistics_report(runs)# if __name__ == '__main__':# print(ATX_Server(ReadConfig().get_url()).online_devices())## print(get_devices())# print(ReadConfig().get_atx_server('method'))

4.持续集成,就是Jenkins ,这里不做多说。

基于ATX的app自动化相关推荐

  1. pythonapp自动化_基于python的App UI自动化环境搭建

    Android端Ui 自动化环境搭建 一,安装JDK.SDK 二,添加环境变量 Widows: 1.系统变量→新建 JAVA_HOME 变量 E:\Java\jdk1.7.0 jdk安装目录 2.系统 ...

  2. 十分钟弄懂最快的APP自动化工具uiautomator2

    相信很多使用appium做过APP自动化的人都深有感触: appium运行慢.时间长 uiautomatorviewer定位元素时得关掉appium server 在低版本的appium上获取toas ...

  3. [Appium] App自动化-元素定位

    [Appium] App自动化-元素定位及工具 一.元素定位工具简介 Web自动化是通过浏览器自带的F12键进行元素定位,但是App自动化支持三大定位工具(UIAutomatorView/Appium ...

  4. 4行代码,让app自动化框架支持 webview 混合应用操作

    移动端 app 自动化框架很多,但是有一些框架因为不支持混合应用测试,一直没有完全流行.比较典型的是经典的 Python 框架 uiautomator2, 这个框架简单好用,没有 appium 那样复 ...

  5. appium手机APP自动化定位元素

    目录 1 什么事手机app自动化,作用和特点 2 自动化原理: 3 定位元素 根据ID 根据CLASS NAME 根据ACCESSIBILITY ID Xpath 1 什么事手机app自动化,作用和特 ...

  6. [facebook-wda]搭建iOS App自动化环境

    搭建iOS App自动化环境 一.测试结构介绍 手机端的WDA Runner(WebDriverAgent)类似于appium测试框架中的 UIAutomator Server,将从客户端接收到的控制 ...

  7. APP自动化测试之录制脚本:3.运行录制的脚本

    APP自动化测试之录制脚本:3.运行录制的脚本 1.前提 基于win10专业版64位系统+jdk1.8+python3+pycharm+android SDK+appium+unittest.运行录制 ...

  8. 【Appium】基于 Appium 的 iOS 自动化

    基于 Appium 的 iOS 自动化 一.Appium 环境搭建(macOS 10.15) 1.安装 Appium-server 2.替换并配置 WebDriverAgent 3.安装 [Appiu ...

  9. 手机APP自动化 Appium教程

    Appium原理与安装 Appium 是一个移动 App (手机应用)自动化工具. 手机APP 自动化有什么用? 自动化完成一些重复性的任务 比如微信客服机器人 爬虫 自动化测试 Appium 自动化 ...

最新文章

  1. C++程序中常用的sprintf
  2. 性能调优-SQL TRACE
  3. java query api_ElasticSearch(十二) Bool Query JAVA API
  4. anaconda学习python_python深度学习笔记1-Anaconda软件安装
  5. 重装Nodejs后,webstorm代码报错问题
  6. PHP cURL应用实现模拟登录与采集使用方法详解
  7. Spring boot - 整合 Redis缓存(上)
  8. Hello World!!!
  9. 03 unix 设计哲学和流重定向
  10. 从入门到入土:[SEED-Lab]MD5碰撞试验|MD5collgen实验|linux|Ubuntu|MD5 Collision Attack Lab|详细讲解
  11. 寒假训练营第四次作业
  12. 读《学术研究,你的成功之道》读书笔记分享给各位
  13. shell脚本 文件拷贝
  14. 8 种流行的计算机视觉应用
  15. 奇异矩阵和非奇异矩阵 行列式矩阵简单理解 代数意义 几何意义 行列式的定义: 二阶行列式的几何意义: 三阶行列式的几何意义: 行列式化为对角形的几何解释: 二阶行列式乘积项的几何意
  16. 「数据架构」数据模型,数据字典,数据库模式 和ERD的比较
  17. 三消游戏算法图文详解
  18. 凛冬的寒风,吹开了电动车的遮羞布
  19. zabbix 自动发现/自定义宏
  20. 总结了Mybatis,原来知识点也没多少嘛

热门文章

  1. WIN2003安装SQL2000时验证CDKEY
  2. js 延迟几秒执行ifarme_延时加载JavaScript代码提高速度_javascript技巧 -
  3. 苹果6如何截屏_iPhone截屏操作也分三六九等?
  4. kuangbin J - Simpsons’ Hidden Talents
  5. Linux下部署tomcat,启动时8005端口无法启动
  6. 题解 洛谷 P4169 [Violet]天使玩偶/SJY摆棋子【CDQ分治】
  7. 使用ExpandableListView中的一些边边角角
  8. Java 使用IE浏览器下载文件,文件名乱码问题
  9. python解析返回值类型为xml的数据接口
  10. 团队活动 激励还是鸡肋?