#!/usr/bin/env python
# -*- coding: utf-8 -*-
[docs]__copyright__ = "Copyright (c) 2024 Nuanguang Gu(Sunny) Reserved"
[docs]__author__ = "Nuanguang Gu(Sunny)"
[docs]__email__ = "nuanguang.gu@aliyun.com"
import os
from enum import IntEnum
from threading import Event, Lock
from functools import wraps
from testbot.utilities.time import get_local_time
[docs]class StepResult(IntEnum):
"""
表示节点的状态
"""
[docs]class NodeType(IntEnum):
"""
表示节点的类型
"""
[docs]class ResultNode(object):
"""
测试结果节点
"""
def __init__(self, header, status=None, message="", parent=None,
node_type=NodeType.Other):
self.status = StepResult.INFO if status is None else status
self.header = header
self.message = message
self.children = list()
self.parent = parent
self.type = node_type
self.timestamp = get_local_time()
self.log = None
[docs] def add_child(self, header, status=StepResult.INFO, message="",
node_type=NodeType.Other):
"""
添加新的子节点并返回该节点
"""
new_node = ResultNode(header, message=message, parent=self,
node_type=node_type)
# 在case或者step类型的节点中,只允许类型是Step
if self.type in [NodeType.Step, NodeType.Case]:
new_node.type = NodeType.Step
self.children.append(new_node)
new_node.set_status(status)
return new_node
[docs] def set_status(self, status):
"""
设置当前节点的状态,并且同时更新父节点的状态
"""
if self.type == NodeType.Other:
# 对于类型是非case或者是step的节点,不作状态设置
return
if status == StepResult.INFO:
return
if self.status in [StepResult.INFO, StepResult.PASS]:
self.status = status
self.parent.set_status(status)
[docs] def add(self, status, header, message=""):
"""
简化的add方法,提供给事件驱动
"""
self.add_child(header, status, message, NodeType.Step)
if self.log:
self.log.info(header)
[docs] def get_test_point_stats(self):
stats_pass = 0
stats_fail = 0
stats_error = 0
stats_warning = 0
stats_exception = 0
for child in self.children:
child_stats = child.get_test_point_stats()
stats_pass += child_stats[0]
stats_fail += child_stats[1]
stats_error += child_stats[2]
stats_warning += child_stats[3]
stats_exception += child_stats[4]
if not any(self.children):
if self.status == StepResult.PASS:
stats_pass = 1
elif self.status == StepResult.FAIL:
stats_fail = 1
elif self.status == StepResult.ERROR:
stats_error = 1
elif self.status == StepResult.WARNING:
stats_warning = 1
elif self.status == StepResult.EXCEPTION:
stats_exception = 1
return stats_pass, stats_fail, stats_error, stats_warning, stats_exception
[docs] def get_test_case_stats(self):
stats_pass = 0
stats_fail = 0
stats_error = 0
stats_warning = 0
stats_exception = 0
if self.type == NodeType.Case:
if self.status == StepResult.PASS:
stats_pass = 1
elif self.status == StepResult.FAIL:
stats_fail = 1
elif self.status == StepResult.ERROR:
stats_error = 1
elif self.status == StepResult.WARNING:
stats_warning = 1
elif self.status == StepResult.EXCEPTION:
stats_exception = 1
else:
for child in self.children:
child_stats = child.get_test_case_stats()
stats_pass += child_stats[0]
stats_fail += child_stats[1]
stats_error += child_stats[2]
stats_warning += child_stats[3]
stats_exception += child_stats[4]
return stats_pass, stats_fail, stats_error, stats_warning, stats_exception
@property
[docs] def is_leaf(self):
return any(self.children)
[docs] def to_dict(self):
"""
将结果节点输出成字典结构
"""
rv = dict()
rv["header"] = self.header
rv["status"] = self.status.value
rv["message"] = self.message
rv["type"] = self.type.value
rv["children"] = list()
rv["timestamp"] = self.timestamp
for child in self.children:
rv["children"].append(child.to_dict())
return rv
[docs] def to_text(self, indent=0):
"""
将结果生成文本类型的结构
"""
rv = f"{self._get_intent(indent)}"
if self.type == NodeType.Case:
rv += "<测试用例> "
elif self.type == NodeType.TestList:
rv += "<测试列表> "
elif self.type == NodeType.Step:
rv += "<测试步骤> "
rv += f"【步骤】:{self.header},【失败描述】:{self.message}"
if self.type in [NodeType.Case, NodeType.Step]:
rv += self._get_dot_line(rv, 120)
rv += f"{self.status.name}"
rv += os.linesep
for child in self.children:
rv += child.to_text(indent+1)
return rv
[docs] def _get_intent(self, indent):
return " " * (indent * 2)
[docs] def _get_dot_line(self, line, line_max):
if len(line) >= line_max:
return line
else:
return "-" * (line_max - len(line))
[docs]def locker(lock):
def outer(func):
@wraps(func)
def inner(*args, **kwargs):
try:
lock.acquire()
return func(*args, **kwargs)
finally:
lock.release()
return inner
return outer
[docs]class ResultReporter(object):
def __init__(self, logger):
self.halt_on_failure = False
self.halt_on_exception = False
self.halt_event = Event()
self.root = ResultNode("Root")
self.recent_case = None
self.recent_node = self.root
self.recent_list = None
self.logger = logger
self.case_logger = None
[docs] def search_result(self, case_name):
"""
搜索给定的测试用例名称的测试结果
"""
self._search_result(self.root, case_name)
[docs] def _search_result(self, node, case_name):
if node.type == NodeType.Step:
return None
for child in node.children:
if child.header == case_name:
return child.status
else:
rv = self._search_result(child, case_name)
if rv:
return rv
else:
return None
@locker(my_lock)
[docs] def add_node(self, header, message="", status=StepResult.INFO, node_type=NodeType.Other):
self.recent_node = self.recent_node.add_child(header=header,
status=status,
message=message,
node_type=node_type)
self._log_info(f"Result Node {header}, Message: {message}")
return self.recent_node
@locker(my_lock)
[docs] def pop(self):
if self.recent_node.parent:
self.recent_node = self.recent_node.parent
@locker(my_lock)
[docs] def add_test(self, case_name):
self.recent_node = self.recent_node.add_child(header=case_name,
node_type=NodeType.Case)
self.recent_case = self.recent_node
self._log_info(f"【测试用例】 {case_name}")
@locker(my_lock)
[docs] def end_test(self):
if self.recent_case is None:
return
self.recent_node = self.recent_case.parent
self.recent_case = None
@locker(my_lock)
[docs] def add_list(self, list_name):
self.recent_node = self.recent_node.add_child(header=list_name,
node_type=NodeType.TestList)
self.recent_list = self.recent_node
self._log_info(f"【测试列表】 {list_name}")
@locker(my_lock)
[docs] def end_list(self):
if self.recent_list is None:
return
self.recent_node = self.recent_list.parent
self.recent_list = None
@locker(my_lock)
[docs] def add_step_group(self, group_name):
self.recent_node = self.recent_node.add_child(header=group_name, node_type=NodeType.Step)
self._log_info(f"【测试步骤组】 {group_name}")
@locker(my_lock)
[docs] def add_event_group(self, group_name):
rv = self.recent_node.add_child(header=group_name,
node_type=NodeType.Step)
rv.log = self.case_logger if self.case_logger is not None else self.logger
self._log_info(f"【事件】 {group_name}")
return rv
@locker(my_lock)
[docs] def end_step_group(self):
if self.recent_node.parent:
self.recent_node = self.recent_node.parent
[docs] def add_precheck_result(self, result, headline):
pass
[docs] def is_high_priority_passed(self, priority):
pass
@locker(my_lock)
[docs] def add(self, status: StepResult, headline, message=""):
self.recent_node.add_child(header=headline, message=message, node_type=NodeType.Step, status=status)
self._log_info(f"【步骤】: {headline},【结果】{status.name}:,【失败描述】:{message}")
self.halt_event.clear()
if status == StepResult.FAIL and self.halt_on_failure:
self.halt_event.wait()
elif status == StepResult.EXCEPTION and self.halt_on_exception:
self.halt_event.wait()
[docs] def _log_info(self, message):
if self.case_logger:
self.case_logger.info(message)
else:
self.logger.info(message)
if __name__ == "__main__":
import logging
[docs] rr = ResultReporter(logging)
# 添加一个测试列表节点
rr.add_list("Test List 1")
# 添加一个测试用例节点
rr.add_test("Test Case 1")
# 添加一个SETUP步骤节点
rr.add_step_group("SETUP")
# 添加一些步骤
rr.add(StepResult.PASS, "Do Something Setup", "I'm doing something")
rr.end_step_group()
# 添加一个测试步骤节点
rr.add_step_group("TEST")
rr.add_step_group("Login to website")
rr.add(StepResult.PASS, "Input Username", "Username is admin")
rr.add(StepResult.PASS, "Input Password", "Password is admin")
rr.add(StepResult.FAIL, "Login", "Login is failed")
rr.end_step_group()
# 这里我们少了一个end_group, 但是end_test会把我们带回正确的位置。
rr.end_test()
# 第二个测试用例
rr.add_test("Test Case 2")
rr.add_step_group("SETUP")
rr.add(StepResult.PASS, "Do Something Setup", "I'm doing something")
rr.end_step_group()
rr.add_step_group("TEST")
rr.add_step_group("Login to website")
rr.add(StepResult.PASS, "Input Username", "Username is admin")
rr.add(StepResult.PASS, "Input Password", "Password is admin")
rr.end_step_group()
rr.end_test()
rr.add_list("Sub Test List")
rr.add_test("Test Case 3")
rr.add_step_group("SETUP")
rr.add(StepResult.PASS, "Do Something Setup", "I'm doing something")
rr.end_step_group()
rr.add_step_group("TEST")
rr.add_step_group("Login to website")
rr.add(StepResult.PASS, "Input Username", "Username is admin")
rr.add(StepResult.PASS, "Input Password", "Password is admin")
rr.end_step_group()
rr.end_test()
rr.end_list()
rr.add_test("Test Case 4")
rr.add_step_group("SETUP")
rr.add(StepResult.PASS, "Do Something Setup", "I'm doing something")
rr.end_step_group()
rr.add_step_group("TEST")
rr.add_step_group("Login to website")
rr.add(StepResult.PASS, "Input Username", "Username is admin")
rr.add(StepResult.PASS, "Input Password", "Password is admin")
rr.end_step_group()
rr.end_test()
rr.end_list()
print(rr.root.to_dict())
print(rr.root.to_text())
tp_stats = rr.root.get_test_point_stats()
print(f"PASS: {tp_stats[0]}, FAIL: {tp_stats[1]}")
print(f"ERROR: {tp_stats[2]}, WARNING: {tp_stats[3]}, EXCEPTION: {tp_stats[4]}")
tc_stats = rr.root.get_test_case_stats()
print(f"PASS: {tc_stats[0]}, FAIL: {tc_stats[1]}")
print(f"ERROR: {tc_stats[2]}, WARNING: {tc_stats[3]}, EXCEPTION: {tc_stats[4]}")
# import json
# print(json.dumps(rr.root.to_dict(), indent=4))