Add typing for TeePopen

This commit is contained in:
Mikhail f. Shiryaev 2022-11-14 18:59:01 +01:00
parent 8838102657
commit 02b8da2a0f
No known key found for this signature in database
GPG Key ID: 4B02ED204C7D93F4

View File

@ -3,6 +3,7 @@
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from time import sleep
from typing import Optional
import logging
import os
import sys
@ -18,7 +19,7 @@ class TeePopen:
self.command = command
self.log_file = log_file
self.env = env
self.process = None
self._process = None # type: Optional[Popen]
self.timeout = timeout
def _check_timeout(self):
@ -51,7 +52,7 @@ class TeePopen:
return self
def __exit__(self, t, value, traceback):
for line in self.process.stdout:
for line in self.process.stdout: # type: ignore
sys.stdout.write(line)
self.log_file.write(line)
@ -59,8 +60,18 @@ class TeePopen:
self.log_file.close()
def wait(self):
for line in self.process.stdout:
for line in self.process.stdout: # type: ignore
sys.stdout.write(line)
self.log_file.write(line)
return self.process.wait()
@property
def process(self) -> Popen:
if self._process is not None:
return self._process
raise AttributeError("process is not created yet")
@process.setter
def process(self, process: Popen) -> None:
self._process = process