Source code for siriushla.widgets.dialog.progress_dialog

"""Dialog that handle thread that implement a task interface.

Task QThread Interface:
- itemDone: Signal
- size: method that return task size
- exit_task: set quit_thread flag True
"""
import time
from qtpy.QtCore import Signal, QThread
from qtpy.QtWidgets import QDialog, QVBoxLayout, QLabel, QPushButton, \
    QProgressBar


class ProgressDialog(QDialog):
    """Modal dialog showing the progress of one or more task threads.

    When several tasks are given they are run one after the other: each
    task's ``completed`` signal starts the next one and the dialog label is
    advanced to the matching entry of ``label``. The progress bar maximum is
    the sum of ``size()`` over all tasks and is incremented on every
    ``itemDone`` emission. The dialog is accepted when the last task emits
    ``completed`` (or the bar reaches its maximum) and rejected when the
    Cancel button is pressed.

    Parameters
    ----------
    label : str or list/tuple of str
        Text shown while the task (or the task at the same position) runs.
    task : QThread or list/tuple of QThread
        Thread(s) providing ``currentItem``, ``itemDone`` and ``completed``
        signals plus ``size()`` and ``exit_task()`` methods.
    parent : QObject, optional
        Parent passed to ``QDialog``.

    Raises
    ------
    ValueError
        If ``label``/``task`` are not a (str, QThread) pair or two non-empty
        sequences of equal length.
    """

    canceled = Signal()

    # Seconds to wait for a task thread to stop before giving up.
    _WAIT_TIMEOUT = 10
    # Seconds between two checks of the thread state while waiting.
    _POLL_INTERVAL = 0.1

    def __init__(self, label, task, parent=None):
        """Validate the arguments and build the dialog."""
        super().__init__(parent)
        self._label = label
        self._task = task

        given_sequences = (
            isinstance(label, (list, tuple))
            and isinstance(task, (list, tuple)))
        if given_sequences:
            if len(label) != len(task):
                raise ValueError(
                    'label and task must have the same number of items')
            if not task:
                # An empty list used to slip through validation and fail
                # later with an IndexError while building the interface.
                raise ValueError('at least one task is required')
            self._labels = list(label)
            self._tasks = list(task)
        elif isinstance(label, str) and isinstance(task, QThread):
            # A single task is handled as a one-element sequence.
            self._labels = [label]
            self._tasks = [task]
        else:
            raise ValueError(
                'label/task must be a str and a QThread, or two '
                'lists/tuples of them')

        # Position in self._labels of the text currently displayed.
        self._label_idx = 0
        # Set once the dialog has started closing; see _exit_dlg.
        self._exited = False

        self._setup_ui()

    def _setup_ui(self):
        """Create the widgets and connect task and dialog signals."""
        # NOTE(review): this attribute shadows QWidget.layout(); the name is
        # kept because external code may already rely on it.
        self.layout = QVBoxLayout()

        self.dlg_label = QLabel(self)
        self.cur_item_label = QLabel(self)
        self.progress_bar = QProgressBar(self)
        self.cancel = QPushButton('Cancel', self)

        self.layout.addWidget(self.dlg_label)
        self.layout.addWidget(self.cur_item_label)
        self.layout.addWidget(self.progress_bar)
        self.layout.addWidget(self.cancel)

        self.setLayout(self.layout)

        # Set initial value
        self.progress_bar.setValue(0)

        # Set progress bar limits and connect signals
        self.progress_bar.setMinimum(0)
        self.dlg_label.setText(self._labels[0])

        total_size = 0
        last_idx = len(self._tasks) - 1
        for idx, task in enumerate(self._tasks):
            task.currentItem.connect(self.cur_item_label.setText)
            task.itemDone.connect(self.inc_value)
            task.finished.connect(task.deleteLater)
            if idx < last_idx:
                # Chain the tasks: show the next label, then start the
                # next thread.
                task.completed.connect(self._update_label)
                task.completed.connect(self._tasks[idx + 1].start)
            else:
                task.completed.connect(lambda: self._exit_dlg(1))
            total_size += task.size()
        self.progress_bar.setMaximum(total_size)

        self.cancel.pressed.connect(self.canceled.emit)
        self.canceled.connect(lambda: self._exit_dlg(0))
        self.progress_bar.valueChanged.connect(self._is_finished)

    def _update_label(self):
        """Show the label of the next task, if there is one."""
        # The position is tracked explicitly instead of being looked up from
        # the displayed text, so repeated labels do not stall the sequence.
        next_idx = self._label_idx + 1
        if next_idx < len(self._labels):
            self._label_idx = next_idx
            self.dlg_label.setText(self._labels[next_idx])

    def _exit_dlg(self, result):
        """Stop every task and close the dialog.

        Parameters
        ----------
        result : int
            1 accepts the dialog, 0 rejects it.

        Raises
        ------
        TimeoutError
            Propagated from ``_wait_task`` when a thread does not stop.
        """
        # Task completion, the progress bar reaching its maximum and the
        # Cancel button all end up here; only the first call does the work.
        if self._exited:
            return
        self._exited = True

        for task in self._tasks:
            task.exit_task()
        # Wait for all of them: when the dialog is cancelled early the
        # running thread is not the last one of the sequence.
        for task in self._tasks:
            self._wait_task(task)

        if result == 1:
            self.accept()
        elif result == 0:
            self.reject()

    def _wait_task(self, task):
        """Block until ``task`` is no longer running.

        Raises
        ------
        TimeoutError
            If the thread is still running after ``_WAIT_TIMEOUT`` seconds;
            the dialog is rejected before raising.
        """
        deadline = time.monotonic() + self._WAIT_TIMEOUT
        while True:
            try:
                running = task.isRunning()
            except RuntimeError:
                # Presumably the underlying thread object was already
                # deleted through deleteLater; nothing is left to wait for.
                return
            if not running:
                return
            if time.monotonic() > deadline:
                break
            time.sleep(self._POLL_INTERVAL)

        # Reject directly: calling _exit_dlg again from here would start a
        # new wait on the same thread and never reach the raise below.
        self.reject()
        raise TimeoutError('Thread will not leave')

    def _is_finished(self):
        """Close the dialog once the progress bar reaches its maximum."""
        if self.progress_bar.value() == self.progress_bar.maximum():
            self._exit_dlg(1)

    def set_value(self, value):
        """Set progress bar value."""
        self.progress_bar.setValue(value)

    def inc_value(self):
        """Increase the progress bar value by one."""
        self.progress_bar.setValue(self.progress_bar.value() + 1)

    def exec_(self):
        """Start the first task and show the dialog modally."""
        self._tasks[0].start()
        return super().exec_()