社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
参考别人的:https://blog.csdn.net/u013040887/article/details/80878561
重写Thread线程类参考官方文档:https://docs.python.org/zh-cn/3/library/threading.html#module-threading
我自己写的代码:
"""
功能:封装一个自定义的线程管理类【兼容线程的启动和停止】
重写Thread线程类参考官方文档:https://docs.python.org/zh-cn/3/library/threading.html#module-threading
"""
from threading import Thread, Event
import time
from app.libs.utils.util_global import UtilGlobal
class OwnThread(Thread):
def __init__(self, target, name=None, group=None,
args=(), kwargs=None, daemon=None):
Thread.__init__(self, group=group, target=target, name=name,
args=args, kwargs=kwargs, daemon=daemon)
self.target = target
self.args = args
self.kwargs = UtilGlobal.check_none_type_to_empty_dict_else_original(kwargs)
self.__running = Event()
self.__running.set()
def run(self):
"""
功能:重写父类的run方法【当外部调用own_thread.start()开启线程就会自动运行run方法】
特殊说明:父类run方法的三个属性【_target、_args、_kwargs】都是私有属性【下划线开头】,无法继承!
所以这里不能直接super().run(),而是直接调用target方法即可
"""
while self.__running.isSet():
UtilGlobal.check_is_function_or_method_type(self.target)
self.target(*self.args, **self.kwargs)
time.sleep(1)
def stop(self):
"""
功能:封装一个停止线程的方法
:return:
"""
self.__running.clear()
def test():
print("执行线程中...")
a = OwnThread(target=test, name="test")
a.start()
time.sleep(5)
a.stop()
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!