diff --git a/resources/flashgrep/flashgrep.exe b/resources/flashgrep/flashgrep.exe index feb9d4035..9b1078a40 100644 Binary files a/resources/flashgrep/flashgrep.exe and b/resources/flashgrep/flashgrep.exe differ diff --git a/src/apps/desktop/src/api/app_state.rs b/src/apps/desktop/src/api/app_state.rs index 1a3855a5c..9579b05b5 100644 --- a/src/apps/desktop/src/api/app_state.rs +++ b/src/apps/desktop/src/api/app_state.rs @@ -1,5 +1,6 @@ //! Application state management +use crate::api::workspace_activation::spawn_workspace_background_warmup; use bitfun_core::agentic::side_question::SideQuestionRuntime; use bitfun_core::agentic::{agents, tools}; use bitfun_core::infrastructure::ai::{AIClient, AIClientFactory}; @@ -201,55 +202,11 @@ impl AppState { .map(|workspace| workspace.root_path.clone()); if let Some(workspace_path) = initial_workspace_path.clone() { - let skip_startup_snapshot_restore = initial_workspace - .as_ref() - .map(|workspace| { - matches!( - workspace.workspace_kind, - bitfun_core::service::workspace::WorkspaceKind::Remote - ) - }) - .unwrap_or(false); - if skip_startup_snapshot_restore { - log::debug!( - "Skipping snapshot restore on startup for remote workspace: path={}", - workspace_path.display() - ); - } else { - if let Err(e) = - bitfun_core::service::snapshot::initialize_snapshot_manager_for_workspace( - workspace_path.clone(), - None, - ) - .await - { - log::warn!( - "Failed to restore snapshot system on startup: path={}, error={}", - workspace_path.display(), - e - ); - } - } if let Err(e) = ai_rules_service.set_workspace(workspace_path).await { log::warn!("Failed to restore AI rules workspace on startup: {}", e); } } - if let Some(workspace_info) = initial_workspace { - if workspace_info.workspace_kind != workspace::WorkspaceKind::Remote { - if let Err(e) = workspace_search_service - .open_repo(&workspace_info.root_path) - .await - { - log::warn!( - "Failed to restore workspace search repository session on startup: path={}, error={}", - workspace_info.root_path.display(), - e - ); - } - } - } - // Initialize SSH Remote services synchronously so they're ready before app starts let ssh_data_dir = dirs::data_local_dir() .unwrap_or_else(|| std::path::PathBuf::from(".")) @@ -358,6 +315,10 @@ impl AppState { announcement_scheduler, }; + if let Some(workspace_info) = initial_workspace { + spawn_workspace_background_warmup(&app_state, workspace_info); + } + log::info!("AppState initialized successfully"); Ok(app_state) } diff --git a/src/apps/desktop/src/api/commands.rs b/src/apps/desktop/src/api/commands.rs index 3e770377d..0eb615b29 100644 --- a/src/apps/desktop/src/api/commands.rs +++ b/src/apps/desktop/src/api/commands.rs @@ -12,6 +12,7 @@ use crate::api::search_api::{ group_search_results, search_file_contents_via_workspace_search, search_metadata_from_content_result, should_use_workspace_search, SearchMetadataResponse, }; +use crate::api::workspace_activation::spawn_workspace_background_warmup; use bitfun_core::infrastructure::{ BatchedFileSearchProgressSink, FileSearchResult, FileSearchResultGroup, FileTreeNode, SearchMatchType, @@ -608,8 +609,18 @@ async fn clear_active_workspace_context(state: &State<'_, AppState>, app: &AppHa #[cfg(not(target_os = "macos"))] let _ = app; + let previous_workspace_path = state.workspace_path.read().await.clone(); *state.workspace_path.write().await = None; + if let Some(previous_workspace_path) = previous_workspace_path { + let root_str = previous_workspace_path.to_string_lossy().to_string(); + if !is_remote_path(root_str.trim()).await { + state + .workspace_search_service + .schedule_repo_release(previous_workspace_path); + } + } + if let Some(ref pool) = state.js_worker_pool { pool.stop_all().await; } @@ -649,34 +660,6 @@ async fn apply_active_workspace_context( // Remote workspace roots are POSIX paths on the SSH host — not writable local directories on // Windows. Snapshot hooks already skip file tracking for registered remote paths; avoid // creating `/.bitfun` (or drive root) here which fails with access denied. - let root_str = workspace_info.root_path.to_string_lossy().to_string(); - let skip_local_snapshot = workspace_info.workspace_kind == WorkspaceKind::Remote - || is_remote_path(root_str.trim()).await; - if !skip_local_snapshot { - if let Err(e) = bitfun_core::service::snapshot::initialize_snapshot_manager_for_workspace( - workspace_info.root_path.clone(), - None, - ) - .await - { - warn!( - "Failed to initialize snapshot system: path={}, error={}", - workspace_info.root_path.display(), - e - ); - } - } else { - debug!( - "Skipping local snapshot manager init for remote/non-local workspace root_path={}", - workspace_info.root_path.display() - ); - } - - state - .agent_registry - .load_custom_subagents(&workspace_info.root_path) - .await; - if let Err(e) = state .ai_rules_service .set_workspace(workspace_info.root_path.clone()) @@ -689,19 +672,7 @@ async fn apply_active_workspace_context( ); } - if workspace_info.workspace_kind != WorkspaceKind::Remote { - if let Err(e) = state - .workspace_search_service - .open_repo(&workspace_info.root_path) - .await - { - warn!( - "Failed to open workspace search repository session: path={}, error={}", - workspace_info.root_path.display(), - e - ); - } - } + spawn_workspace_background_warmup(&*state, workspace_info.clone()); #[cfg(target_os = "macos")] { diff --git a/src/apps/desktop/src/api/mod.rs b/src/apps/desktop/src/api/mod.rs index ffe5f97fe..5f7f81a83 100644 --- a/src/apps/desktop/src/api/mod.rs +++ b/src/apps/desktop/src/api/mod.rs @@ -43,5 +43,6 @@ pub mod subagent_api; pub mod system_api; pub mod terminal_api; pub mod tool_api; +pub mod workspace_activation; pub use app_state::{AppState, AppStatistics, HealthStatus, RemoteWorkspace}; diff --git a/src/apps/desktop/src/api/workspace_activation.rs b/src/apps/desktop/src/api/workspace_activation.rs new file mode 100644 index 000000000..03682c2dc --- /dev/null +++ b/src/apps/desktop/src/api/workspace_activation.rs @@ -0,0 +1,118 @@ +use crate::api::app_state::AppState; +use bitfun_core::service::remote_ssh::workspace_state::is_remote_path; +use bitfun_core::service::workspace::{WorkspaceInfo, WorkspaceKind}; +use log::{debug, info, warn}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::RwLock; + +pub fn spawn_workspace_background_warmup(state: &AppState, workspace_info: WorkspaceInfo) { + let workspace_path = state.workspace_path.clone(); + let agent_registry = state.agent_registry.clone(); + let workspace_search_service = state.workspace_search_service.clone(); + + tokio::spawn(async move { + warm_workspace_background_services( + workspace_path, + agent_registry, + workspace_search_service, + workspace_info, + ) + .await; + }); +} + +async fn warm_workspace_background_services( + workspace_path: Arc>>, + agent_registry: Arc, + workspace_search_service: Arc, + workspace_info: WorkspaceInfo, +) { + let started_at = Instant::now(); + let target_path = workspace_info.root_path.clone(); + let root_str = target_path.to_string_lossy().to_string(); + let skip_local_snapshot = workspace_info.workspace_kind == WorkspaceKind::Remote + || is_remote_path(root_str.trim()).await; + + if !skip_local_snapshot && is_workspace_active(&workspace_path, &target_path).await { + let snapshot_started_at = Instant::now(); + if let Err(error) = + bitfun_core::service::snapshot::initialize_snapshot_manager_for_workspace( + target_path.clone(), + None, + ) + .await + { + warn!( + "Failed to initialize snapshot system during workspace warmup: path={}, error={}", + target_path.display(), + error + ); + } else { + debug!( + "Workspace snapshot warmup completed: path={}, elapsed_ms={}", + target_path.display(), + snapshot_started_at.elapsed().as_millis() + ); + } + } + + if is_workspace_active(&workspace_path, &target_path).await { + let subagents_started_at = Instant::now(); + agent_registry.load_custom_subagents(&target_path).await; + debug!( + "Workspace custom subagent warmup completed: path={}, elapsed_ms={}", + target_path.display(), + subagents_started_at.elapsed().as_millis() + ); + } + + if workspace_info.workspace_kind != WorkspaceKind::Remote + && is_workspace_active(&workspace_path, &target_path).await + { + let search_started_at = Instant::now(); + match workspace_search_service.open_repo(&target_path).await { + Ok(_) => { + let still_active = is_workspace_active(&workspace_path, &target_path).await; + if !still_active { + workspace_search_service.schedule_repo_release(target_path.clone()); + debug!( + "Released flashgrep warmup session for inactive workspace: path={}", + target_path.display() + ); + } + info!( + "Workspace search warmup completed: path={}, elapsed_ms={}, active_after_open={}", + target_path.display(), + search_started_at.elapsed().as_millis(), + still_active + ); + } + Err(error) => { + warn!( + "Failed to open workspace search repository session during warmup: path={}, error={}", + target_path.display(), + error + ); + } + } + } + + debug!( + "Workspace background warmup completed: path={}, total_elapsed_ms={}", + target_path.display(), + started_at.elapsed().as_millis() + ); +} + +async fn is_workspace_active( + workspace_path: &Arc>>, + target_path: &Path, +) -> bool { + workspace_path + .read() + .await + .as_ref() + .is_some_and(|current| current == target_path) +} diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index cb0cd81d1..549b52265 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -1017,18 +1017,11 @@ fn perform_process_exit_cleanup() -> bool { } }); - match shutdown_thread { - Ok(handle) => { - if handle.join().is_err() { - log::warn!("Workspace search shutdown thread panicked"); - } - } - Err(error) => { - log::warn!( - "Failed to spawn workspace search shutdown thread: {}", - error - ); - } + if let Err(error) = shutdown_thread { + log::warn!( + "Failed to spawn workspace search shutdown thread: {}", + error + ); } } bitfun_core::util::process_manager::cleanup_all_processes(); diff --git a/src/crates/core/src/service/search/flashgrep/client.rs b/src/crates/core/src/service/search/flashgrep/client.rs new file mode 100644 index 000000000..2eab5592d --- /dev/null +++ b/src/crates/core/src/service/search/flashgrep/client.rs @@ -0,0 +1,751 @@ +use std::{ + collections::HashMap, + ffi::OsString, + process::Stdio, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +use serde::Serialize; +use tokio::{ + io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, + process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}, + sync::{oneshot, Mutex}, + time::{sleep, timeout}, +}; + +use super::{ + error::{AppError, Result}, + protocol::{ + ClientCapabilities, ClientInfo, GlobParams, InitializeParams, RepoRef, Request, + RequestEnvelope, Response, ResponseEnvelope, SearchParams, ServerMessage, TaskRef, + }, + types::{ + GlobOutcome, GlobRequest, OpenRepoParams, RepoStatus, SearchOutcome, SearchRequest, + TaskStatus, + }, +}; + +const JSONRPC_VERSION: &str = "2.0"; +const CLIENT_NAME: &str = "bitfun-workspace-search"; +const REPO_CLOSE_TIMEOUT: Duration = Duration::from_secs(2); +const SHUTDOWN_REQUEST_TIMEOUT: Duration = Duration::from_secs(2); +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2); + +type PendingResponseSender = oneshot::Sender>; +type PendingResponses = HashMap; + +#[derive(Debug, Clone)] +pub(crate) struct ManagedClient { + daemon_program: Option, + start_timeout: Duration, + retry_interval: Duration, + shutting_down: Arc, + state: Arc>, + start_guard: Arc>, +} + +#[derive(Debug)] +pub(crate) struct RepoSession { + repo_id: String, + client: ManagedClient, +} + +#[derive(Debug, Default)] +struct ManagedClientState { + daemon: Option>, +} + +#[derive(Debug)] +struct AsyncDaemonClient { + child: Mutex>, + writer: Mutex>, + shared: Arc, + next_id: AtomicU64, + reader_task: Mutex>>, + stderr_task: Mutex>>, +} + +#[derive(Debug, Default)] +struct DaemonShared { + pending: Mutex, + closed: AtomicBool, +} + +impl Default for ManagedClient { + fn default() -> Self { + Self { + daemon_program: None, + start_timeout: Duration::from_secs(10), + retry_interval: Duration::from_millis(100), + shutting_down: Arc::new(AtomicBool::new(false)), + state: Arc::new(Mutex::new(ManagedClientState::default())), + start_guard: Arc::new(Mutex::new(())), + } + } +} + +impl ManagedClient { + pub(crate) fn new() -> Self { + Self::default() + } + + pub(crate) fn with_daemon_program(mut self, program: impl Into) -> Self { + self.daemon_program = Some(program.into()); + self + } + + pub(crate) fn with_start_timeout(mut self, timeout: Duration) -> Self { + self.start_timeout = timeout; + self + } + + pub(crate) fn with_retry_interval(mut self, interval: Duration) -> Self { + self.retry_interval = interval; + self + } + + pub(crate) async fn open_repo(&self, params: OpenRepoParams) -> Result { + match self + .send_request_with_restart(Request::OpenRepo { params }) + .await? + { + Response::RepoOpened { repo_id, .. } => Ok(RepoSession { + repo_id, + client: self.clone(), + }), + other => unexpected_response("open_repo", other), + } + } + + pub(crate) async fn shutdown_daemon(&self) -> Result<()> { + self.shutting_down.store(true, Ordering::Relaxed); + let daemon = self.state.lock().await.daemon.take(); + if let Some(daemon) = daemon { + daemon.shutdown().await?; + } + Ok(()) + } + + async fn send_request_with_restart(&self, request: Request) -> Result { + self.send_request_with_restart_timeout(request, None).await + } + + async fn send_request_with_restart_timeout( + &self, + request: Request, + timeout: Option, + ) -> Result { + if self.is_shutting_down() { + return Err(AppError::Protocol( + "flashgrep stdio backend is shutting down".into(), + )); + } + + let daemon = self.get_or_start_daemon().await?; + match daemon + .send_request_with_timeout(request.clone(), timeout) + .await + { + Ok(response) => Ok(response), + Err(error) + if !self.is_shutting_down() && should_restart_daemon(&error, daemon.as_ref()) => + { + self.clear_daemon_if_current(&daemon).await; + if let Err(shutdown_error) = daemon.shutdown().await { + log::debug!( + "Flashgrep stdio daemon shutdown after transport error failed: {}", + shutdown_error + ); + } + let restarted = self.get_or_start_daemon().await?; + restarted.send_request_with_timeout(request, timeout).await + } + Err(error) => Err(error), + } + } + + async fn get_or_start_daemon(&self) -> Result> { + if self.is_shutting_down() { + return Err(AppError::Protocol( + "flashgrep stdio backend is shutting down".into(), + )); + } + + if let Some(daemon) = self.current_daemon().await { + return Ok(daemon); + } + + let _start_guard = self.start_guard.lock().await; + if self.is_shutting_down() { + return Err(AppError::Protocol( + "flashgrep stdio backend is shutting down".into(), + )); + } + if let Some(daemon) = self.current_daemon().await { + return Ok(daemon); + } + + let deadline = Instant::now() + self.start_timeout; + loop { + match AsyncDaemonClient::spawn(self.daemon_program.clone()).await { + Ok(daemon) => { + let daemon = Arc::new(daemon); + self.state.lock().await.daemon = Some(daemon.clone()); + return Ok(daemon); + } + Err(error) if Instant::now() < deadline => { + sleep(self.retry_interval).await; + let _ = error; + } + Err(error) => return Err(error), + } + } + } + + async fn current_daemon(&self) -> Option> { + let mut state = self.state.lock().await; + match state.daemon.clone() { + Some(daemon) if !daemon.is_closed() => Some(daemon), + Some(_) => { + state.daemon = None; + None + } + None => None, + } + } + + async fn clear_daemon_if_current(&self, current: &Arc) { + let mut state = self.state.lock().await; + if state + .daemon + .as_ref() + .is_some_and(|daemon| Arc::ptr_eq(daemon, current)) + { + state.daemon = None; + } + } + + fn is_shutting_down(&self) -> bool { + self.shutting_down.load(Ordering::Relaxed) + } +} + +impl RepoSession { + pub(crate) async fn status(&self) -> Result { + self.send_repo_request( + "get_repo_status", + Request::GetRepoStatus { + params: self.repo_ref(), + }, + |response| match response { + Response::RepoStatus { status } => Ok(status), + other => unexpected_response("get_repo_status", other), + }, + None, + ) + .await + } + + pub(crate) async fn search(&self, request: SearchRequest) -> Result { + self.send_repo_request( + "search", + Request::Search { + params: SearchParams { + repo_id: self.repo_id.clone(), + query: request.query, + scope: request.scope, + consistency: request.consistency, + allow_scan_fallback: request.allow_scan_fallback, + }, + }, + |response| match response { + Response::SearchCompleted { + backend, + status, + results, + .. + } => Ok(SearchOutcome { + backend, + status, + results, + }), + other => unexpected_response("search", other), + }, + None, + ) + .await + } + + pub(crate) async fn glob(&self, request: GlobRequest) -> Result { + self.send_repo_request( + "glob", + Request::Glob { + params: GlobParams { + repo_id: self.repo_id.clone(), + scope: request.scope, + }, + }, + |response| match response { + Response::GlobCompleted { status, paths, .. } => Ok(GlobOutcome { status, paths }), + other => unexpected_response("glob", other), + }, + None, + ) + .await + } + + pub(crate) async fn index_build(&self) -> Result { + self.send_repo_request( + "base_snapshot/build", + Request::BaseSnapshotBuild { + params: self.repo_ref(), + }, + |response| match response { + Response::TaskStarted { task } => Ok(task), + other => unexpected_response("base_snapshot/build", other), + }, + None, + ) + .await + } + + pub(crate) async fn index_rebuild(&self) -> Result { + self.send_repo_request( + "base_snapshot/rebuild", + Request::BaseSnapshotRebuild { + params: self.repo_ref(), + }, + |response| match response { + Response::TaskStarted { task } => Ok(task), + other => unexpected_response("base_snapshot/rebuild", other), + }, + None, + ) + .await + } + + pub(crate) async fn task_status(&self, task_id: impl Into) -> Result { + self.send_repo_request( + "task/status", + Request::TaskStatus { + params: TaskRef { + task_id: task_id.into(), + }, + }, + |response| match response { + Response::TaskStatus { task } => Ok(task), + other => unexpected_response("task/status", other), + }, + None, + ) + .await + } + + pub(crate) async fn close(&self) -> Result<()> { + self.send_repo_request( + "close_repo", + Request::CloseRepo { + params: self.repo_ref(), + }, + |response| match response { + Response::RepoClosed { .. } => Ok(()), + other => unexpected_response("close_repo", other), + }, + Some(REPO_CLOSE_TIMEOUT), + ) + .await + } + + fn repo_ref(&self) -> RepoRef { + RepoRef { + repo_id: self.repo_id.clone(), + } + } + + async fn send_repo_request( + &self, + _method: &'static str, + request: Request, + decode: impl FnOnce(Response) -> Result, + timeout: Option, + ) -> Result { + let response = self + .client + .send_request_with_restart_timeout(request, timeout) + .await?; + decode(response) + } +} + +impl AsyncDaemonClient { + async fn spawn(daemon_program: Option) -> Result { + let program = daemon_program + .or_else(|| std::env::var_os("FLASHGREP_DAEMON_BIN")) + .unwrap_or_else(|| OsString::from("flashgrep")); + + let mut command = Command::new(program); + command + .arg("serve") + .arg("--stdio") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true); + + let mut child = command.spawn()?; + let stdin = child.stdin.take().ok_or_else(|| { + AppError::Protocol("flashgrep stdio backend did not provide stdin".into()) + })?; + let stdout = child.stdout.take().ok_or_else(|| { + AppError::Protocol("flashgrep stdio backend did not provide stdout".into()) + })?; + let stderr = child.stderr.take(); + + let client = Self { + child: Mutex::new(Some(child)), + writer: Mutex::new(BufWriter::new(stdin)), + shared: Arc::new(DaemonShared::default()), + next_id: AtomicU64::new(1), + reader_task: Mutex::new(None), + stderr_task: Mutex::new(None), + }; + + client.spawn_reader_task(stdout).await; + client.spawn_stderr_task(stderr).await; + client.initialize().await?; + Ok(client) + } + + fn is_closed(&self) -> bool { + self.shared.closed.load(Ordering::Relaxed) + } + + async fn initialize(&self) -> Result<()> { + match self + .send_request_with_timeout( + Request::Initialize { + params: InitializeParams { + client_info: Some(ClientInfo { + name: CLIENT_NAME.to_string(), + version: Some(env!("CARGO_PKG_VERSION").to_string()), + }), + capabilities: ClientCapabilities::default(), + }, + }, + None, + ) + .await? + { + Response::InitializeResult { .. } => self.send_notification(Request::Initialized).await, + other => unexpected_response("initialize", other), + } + } + + async fn send_request_with_timeout( + &self, + request: Request, + request_timeout: Option, + ) -> Result { + if self.is_closed() { + return Err(AppError::Protocol( + "flashgrep stdio backend is not running".into(), + )); + } + + let request_name = request_name(&request); + let request_id = self.next_id.fetch_add(1, Ordering::Relaxed); + let envelope = RequestEnvelope { + jsonrpc: JSONRPC_VERSION.to_string(), + id: Some(request_id), + request, + }; + let (sender, receiver) = oneshot::channel(); + self.shared.pending.lock().await.insert(request_id, sender); + + if let Err(error) = self.write_envelope(&envelope).await { + self.shared.pending.lock().await.remove(&request_id); + return Err(error); + } + + let response = match request_timeout { + Some(duration) => match timeout(duration, receiver).await { + Ok(result) => result.map_err(|_| { + AppError::Protocol( + "flashgrep stdio backend closed without sending a response".into(), + ) + })??, + Err(_) => { + self.shared.pending.lock().await.remove(&request_id); + return Err(AppError::Protocol(format!( + "flashgrep stdio backend request timed out: {request_name}" + ))); + } + }, + None => receiver.await.map_err(|_| { + AppError::Protocol( + "flashgrep stdio backend closed without sending a response".into(), + ) + })??, + }; + decode_response(request_id, response) + } + + async fn send_notification(&self, request: Request) -> Result<()> { + let envelope = RequestEnvelope { + jsonrpc: JSONRPC_VERSION.to_string(), + id: None, + request, + }; + self.write_envelope(&envelope).await + } + + async fn write_envelope(&self, envelope: &RequestEnvelope) -> Result<()> { + let mut writer = self.writer.lock().await; + write_content_length_message(&mut writer, envelope).await + } + + async fn shutdown(&self) -> Result<()> { + let shutdown_result = if self.is_closed() { + Ok(()) + } else { + self.send_request_with_timeout(Request::Shutdown, Some(SHUTDOWN_REQUEST_TIMEOUT)) + .await + .map(|_| ()) + }; + + self.mark_closed(); + self.reject_pending("flashgrep stdio backend is shutting down") + .await; + + let wait_result = self.wait_for_child_exit().await; + self.stop_background_tasks().await; + + shutdown_result?; + wait_result + } + + fn mark_closed(&self) { + self.shared.closed.store(true, Ordering::Relaxed); + } + + async fn wait_for_child_exit(&self) -> Result<()> { + let mut child = self.child.lock().await.take(); + let Some(child) = child.as_mut() else { + return Ok(()); + }; + + match timeout(SHUTDOWN_TIMEOUT, child.wait()).await { + Ok(wait_result) => { + wait_result?; + Ok(()) + } + Err(_) => { + child.kill().await?; + child.wait().await?; + Ok(()) + } + } + } + + async fn stop_background_tasks(&self) { + if let Some(handle) = self.reader_task.lock().await.take() { + handle.abort(); + let _ = handle.await; + } + if let Some(handle) = self.stderr_task.lock().await.take() { + handle.abort(); + let _ = handle.await; + } + } + + async fn spawn_reader_task(&self, stdout: ChildStdout) { + let shared = self.shared.clone(); + let handle = tokio::spawn(async move { + let mut reader = BufReader::new(stdout); + let result = reader_loop(&mut reader, &shared).await; + shared.closed.store(true, Ordering::Relaxed); + match result { + Ok(()) => { + reject_pending_requests( + &shared.pending, + "flashgrep stdio backend closed its stdout pipe", + ) + .await; + } + Err(error) => { + reject_pending_requests( + &shared.pending, + format!("flashgrep stdio backend reader failed: {error}"), + ) + .await; + } + } + }); + + *self.reader_task.lock().await = Some(handle); + } + + async fn spawn_stderr_task(&self, stderr: Option) { + let Some(stderr) = stderr else { + return; + }; + + let handle = tokio::spawn(async move { + let mut reader = BufReader::new(stderr); + let mut line = String::new(); + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => break, + Ok(_) => log::debug!("flashgrep stdio daemon stderr: {}", line.trim_end()), + Err(error) => { + log::debug!("flashgrep stdio daemon stderr read failed: {}", error); + break; + } + } + } + }); + + *self.stderr_task.lock().await = Some(handle); + } + + async fn reject_pending(&self, message: impl Into) { + reject_pending_requests(&self.shared.pending, message.into()).await; + } +} + +async fn reader_loop( + reader: &mut BufReader, + shared: &Arc, +) -> Result<()> { + while let Some(message) = read_content_length_message(reader).await? { + match message { + ServerMessage::Response(response) => { + let Some(request_id) = response.id else { + continue; + }; + if let Some(sender) = shared.pending.lock().await.remove(&request_id) { + let _ = sender.send(Ok(response)); + } + } + ServerMessage::Notification(_) => {} + } + } + Ok(()) +} + +async fn reject_pending_requests(pending: &Mutex, message: impl Into) { + let message = message.into(); + let mut pending = pending.lock().await; + if pending.is_empty() { + return; + } + + for (_, sender) in pending.drain() { + let _ = sender.send(Err(AppError::Protocol(message.clone()))); + } +} + +async fn read_content_length_message( + reader: &mut BufReader, +) -> Result> { + let mut content_length = None; + + loop { + let mut line = String::new(); + let read = reader.read_line(&mut line).await?; + if read == 0 { + return Ok(None); + } + if line == "\r\n" || line == "\n" { + break; + } + + let trimmed = line.trim_end_matches(['\r', '\n']); + let Some((name, value)) = trimmed.split_once(':') else { + continue; + }; + if name.trim().eq_ignore_ascii_case("Content-Length") { + let length = value.trim().parse::().map_err(|error| { + AppError::Protocol(format!("invalid Content-Length header: {error}")) + })?; + content_length = Some(length); + } + } + + let content_length = + content_length.ok_or_else(|| AppError::Protocol("missing Content-Length header".into()))?; + let mut body = vec![0u8; content_length]; + reader.read_exact(&mut body).await?; + serde_json::from_slice(&body) + .map_err(|error| AppError::Protocol(format!("failed to decode daemon message: {error}"))) +} + +async fn write_content_length_message( + writer: &mut BufWriter, + message: &impl Serialize, +) -> Result<()> { + let body = serde_json::to_vec(message) + .map_err(|error| AppError::Protocol(format!("failed to encode request: {error}")))?; + writer + .write_all(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes()) + .await?; + writer.write_all(&body).await?; + writer.flush().await?; + Ok(()) +} + +fn request_name(request: &Request) -> &'static str { + match request { + Request::Initialize { .. } => "initialize", + Request::Initialized => "initialized", + Request::Ping => "ping", + Request::BaseSnapshotBuild { .. } => "base_snapshot/build", + Request::BaseSnapshotRebuild { .. } => "base_snapshot/rebuild", + Request::TaskStatus { .. } => "task/status", + Request::OpenRepo { .. } => "open_repo", + Request::GetRepoStatus { .. } => "get_repo_status", + Request::Search { .. } => "search", + Request::Glob { .. } => "glob", + Request::CloseRepo { .. } => "close_repo", + Request::Shutdown => "shutdown", + } +} + +fn decode_response(request_id: u64, response: ResponseEnvelope) -> Result { + if response.id != Some(request_id) { + return Err(AppError::Protocol(format!( + "daemon response id mismatch: expected {request_id:?}, got {:?}", + response.id + ))); + } + + if response.jsonrpc != JSONRPC_VERSION { + return Err(AppError::Protocol(format!( + "unsupported daemon jsonrpc version: {}", + response.jsonrpc + ))); + } + + if let Some(error) = response.error { + return Err(AppError::Protocol(error.message)); + } + + response + .result + .ok_or_else(|| AppError::Protocol("daemon response missing result".into())) +} + +fn should_restart_daemon(error: &AppError, daemon: &AsyncDaemonClient) -> bool { + daemon.is_closed() || matches!(error, AppError::Io(_)) +} + +fn unexpected_response(method: &str, response: Response) -> Result { + Err(AppError::Protocol(format!( + "unexpected {method} response: {response:?}" + ))) +} diff --git a/src/crates/core/src/service/search/flashgrep/daemon/managed.rs b/src/crates/core/src/service/search/flashgrep/daemon/managed.rs deleted file mode 100644 index f2e8d9662..000000000 --- a/src/crates/core/src/service/search/flashgrep/daemon/managed.rs +++ /dev/null @@ -1,275 +0,0 @@ -use std::{ - ffi::OsString, - fs::{self, OpenOptions}, - io::Write, - path::{Path, PathBuf}, - process::{Command, Stdio}, - time::{Duration, Instant}, -}; - -use serde::Deserialize; - -use crate::service::search::flashgrep::error::{AppError, Result}; - -use super::protocol::{OpenRepoParams, Request, RequestEnvelope, Response, ResponseEnvelope}; - -const DEFAULT_DAEMON_STATE_FILE: &str = "daemon-state.json"; -const DEFAULT_DAEMON_START_LOCK_FILE: &str = "daemon-state.lock"; -const MIN_STALE_STARTUP_LOCK_AGE: Duration = Duration::from_secs(30); - -#[derive(Debug, Clone)] -pub(crate) struct ManagedDaemonClient { - daemon_program: Option, - start_timeout: Duration, - retry_interval: Duration, -} - -#[derive(Debug, Clone)] -pub(crate) struct OpenedRepo { - pub addr: String, - pub repo_id: String, -} - -#[derive(Debug, Clone, Deserialize)] -struct DaemonStateFile { - addr: String, -} - -impl Default for ManagedDaemonClient { - fn default() -> Self { - Self { - daemon_program: None, - start_timeout: Duration::from_secs(10), - retry_interval: Duration::from_millis(100), - } - } -} - -impl ManagedDaemonClient { - pub(crate) fn new() -> Self { - Self::default() - } - - pub(crate) fn with_daemon_program(mut self, program: impl Into) -> Self { - self.daemon_program = Some(program.into()); - self - } - - pub(crate) fn with_start_timeout(mut self, timeout: Duration) -> Self { - self.start_timeout = timeout; - self - } - - pub(crate) fn with_retry_interval(mut self, interval: Duration) -> Self { - self.retry_interval = interval; - self - } - - pub(crate) fn open_repo(&self, params: OpenRepoParams) -> Result { - let state_file = daemon_state_file_path_from_open(¶ms)?; - let lock_file = daemon_start_lock_file_path(&state_file); - if let Ok(repo) = self.try_open_repo(&state_file, ¶ms) { - return Ok(repo); - } - - let started = Instant::now(); - loop { - if let Ok(repo) = self.try_open_repo(&state_file, ¶ms) { - return Ok(repo); - } - - if let Some(_guard) = - self.try_acquire_startup_lock(&lock_file, MIN_STALE_STARTUP_LOCK_AGE)? - { - if let Ok(repo) = self.try_open_repo(&state_file, ¶ms) { - return Ok(repo); - } - self.spawn_daemon(&state_file)?; - loop { - match self.try_open_repo(&state_file, ¶ms) { - Ok(repo) => return Ok(repo), - Err(error) if started.elapsed() < self.start_timeout => { - let _ = error; - std::thread::sleep(self.retry_interval); - } - Err(error) => return Err(error), - } - } - } - - match self.try_open_repo(&state_file, ¶ms) { - Ok(repo) => return Ok(repo), - Err(error) if started.elapsed() < self.start_timeout => { - let _ = error; - std::thread::sleep(self.retry_interval); - } - Err(error) => return Err(error), - } - } - } - - fn try_open_repo(&self, state_file: &Path, params: &OpenRepoParams) -> Result { - let state = read_state_file(state_file)?; - match send_request( - &state.addr, - Request::OpenRepo { - params: params.clone(), - }, - )? { - Response::RepoOpened { repo_id, status: _ } => Ok(OpenedRepo { - addr: state.addr, - repo_id, - }), - other => Err(AppError::Protocol(format!( - "unexpected open_repo response: {other:?}" - ))), - } - } - - fn try_acquire_startup_lock( - &self, - lock_file: &Path, - stale_after: Duration, - ) -> Result> { - if let Some(parent) = lock_file.parent() { - fs::create_dir_all(parent)?; - } - - match OpenOptions::new() - .write(true) - .create_new(true) - .open(lock_file) - { - Ok(mut file) => { - let _ = writeln!(file, "pid={}", std::process::id()); - Ok(Some(StartupLockGuard { - path: lock_file.to_path_buf(), - })) - } - Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => { - if startup_lock_is_stale(lock_file, stale_after) { - match fs::remove_file(lock_file) { - Ok(()) => self.try_acquire_startup_lock(lock_file, stale_after), - Err(remove_error) - if remove_error.kind() == std::io::ErrorKind::NotFound => - { - Ok(None) - } - Err(remove_error) => Err(remove_error.into()), - } - } else { - Ok(None) - } - } - Err(error) => Err(error.into()), - } - } - - fn spawn_daemon(&self, state_file: &Path) -> Result<()> { - if state_file.exists() { - fs::remove_file(state_file)?; - } - - let program = self - .daemon_program - .clone() - .or_else(|| std::env::var_os("FLASHGREP_DAEMON_BIN")) - .unwrap_or_else(|| OsString::from("flashgrep")); - - let mut command = Command::new(program); - command - .arg("serve") - .arg("--bind") - .arg("127.0.0.1:0") - .arg("--state-file") - .arg(state_file) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()); - command.spawn()?; - Ok(()) - } -} - -struct StartupLockGuard { - path: PathBuf, -} - -impl Drop for StartupLockGuard { - fn drop(&mut self) { - let _ = fs::remove_file(&self.path); - } -} - -fn read_state_file(path: &Path) -> Result { - let contents = fs::read_to_string(path)?; - serde_json::from_str(&contents) - .map_err(|error| AppError::Protocol(format!("invalid daemon state file: {error}"))) -} - -fn send_request(addr: &str, request: Request) -> Result { - let envelope = RequestEnvelope { - jsonrpc: "2.0".into(), - id: Some(1), - request, - }; - - let stream = std::net::TcpStream::connect(addr)?; - let reader_stream = stream.try_clone()?; - let mut reader = std::io::BufReader::new(reader_stream); - let mut writer = std::io::BufWriter::new(stream); - - serde_json::to_writer(&mut writer, &envelope) - .map_err(|error| AppError::Protocol(format!("failed to encode request: {error}")))?; - writer.write_all(b"\n")?; - writer.flush()?; - - let mut line = String::new(); - let read = std::io::BufRead::read_line(&mut reader, &mut line)?; - if read == 0 { - return Err(AppError::Protocol( - "daemon closed connection without a response".into(), - )); - } - - let response: ResponseEnvelope = serde_json::from_str(&line) - .map_err(|error| AppError::Protocol(format!("failed to decode response: {error}")))?; - - if response.jsonrpc != "2.0" { - return Err(AppError::Protocol(format!( - "unsupported daemon jsonrpc version: {}", - response.jsonrpc - ))); - } - - if let Some(error) = response.error { - return Err(AppError::Protocol(error.message)); - } - - response - .result - .ok_or_else(|| AppError::Protocol("daemon response missing result".into())) -} - -fn daemon_state_file_path_from_open(params: &OpenRepoParams) -> Result { - let storage_root = params - .storage_root - .clone() - .unwrap_or_else(|| params.repo_path.join(".flashgrep-index-engine")); - Ok(storage_root.join(DEFAULT_DAEMON_STATE_FILE)) -} - -fn daemon_start_lock_file_path(state_file: &Path) -> PathBuf { - state_file - .parent() - .map(|parent| parent.join(DEFAULT_DAEMON_START_LOCK_FILE)) - .unwrap_or_else(|| PathBuf::from(DEFAULT_DAEMON_START_LOCK_FILE)) -} - -fn startup_lock_is_stale(path: &Path, stale_after: Duration) -> bool { - fs::metadata(path) - .and_then(|metadata| metadata.modified()) - .ok() - .and_then(|modified| modified.elapsed().ok()) - .is_some_and(|age| age >= stale_after) -} diff --git a/src/crates/core/src/service/search/flashgrep/daemon/mod.rs b/src/crates/core/src/service/search/flashgrep/daemon/mod.rs deleted file mode 100644 index 3cb766c54..000000000 --- a/src/crates/core/src/service/search/flashgrep/daemon/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod managed; -pub(crate) mod protocol; - -pub(crate) use managed::{ManagedDaemonClient, OpenedRepo}; diff --git a/src/crates/core/src/service/search/flashgrep/mod.rs b/src/crates/core/src/service/search/flashgrep/mod.rs index 69272d096..a506a56e1 100644 --- a/src/crates/core/src/service/search/flashgrep/mod.rs +++ b/src/crates/core/src/service/search/flashgrep/mod.rs @@ -1,3 +1,13 @@ -pub mod daemon; +mod client; pub mod error; -pub mod sdk; +mod protocol; +mod types; + +pub(crate) use client::{ManagedClient, RepoSession}; +pub(crate) use protocol::{FileMatch, MatchLocation, SearchHit, SearchLine}; +pub(crate) use types::{ + ConsistencyMode, DirtyFileStats, FileCount, GlobRequest, OpenRepoParams, PathScope, QuerySpec, + RefreshPolicyConfig, RepoConfig, RepoPhase, RepoStatus, SearchBackend, SearchModeConfig, + SearchRequest, SearchResults, TaskKind, TaskPhase, TaskState, TaskStatus, + WorkspaceOverlayStatus, +}; diff --git a/src/crates/core/src/service/search/flashgrep/daemon/protocol.rs b/src/crates/core/src/service/search/flashgrep/protocol.rs similarity index 83% rename from src/crates/core/src/service/search/flashgrep/daemon/protocol.rs rename to src/crates/core/src/service/search/flashgrep/protocol.rs index 47bf24e60..667831bca 100644 --- a/src/crates/core/src/service/search/flashgrep/daemon/protocol.rs +++ b/src/crates/core/src/service/search/flashgrep/protocol.rs @@ -20,6 +20,11 @@ pub(crate) struct RequestEnvelope { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "method", rename_all = "snake_case")] pub(crate) enum Request { + Initialize { + params: InitializeParams, + }, + Initialized, + Ping, #[serde(rename = "base_snapshot/build")] BaseSnapshotBuild { params: RepoRef, @@ -44,6 +49,35 @@ pub(crate) enum Request { Glob { params: GlobParams, }, + CloseRepo { + params: RepoRef, + }, + Shutdown, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub(crate) struct InitializeParams { + #[serde(default)] + pub client_info: Option, + #[serde(default)] + pub capabilities: ClientCapabilities, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct ClientInfo { + pub name: String, + #[serde(default)] + pub version: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub(crate) struct ClientCapabilities { + #[serde(default)] + pub progress: bool, + #[serde(default)] + pub status_notifications: bool, + #[serde(default)] + pub task_notifications: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -236,6 +270,22 @@ pub(crate) struct ResponseEnvelope { pub error: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct NotificationEnvelope { + #[serde(default = "default_jsonrpc_version")] + pub jsonrpc: String, + pub method: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub params: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub(crate) enum ServerMessage { + Response(ResponseEnvelope), + Notification(NotificationEnvelope), +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct ErrorResponse { pub code: i64, @@ -247,6 +297,16 @@ pub(crate) struct ErrorResponse { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub(crate) enum Response { + InitializeResult { + protocol_version: u32, + server_info: ServerInfo, + capabilities: ServerCapabilities, + search: SearchProtocolCapabilities, + }, + InitializedAck, + Pong { + now_unix_secs: u64, + }, RepoOpened { repo_id: String, status: RepoStatus, @@ -272,6 +332,38 @@ pub(crate) enum Response { status: RepoStatus, paths: Vec, }, + RepoClosed { + repo_id: String, + }, + ShutdownAck, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct ServerInfo { + pub name: String, + pub version: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct ServerCapabilities { + pub workspace_open: bool, + pub workspace_ensure: bool, + pub workspace_list: bool, + pub workspace_refresh: bool, + pub base_snapshot_build: bool, + pub base_snapshot_rebuild: bool, + pub task_status: bool, + pub task_cancel: bool, + pub search_query: bool, + pub glob_query: bool, + pub progress_notifications: bool, + pub status_notifications: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct SearchProtocolCapabilities { + pub consistency_modes: Vec, + pub search_modes: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/crates/core/src/service/search/flashgrep/sdk/tokio.rs b/src/crates/core/src/service/search/flashgrep/sdk/tokio.rs deleted file mode 100644 index c5d1896bc..000000000 --- a/src/crates/core/src/service/search/flashgrep/sdk/tokio.rs +++ /dev/null @@ -1,345 +0,0 @@ -use std::sync::atomic::{AtomicU64, Ordering}; - -use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, - }, - sync::Mutex, - task, -}; - -use crate::service::search::flashgrep::{ - daemon::{ - protocol::{ - GlobParams, RepoRef, Request, RequestEnvelope, Response, ResponseEnvelope, - SearchParams, TaskRef, - }, - ManagedDaemonClient, OpenedRepo, - }, - error::{AppError, Result}, - sdk::{ - GlobOutcome, GlobRequest, OpenRepoParams, RepoStatus, SearchOutcome, SearchRequest, - TaskStatus, - }, -}; - -#[derive(Debug, Clone)] -pub(crate) struct ManagedClient { - inner: ManagedDaemonClient, -} - -#[derive(Debug)] -pub(crate) struct RepoSession { - repo_id: String, - client: AsyncDaemonClient, -} - -#[derive(Debug)] -struct AsyncDaemonClient { - addr: String, - next_id: AtomicU64, - connection: Mutex>, -} - -#[derive(Debug)] -struct AsyncDaemonConnection { - reader: BufReader, - writer: BufWriter, -} - -impl Default for ManagedClient { - fn default() -> Self { - Self { - inner: ManagedDaemonClient::new(), - } - } -} - -impl ManagedClient { - pub(crate) fn new() -> Self { - Self::default() - } - - pub(crate) fn with_daemon_program(mut self, program: impl Into) -> Self { - self.inner = self.inner.with_daemon_program(program); - self - } - - pub(crate) fn with_start_timeout(mut self, timeout: std::time::Duration) -> Self { - self.inner = self.inner.with_start_timeout(timeout); - self - } - - pub(crate) fn with_retry_interval(mut self, interval: std::time::Duration) -> Self { - self.inner = self.inner.with_retry_interval(interval); - self - } - - pub(crate) async fn open_repo(&self, params: OpenRepoParams) -> Result { - let inner = self.inner.clone(); - let opened = task::spawn_blocking(move || inner.open_repo(params)) - .await - .map_err(|error| { - AppError::Protocol(format!("async open_repo task failed: {error}")) - })??; - Ok(RepoSession::from_opened(opened)) - } -} - -impl RepoSession { - fn from_opened(opened: OpenedRepo) -> Self { - Self { - client: AsyncDaemonClient::new(opened.addr), - repo_id: opened.repo_id, - } - } - - pub(crate) async fn status(&self) -> Result { - match self - .client - .get_repo_status_isolated(self.repo_id.clone()) - .await? - { - Response::RepoStatus { status } => Ok(status), - other => unexpected_response("get_repo_status", other), - } - } - - pub(crate) async fn search(&self, request: SearchRequest) -> Result { - match self - .client - .search(SearchParams { - repo_id: self.repo_id.clone(), - query: request.query, - scope: request.scope, - consistency: request.consistency, - allow_scan_fallback: request.allow_scan_fallback, - }) - .await? - { - Response::SearchCompleted { - repo_id: _, - backend, - consistency_applied: _, - status, - results, - } => Ok(SearchOutcome { - backend, - status, - results, - }), - other => unexpected_response("search", other), - } - } - - pub(crate) async fn glob(&self, request: GlobRequest) -> Result { - match self - .client - .glob(GlobParams { - repo_id: self.repo_id.clone(), - scope: request.scope, - }) - .await? - { - Response::GlobCompleted { - repo_id: _, - status, - paths, - } => Ok(GlobOutcome { status, paths }), - other => unexpected_response("glob", other), - } - } - - pub(crate) async fn index_build(&self) -> Result { - match self - .client - .base_snapshot_build(self.repo_id.clone()) - .await? - { - Response::TaskStarted { task } => Ok(task), - other => unexpected_response("base_snapshot/build", other), - } - } - - pub(crate) async fn index_rebuild(&self) -> Result { - match self - .client - .base_snapshot_rebuild(self.repo_id.clone()) - .await? - { - Response::TaskStarted { task } => Ok(task), - other => unexpected_response("base_snapshot/rebuild", other), - } - } - - pub(crate) async fn task_status(&self, task_id: impl Into) -> Result { - match self.client.task_status(task_id).await? { - Response::TaskStatus { task } => Ok(task), - other => unexpected_response("task/status", other), - } - } -} - -impl AsyncDaemonClient { - fn new(addr: impl Into) -> Self { - Self { - addr: addr.into(), - next_id: AtomicU64::new(1), - connection: Mutex::new(None), - } - } - - async fn search(&self, params: SearchParams) -> Result { - self.send_isolated(Request::Search { params }).await - } - - async fn glob(&self, params: GlobParams) -> Result { - self.send(Request::Glob { params }).await - } - - async fn get_repo_status_isolated(&self, repo_id: impl Into) -> Result { - self.send_isolated(Request::GetRepoStatus { - params: RepoRef { - repo_id: repo_id.into(), - }, - }) - .await - } - - async fn base_snapshot_build(&self, repo_id: impl Into) -> Result { - self.send(Request::BaseSnapshotBuild { - params: RepoRef { - repo_id: repo_id.into(), - }, - }) - .await - } - - async fn base_snapshot_rebuild(&self, repo_id: impl Into) -> Result { - self.send(Request::BaseSnapshotRebuild { - params: RepoRef { - repo_id: repo_id.into(), - }, - }) - .await - } - - async fn task_status(&self, task_id: impl Into) -> Result { - self.send(Request::TaskStatus { - params: TaskRef { - task_id: task_id.into(), - }, - }) - .await - } - - async fn send(&self, request: Request) -> Result { - let request_id = self.next_id.fetch_add(1, Ordering::Relaxed); - let envelope = RequestEnvelope { - jsonrpc: "2.0".into(), - id: Some(request_id), - request, - }; - - let mut connection = self.connection.lock().await; - let response = match self.send_with_connection(&mut connection, &envelope).await { - Ok(response) => response, - Err(_) => { - *connection = None; - self.send_with_connection(&mut connection, &envelope) - .await? - } - }; - - decode_response(request_id, response) - } - - async fn send_isolated(&self, request: Request) -> Result { - let request_id = self.next_id.fetch_add(1, Ordering::Relaxed); - let envelope = RequestEnvelope { - jsonrpc: "2.0".into(), - id: Some(request_id), - request, - }; - - let mut connection = Some(self.connect().await?); - let response = self - .send_with_connection(&mut connection, &envelope) - .await?; - decode_response(request_id, response) - } - - async fn send_with_connection( - &self, - connection: &mut Option, - envelope: &RequestEnvelope, - ) -> Result { - let connection = match connection { - Some(connection) => connection, - None => { - *connection = Some(self.connect().await?); - connection - .as_mut() - .expect("connection must exist after successful connect") - } - }; - - let payload = serde_json::to_vec(envelope) - .map_err(|error| AppError::Protocol(format!("failed to encode request: {error}")))?; - connection.writer.write_all(&payload).await?; - connection.writer.write_all(b"\n").await?; - connection.writer.flush().await?; - - let mut line = String::new(); - let read = connection.reader.read_line(&mut line).await?; - if read == 0 { - return Err(AppError::Protocol( - "daemon closed connection without a response".into(), - )); - } - - serde_json::from_str(&line) - .map_err(|error| AppError::Protocol(format!("failed to decode response: {error}"))) - } - - async fn connect(&self) -> Result { - let stream = TcpStream::connect(&self.addr).await?; - let (reader, writer) = stream.into_split(); - Ok(AsyncDaemonConnection { - reader: BufReader::new(reader), - writer: BufWriter::new(writer), - }) - } -} - -fn decode_response(request_id: u64, response: ResponseEnvelope) -> Result { - if response.id != Some(request_id) { - return Err(AppError::Protocol(format!( - "daemon response id mismatch: expected {request_id:?}, got {:?}", - response.id - ))); - } - - if response.jsonrpc != "2.0" { - return Err(AppError::Protocol(format!( - "unsupported daemon jsonrpc version: {}", - response.jsonrpc - ))); - } - - if let Some(error) = response.error { - return Err(AppError::Protocol(error.message)); - } - - response - .result - .ok_or_else(|| AppError::Protocol("daemon response missing result".into())) -} - -fn unexpected_response(method: &str, response: Response) -> Result { - Err(AppError::Protocol(format!( - "unexpected {method} response: {response:?}" - ))) -} diff --git a/src/crates/core/src/service/search/flashgrep/sdk/mod.rs b/src/crates/core/src/service/search/flashgrep/types.rs similarity index 95% rename from src/crates/core/src/service/search/flashgrep/sdk/mod.rs rename to src/crates/core/src/service/search/flashgrep/types.rs index 06f6bdf01..9e577ef14 100644 --- a/src/crates/core/src/service/search/flashgrep/sdk/mod.rs +++ b/src/crates/core/src/service/search/flashgrep/types.rs @@ -1,6 +1,4 @@ -pub mod tokio; - -pub(crate) use crate::service::search::flashgrep::daemon::protocol::{ +pub(crate) use super::protocol::{ ConsistencyMode, DirtyFileStats, FileCount, OpenRepoParams, PathScope, QuerySpec, RefreshPolicyConfig, RepoConfig, RepoPhase, RepoStatus, SearchBackend, SearchModeConfig, SearchResults, TaskKind, TaskPhase, TaskState, TaskStatus, WorkspaceOverlayStatus, diff --git a/src/crates/core/src/service/search/service.rs b/src/crates/core/src/service/search/service.rs index fe21f105f..1e1c7e624 100644 --- a/src/crates/core/src/service/search/service.rs +++ b/src/crates/core/src/service/search/service.rs @@ -1,17 +1,19 @@ use crate::infrastructure::{FileSearchOutcome, FileSearchResult, SearchMatchType}; use crate::service::config::{get_global_config_service, types::WorkspaceConfig}; -use crate::service::search::flashgrep::sdk::tokio::{ManagedClient, RepoSession}; -use crate::service::search::flashgrep::sdk::{ - ConsistencyMode, GlobRequest, OpenRepoParams, PathScope, QuerySpec, RefreshPolicyConfig, - RepoConfig, SearchRequest, SearchResults, +use crate::service::search::flashgrep::{ + ConsistencyMode, GlobRequest, ManagedClient, OpenRepoParams, PathScope, QuerySpec, + RefreshPolicyConfig, RepoConfig, RepoSession, SearchRequest, SearchResults, }; use crate::util::errors::{BitFunError, BitFunResult}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::ffi::OsString; use std::path::{Path, PathBuf}; -use std::sync::{Arc, OnceLock}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, OnceLock, +}; use std::time::{Duration, Instant}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use super::types::{ ContentSearchOutputMode, ContentSearchRequest, ContentSearchResult, GlobSearchRequest, @@ -22,10 +24,19 @@ use super::types::{ static GLOBAL_WORKSPACE_SEARCH_SERVICE: OnceLock> = OnceLock::new(); const DEFAULT_TOP_K_TOKENS: usize = 6; +const DEFAULT_SESSION_IDLE_GRACE: Duration = Duration::from_secs(45); + +#[derive(Debug, Clone)] +struct SessionEntry { + session: Arc, + activity_epoch: Arc, +} pub struct WorkspaceSearchService { client: ManagedClient, - sessions: RwLock>>, + sessions: RwLock>, + open_guards: Mutex>>>, + session_idle_grace: Duration, } impl WorkspaceSearchService { @@ -47,6 +58,8 @@ impl WorkspaceSearchService { Self { client, sessions: RwLock::new(HashMap::new()), + open_guards: Mutex::new(HashMap::new()), + session_idle_grace: DEFAULT_SESSION_IDLE_GRACE, } } @@ -242,21 +255,58 @@ impl WorkspaceSearchService { }) } + pub fn schedule_repo_release(self: &Arc, repo_root: impl AsRef) { + let Ok(repo_root) = normalize_repo_root(repo_root.as_ref()) else { + return; + }; + let service = Arc::clone(self); + tokio::spawn(async move { + service.release_repo_after_grace(repo_root).await; + }); + } + pub async fn shutdown_all_daemons(&self) { - self.sessions.write().await.clear(); + let released_sessions = self.sessions.write().await.drain().count(); + self.open_guards.lock().await.clear(); + if released_sessions > 0 { + log::info!( + "Workspace search shutdown releasing sessions via daemon shutdown: count={}", + released_sessions + ); + } + if let Err(error) = self.client.shutdown_daemon().await { + log::debug!("Workspace search daemon shutdown skipped: {}", error); + } } async fn get_or_open_session(&self, repo_root: &Path) -> BitFunResult> { let repo_root = normalize_repo_root(repo_root)?; + let repo_guard = { + let mut guards = self.open_guards.lock().await; + guards + .entry(repo_root.clone()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + let _repo_guard = repo_guard.lock().await; + if let Some(existing) = self.sessions.read().await.get(&repo_root).cloned() { - if existing.status().await.is_ok() { - return Ok(existing); + existing.activity_epoch.fetch_add(1, Ordering::Relaxed); + if existing.session.status().await.is_ok() { + return Ok(existing.session); } log::warn!( "Workspace search session became unhealthy, reopening repository session: path={}", repo_root.display() ); self.sessions.write().await.remove(&repo_root); + if let Err(error) = existing.session.close().await { + log::debug!( + "Workspace search repo close after unhealthy session failed: path={}, error={}", + repo_root.display(), + error + ); + } } let repo_config = repo_config_for_workspace_search().await; @@ -267,19 +317,19 @@ impl WorkspaceSearchService { refresh: RefreshPolicyConfig::default(), }; - let session = Arc::new( - self.client - .open_repo(params) - .await - .map_err(map_flashgrep_error( - "Failed to open flashgrep repository session", - ))?, - ); + let entry = + SessionEntry { + session: Arc::new(self.client.open_repo(params).await.map_err( + map_flashgrep_error("Failed to open flashgrep repository session"), + )?), + activity_epoch: Arc::new(AtomicU64::new(1)), + }; let mut sessions = self.sessions.write().await; Ok(sessions .entry(repo_root) - .or_insert_with(|| session.clone()) + .or_insert_with(|| entry.clone()) + .session .clone()) } @@ -307,6 +357,46 @@ impl WorkspaceSearchService { active_task: active_task.map(Into::into), }) } + + async fn release_repo_after_grace(self: Arc, repo_root: PathBuf) { + let Some(expected_epoch) = self + .sessions + .read() + .await + .get(&repo_root) + .map(|entry| entry.activity_epoch.load(Ordering::Relaxed)) + else { + return; + }; + + tokio::time::sleep(self.session_idle_grace).await; + + let entry = { + let mut sessions = self.sessions.write().await; + let Some(entry) = sessions.get(&repo_root) else { + return; + }; + if entry.activity_epoch.load(Ordering::Relaxed) != expected_epoch { + return; + } + sessions.remove(&repo_root) + }; + + if let Some(entry) = entry { + log::info!( + "Releasing idle workspace search repository session: path={}", + repo_root.display() + ); + if let Err(error) = entry.session.close().await { + log::warn!( + "Failed to release idle workspace search repository session: path={}, error={}", + repo_root.display(), + error + ); + } + self.open_guards.lock().await.remove(&repo_root); + } + } } impl Default for WorkspaceSearchService { @@ -330,7 +420,11 @@ fn resolve_daemon_program() -> Option { let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let workspace_root = manifest_dir.join("../../.."); - let binary_name = if cfg!(windows) { "flashgrep.exe" } else { "flashgrep" }; + let binary_name = if cfg!(windows) { + "flashgrep.exe" + } else { + "flashgrep" + }; let profile = std::env::var("PROFILE").ok(); for candidate in daemon_binary_candidates(&workspace_root, binary_name, profile.as_deref()) { @@ -339,7 +433,9 @@ fn resolve_daemon_program() -> Option { } } - which::which("flashgrep").ok().map(|path| path.into_os_string()) + which::which("flashgrep") + .ok() + .map(|path| path.into_os_string()) } fn daemon_binary_candidates( diff --git a/src/crates/core/src/service/search/types.rs b/src/crates/core/src/service/search/types.rs index ebab27c02..77e734a66 100644 --- a/src/crates/core/src/service/search/types.rs +++ b/src/crates/core/src/service/search/types.rs @@ -1,14 +1,12 @@ use crate::infrastructure::FileSearchOutcome; -use crate::service::search::flashgrep::daemon::protocol::{ - FileMatch as FlashgrepFileMatch, MatchLocation as FlashgrepMatchLocation, - SearchHit as FlashgrepSearchHit, SearchLine as FlashgrepSearchLine, -}; -use crate::service::search::flashgrep::sdk::{ +use crate::service::search::flashgrep::{ DirtyFileStats as FlashgrepDirtyFileStats, FileCount as FlashgrepFileCount, + FileMatch as FlashgrepFileMatch, MatchLocation as FlashgrepMatchLocation, RepoPhase as FlashgrepRepoPhase, RepoStatus as FlashgrepRepoStatus, - SearchBackend as FlashgrepSearchBackend, SearchModeConfig, TaskKind as FlashgrepTaskKind, - TaskPhase as FlashgrepTaskPhase, TaskState as FlashgrepTaskState, TaskStatus as FlashgrepTaskStatus, - WorkspaceOverlayStatus as FlashgrepWorkspaceOverlayStatus, + SearchBackend as FlashgrepSearchBackend, SearchHit as FlashgrepSearchHit, + SearchLine as FlashgrepSearchLine, SearchModeConfig, TaskKind as FlashgrepTaskKind, + TaskPhase as FlashgrepTaskPhase, TaskState as FlashgrepTaskState, + TaskStatus as FlashgrepTaskStatus, WorkspaceOverlayStatus as FlashgrepWorkspaceOverlayStatus, }; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -85,7 +83,7 @@ pub enum WorkspaceSearchRepoPhase { NeedsIndex, Building, Ready, - Stale, + TrackingChanges, Refreshing, Limited, } @@ -97,7 +95,7 @@ impl From for WorkspaceSearchRepoPhase { FlashgrepRepoPhase::MissingBaseSnapshot => Self::NeedsIndex, FlashgrepRepoPhase::BuildingBaseSnapshot => Self::Building, FlashgrepRepoPhase::ReadyClean => Self::Ready, - FlashgrepRepoPhase::ReadyDirty => Self::Stale, + FlashgrepRepoPhase::ReadyDirty => Self::TrackingChanges, FlashgrepRepoPhase::RebuildingBaseSnapshot => Self::Refreshing, FlashgrepRepoPhase::Degraded => Self::Limited, } diff --git a/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx b/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx index 164f9bdd9..a04adb9d3 100644 --- a/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx +++ b/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx @@ -200,7 +200,7 @@ const WorkspaceItem: React.FC = ({ || Boolean(repoStatus?.rebuildRecommended) ) { tone = 'yellow'; - } else if (phase === 'ready' || phase === 'stale') { + } else if (phase === 'ready' || phase === 'tracking_changes') { tone = 'green'; } diff --git a/src/web-ui/src/app/components/panels/FilesPanel.tsx b/src/web-ui/src/app/components/panels/FilesPanel.tsx index 4e461c705..e1f0e3e43 100644 --- a/src/web-ui/src/app/components/panels/FilesPanel.tsx +++ b/src/web-ui/src/app/components/panels/FilesPanel.tsx @@ -55,7 +55,8 @@ function getIndexPhaseBadgeVariant(phase?: WorkspaceSearchRepoPhase): 'neutral' switch (phase) { case 'ready': return 'success'; - case 'stale': + case 'tracking_changes': + return 'info'; case 'needs_index': return 'warning'; case 'building': diff --git a/src/web-ui/src/infrastructure/api/service-api/tauri-commands.ts b/src/web-ui/src/infrastructure/api/service-api/tauri-commands.ts index ce0eb8705..60cc76059 100644 --- a/src/web-ui/src/infrastructure/api/service-api/tauri-commands.ts +++ b/src/web-ui/src/infrastructure/api/service-api/tauri-commands.ts @@ -261,7 +261,7 @@ export type WorkspaceSearchRepoPhase = | 'needs_index' | 'building' | 'ready' - | 'stale' + | 'tracking_changes' | 'refreshing' | 'limited'; diff --git a/src/web-ui/src/locales/en-US/panels/files.json b/src/web-ui/src/locales/en-US/panels/files.json index 5468ec529..7b6025eaf 100644 --- a/src/web-ui/src/locales/en-US/panels/files.json +++ b/src/web-ui/src/locales/en-US/panels/files.json @@ -30,7 +30,7 @@ "needs_index": "Needs Index", "building": "Building", "ready": "Ready", - "stale": "Stale", + "tracking_changes": "Tracking Changes", "refreshing": "Refreshing", "limited": "Limited", "unknown": "Unknown" @@ -40,7 +40,7 @@ "needs_index": "Search works with fallback now. Build the index for faster content search.", "building": "Building the workspace index.", "ready": "Managed index is ready. Content search can use the indexed backend.", - "stale": "Managed index is usable, but workspace changes suggest a rebuild soon.", + "tracking_changes": "Managed index is usable and workspace changes are being tracked. Rebuild is only needed when one is recommended.", "refreshing": "Refreshing the workspace index with the latest file changes.", "limited": "Managed index is limited. Search can still fall back, but rebuild is recommended.", "unavailable": "Search index status is unavailable right now." diff --git a/src/web-ui/src/locales/zh-CN/panels/files.json b/src/web-ui/src/locales/zh-CN/panels/files.json index d601df76a..479c37e63 100644 --- a/src/web-ui/src/locales/zh-CN/panels/files.json +++ b/src/web-ui/src/locales/zh-CN/panels/files.json @@ -30,7 +30,7 @@ "needs_index": "未建索引", "building": "建立中", "ready": "索引可用", - "stale": "索引已旧", + "tracking_changes": "跟踪变更中", "refreshing": "刷新中", "limited": "受限", "unknown": "未知" @@ -40,7 +40,7 @@ "needs_index": "当前搜索仍可通过 fallback 使用,建立索引后内容搜索会更快。", "building": "正在为当前工作区建立索引。", "ready": "索引已就绪,内容搜索可优先走索引后端。", - "stale": "索引当前可用,但工作区变更已提示应尽快重建。", + "tracking_changes": "索引当前可用,工作区变更正在被跟踪。只有出现建议重建时,才需要关注重建。", "refreshing": "正在根据最新文件变更刷新索引。", "limited": "索引能力受限。搜索仍可 fallback,但建议重建。", "unavailable": "当前无法获取索引状态。"