diff --git a/README.md b/README.md index 886142ac..1d0f6014 100644 --- a/README.md +++ b/README.md @@ -41,12 +41,7 @@ docker run -p 8080:8080 conductoross/conductor:latest ``` The UI will be available at `http://localhost:8080` and the API at `http://localhost:8080/api` -**MacOS / Linux (one-liner):** (If you don't want to use docker, you can install and run the binary directly) -```shell -curl -sSL https://raw.githubusercontent.com/conductor-oss/conductor/main/conductor_server.sh | sh -``` - -**Conductor CLI** +**Conductor CLI** (macOS / Linux, if you don't want to use Docker) ```shell # Installs conductor cli npm install -g @conductor-oss/conductor-cli diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index ff43ecbc..83c53757 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -270,9 +270,17 @@ def __init__( self._next_restart_at: List[float] = [0.0 for _ in self.workers] # Lock to protect process list during concurrent access (monitor thread vs main thread) self._process_lock = threading.Lock() + self._processes_started = False logger.info("TaskHandler initialized") def __enter__(self): + try: + self.start_processes() + except BaseException: + # __exit__ is not called if __enter__ raises, so clean up any + # partially-spawned workers here before propagating. + self.stop_processes() + raise return self def __exit__(self, exc_type, exc_value, traceback): @@ -286,17 +294,21 @@ def stop_processes(self) -> None: with self._process_lock: self.__stop_task_runner_processes() self.__stop_metrics_provider_process() + self._processes_started = False logger.info("Stopped worker processes...") self.queue.put(None) self.logger_process.terminate() def start_processes(self) -> None: + if self._processes_started: + return logger.info("Starting worker processes...") freeze_support() self._monitor_stop_event.clear() self.__start_task_runner_processes() self.__start_metrics_provider_process() self.__start_monitor_thread() + self._processes_started = True logger.info("Started all processes") def join_processes(self) -> None: diff --git a/tests/unit/automator/test_task_handler_coverage.py b/tests/unit/automator/test_task_handler_coverage.py index b97e0083..dd85af22 100644 --- a/tests/unit/automator/test_task_handler_coverage.py +++ b/tests/unit/automator/test_task_handler_coverage.py @@ -663,12 +663,19 @@ def test_context_manager_enter(self, mock_process_class, mock_import, mock_loggi @patch('conductor.client.automator.task_handler._setup_logging_queue') @patch('importlib.import_module') - def test_context_manager_exit(self, mock_import, mock_logging): + @patch('conductor.client.automator.task_handler.Process') + def test_context_manager_exit(self, mock_process_class, mock_import, mock_logging): """Test context manager __exit__ method.""" mock_queue = Mock() mock_logger_process = Mock() mock_logging.return_value = (mock_logger_process, mock_queue) + mock_process = Mock() + mock_process.terminate = Mock() + mock_process.kill = Mock() + mock_process.is_alive = Mock(return_value=False) + mock_process_class.return_value = mock_process + worker = ClassWorker('test_task') handler = TaskHandler( workers=[worker], @@ -679,10 +686,16 @@ def test_context_manager_exit(self, mock_import, mock_logging): # Override the queue and logger_process with fresh mocks handler.queue = Mock() handler.logger_process = Mock() + handler.logger_process.is_alive = Mock(return_value=False) + handler.metrics_provider_process = Mock() + handler.metrics_provider_process.terminate = Mock() + handler.metrics_provider_process.is_alive = Mock(return_value=False) # Mock terminate on all processes for process in handler.task_runner_processes: process.terminate = Mock() + process.kill = Mock() + process.is_alive = Mock(return_value=False) with handler: pass