多进程执行分布式自动化测试

场景:
进入搜狗,输入搜索关键字进行搜索
利用多进程分布式实现

成都创新互联公司企业建站,十多年网站建设经验,专注于网站建设技术,精于网页设计,有多年建站和网站代运营经验,设计师为客户打造网络企业风格,提供周到的建站售前咨询和贴心的售后服务。对于成都网站设计、成都做网站中不同领域进行深入了解和探索,创新互联在网站建设中充分了解客户行业的需求,以灵动的思维在网页中充分展现,通过对客户行业精准市场调研,为客户提供的解决方案。

from multiprocessing import Pool
import os, time
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from multiprocessing import Manager, current_process
import traceback

#定义测试行为函数:此处为打开搜狗搜索内容
def node_task(name, lock, arg, successTestCases, failTestCases):
    """

    :param name: 执行进程名
    :param lock:进程间的共享资源锁
    :param arg:node节点计算机、浏览器字典 ,如:{"node": "http://127.0.0.1:6666/wd/hub", "browserName": "chrome"}
    :param successTestCases:成功执行用例列表
    :param failTestCases:失败用例列表
    :return:返回成功执行、失败执行的用例列表
    """
    procName = current_process().name

    print("当前进程名:",procName)
    time.sleep(1.2)
    #获取节点计算机地址
    node_host = arg["node"]
    #获取浏览器
    browser = arg["browserName"]
    print(arg["node"])
    print(arg["browserName"])
    print('Run task %s (%s)...\n' % (name, os.getpid()))
    #获取用例执行的开始时间
    start = time.time()
    #获取driver对象
    driver = webdriver.Remote(
        #节点计算机
        command_executor="%s" %arg["node"],
        desired_capabilities={
            #浏览器
            "browserName": "%s" %arg["browserName"],
            "video": "True",
            "platform": "WINDOWS"})

    try:
        driver.maximize_window()
        driver.get("http://www.sogou.com")
        assert "搜狗" in driver.title
        element = driver.find_element_by_id("query")
        element.send_keys("测试开发")
        element.send_keys(Keys.RETURN)
        time.sleep(3)
        assert "测试开发" in driver.page_source
        #获取共享锁
        lock.acquire()
        #用例执行成功,用例加入成功列表
        successTestCases.append("TestCase: " + str(name))
        #释放共享锁
        lock.release()
        print("TestCase: " + str(name) + " done!")

    except AssertionError as e:
        #断言失败,打印异常信息
        print("AssertionError occur!" " testCase " + str(name))
        print(traceback.print_exc())
        #保存异常现场图片
        driver.save_screenshot("e:\\screenshot" + str(name) + ".png")
        #获取共享锁
        lock.acquire()
        #存储失败用例信息
        failTestCases.append("TestCase: " + str(name))
        #释放共享锁
        lock.release()
        print("测试用例执行失败")
    except Exception as e:
        print("Exception occur!")
        print(traceback.print_exc())
        driver.save_screenshot("e:\\screenshot" + str(name) + ".png")
        #获取共享锁,存储失败用例信息,并释放锁
        with lock:
            failTestCases.append("TestCase: " + str(name))
        print("测试 用例执行失败")

    finally:
        #退出驱动,并关闭所有窗口
        driver.quit()
    end = time.time()
    #打印用例执行时间
    print("Tast %s run %.2f seconds." % (name, (end - start)))

#封装多进程执行函数
def run(nodeSeq):
    """
    :param nodeSeq: 节点机器和浏览器字典,列表
    :return: 返回成功、失败用例列表successTestCases, failTestCases
    """
    #定义共享manager对象
    manager = Manager()
    #创建进程间共享的用例执行成功列表
    successTestCases = manager.list([])
    # 创建进程间共享的用例执行失败列表
    failTestCases = manager.list([])

    #创建一个共享资源锁,各进程共享
    lock = manager.Lock()
    #打印父进程ID
    print("Parent process %s." % os.getpid())
    #创建包含3个进程的进程池
    p = Pool(processes=3)
    #获取用例数量
    testCaseNumber = len(nodeSeq)
    #循环索引遍历用例列表,多进程执行node_task函数(搜狗搜索)
    for i in range(testCaseNumber):
        p.apply_async(node_task, args=(i + 1, lock, nodeSeq[i], successTestCases, failTestCases))

    print("Waiting for all subprocesses done...")
    #关闭p
    p.close()
    #等待各个子进程执行结束
    p.join()

    return successTestCases, failTestCases

#封装写测试结果函数
def resultReport(testCaseNumber, successTestCases, failTestCases):
    """

    :param testCaseNumber: 用例个数
    :param successTestCases: 成功执行列表
    :param failTestCases: 失败执行列表
    :return: 无
    """
    print("测试报告: \n")
    print("共执行测试用例:" + str(testCaseNumber) + "个\n")
    print("执行成功的测试用例: " + str(len(successTestCases)) + "个")
    if len(successTestCases) > 0:
        for t in successTestCases:
            print(t)
    else:
        print("没有执行成功的测试用例")
    print("执行失败的测试用例: " + str(len(failTestCases)) + "个")
    if len(failTestCases) > 0:
        for t in failTestCases:
            print(t)
    else:
        print("没有执行失败的测试用例")

if __name__ == "__main__":
    nodeList = [
        {"node": "http://127.0.0.1:6666/wd/hub", "browserName": "internet explorer"},
        {"node": "http://127.0.0.1:6666/wd/hub", "browserName": "chrome"},
        {"node": "http://127.0.0.1:6666/wd/hub", "browserName": "firefox"}]

    testCaseNumber = len(nodeList)
    #执行多进程函数
    successTestCases, failTestCases = run(nodeList)
    print("All processes done")
    #写测试结果
    resultReport(testCaseNumber, successTestCases, failTestCases)

名称栏目:多进程执行分布式自动化测试
标题路径:http://csdahua.cn/article/gicesp.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流