deno.land / x / deno@v1.28.2 / cli / file_watcher.rs
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use crate::colors;use crate::fs_util::canonicalize_path;
use deno_core::error::AnyError;use deno_core::error::JsError;use deno_core::futures::Future;use deno_runtime::fmt_errors::format_js_error;use log::info;use notify::event::Event as NotifyEvent;use notify::event::EventKind;use notify::Error as NotifyError;use notify::RecommendedWatcher;use notify::RecursiveMode;use notify::Watcher;use std::collections::HashSet;use std::path::PathBuf;use std::sync::Arc;use std::time::Duration;use tokio::select;use tokio::sync::mpsc;use tokio::sync::mpsc::UnboundedReceiver;use tokio::time::sleep;
const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
struct DebouncedReceiver { // The `recv()` call could be used in a tokio `select!` macro, // and so we store this state on the struct to ensure we don't // lose items if a `recv()` never completes received_items: HashSet<PathBuf>, receiver: UnboundedReceiver<Vec<PathBuf>>,}
impl DebouncedReceiver { fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) { let (sender, receiver) = mpsc::unbounded_channel(); ( Arc::new(sender), Self { receiver, received_items: HashSet::new(), }, ) }
async fn recv(&mut self) -> Option<Vec<PathBuf>> { if self.received_items.is_empty() { self .received_items .extend(self.receiver.recv().await?.into_iter()); }
loop { select! { items = self.receiver.recv() => { self.received_items.extend(items?); } _ = sleep(DEBOUNCE_INTERVAL) => { return Some(self.received_items.drain().collect()); } } } }}
async fn error_handler<F>(watch_future: F)where F: Future<Output = Result<(), AnyError>>,{ let result = watch_future.await; if let Err(err) = result { let error_string = match err.downcast_ref::<JsError>() { Some(e) => format_js_error(e), None => format!("{:?}", err), }; eprintln!( "{}: {}", colors::red_bold("error"), error_string.trim_start_matches("error: ") ); }}
pub enum ResolutionResult<T> { Restart { paths_to_watch: Vec<PathBuf>, result: Result<T, AnyError>, }, Ignore,}
async fn next_restart<R, T, F>( resolver: &mut R, debounced_receiver: &mut DebouncedReceiver,) -> (Vec<PathBuf>, Result<T, AnyError>)where R: FnMut(Option<Vec<PathBuf>>) -> F, F: Future<Output = ResolutionResult<T>>,{ loop { let changed = debounced_receiver.recv().await; match resolver(changed).await { ResolutionResult::Ignore => { log::debug!("File change ignored") } ResolutionResult::Restart { paths_to_watch, result, } => { return (paths_to_watch, result); } } }}
pub struct PrintConfig { /// printing watcher status to terminal. pub job_name: String, /// determine whether to clear the terminal screen pub clear_screen: bool,}
fn create_print_after_restart_fn(clear_screen: bool) -> impl Fn() { move || { if clear_screen { eprint!("{}", CLEAR_SCREEN); } info!( "{} File change detected! Restarting!", colors::intense_blue("Watcher"), ); }}
/// Creates a file watcher, which will call `resolver` with every file change.////// - `resolver` is used for resolving file paths to be watched at every restarting/// of the watcher, and can also return a value to be passed to `operation`./// It returns a [`ResolutionResult`], which can either instruct the watcher to restart or ignore the change./// This always contains paths to watch;////// - `operation` is the actual operation we want to run every time the watcher detects file/// changes. For example, in the case where we would like to bundle, then `operation` would/// have the logic for it like bundling the code.pub async fn watch_func<R, O, T, F1, F2>( mut resolver: R, mut operation: O, print_config: PrintConfig,) -> Result<(), AnyError>where R: FnMut(Option<Vec<PathBuf>>) -> F1, O: FnMut(T) -> F2, F1: Future<Output = ResolutionResult<T>>, F2: Future<Output = Result<(), AnyError>>,{ let (sender, mut receiver) = DebouncedReceiver::new_with_sender();
let PrintConfig { job_name, clear_screen, } = print_config;
// Store previous data. If module resolution fails at some point, the watcher will try to // continue watching files using these data. let mut paths_to_watch; let mut resolution_result;
let print_after_restart = create_print_after_restart_fn(clear_screen);
match resolver(None).await { ResolutionResult::Ignore => { // The only situation where it makes sense to ignore the initial 'change' // is if the command isn't supposed to do anything until something changes, // e.g. a variant of `deno test` which doesn't run the entire test suite to start with, // but instead does nothing until you make a change. // // In that case, this is probably the correct output. info!( "{} Waiting for file changes...", colors::intense_blue("Watcher"), );
let (paths, result) = next_restart(&mut resolver, &mut receiver).await; paths_to_watch = paths; resolution_result = result;
print_after_restart(); } ResolutionResult::Restart { paths_to_watch: paths, result, } => { paths_to_watch = paths; resolution_result = result; } };
info!("{} {} started.", colors::intense_blue("Watcher"), job_name,);
loop { let mut watcher = new_watcher(sender.clone())?; add_paths_to_watcher(&mut watcher, &paths_to_watch);
match resolution_result { Ok(operation_arg) => { let fut = error_handler(operation(operation_arg)); select! { (paths, result) = next_restart(&mut resolver, &mut receiver) => { if result.is_ok() { paths_to_watch = paths; } resolution_result = result;
print_after_restart(); continue; }, _ = fut => {}, };
info!( "{} {} finished. Restarting on file change...", colors::intense_blue("Watcher"), job_name, ); } Err(error) => { eprintln!("{}: {}", colors::red_bold("error"), error); info!( "{} {} failed. Restarting on file change...", colors::intense_blue("Watcher"), job_name, ); } }
let (paths, result) = next_restart(&mut resolver, &mut receiver).await; if result.is_ok() { paths_to_watch = paths; } resolution_result = result;
print_after_restart();
drop(watcher); }}
/// Creates a file watcher.////// - `operation` is the actual operation we want to run every time the watcher detects file/// changes. For example, in the case where we would like to bundle, then `operation` would/// have the logic for it like bundling the code.pub async fn watch_func2<T: Clone, O, F>( mut paths_to_watch_receiver: UnboundedReceiver<Vec<PathBuf>>, mut operation: O, operation_args: T, print_config: PrintConfig,) -> Result<(), AnyError>where O: FnMut(T) -> Result<F, AnyError>, F: Future<Output = Result<(), AnyError>>,{ let (watcher_sender, mut watcher_receiver) = DebouncedReceiver::new_with_sender();
let PrintConfig { job_name, clear_screen, } = print_config;
let print_after_restart = create_print_after_restart_fn(clear_screen);
info!("{} {} started.", colors::intense_blue("Watcher"), job_name,);
fn consume_paths_to_watch( watcher: &mut RecommendedWatcher, receiver: &mut UnboundedReceiver<Vec<PathBuf>>, ) { loop { match receiver.try_recv() { Ok(paths) => { add_paths_to_watcher(watcher, &paths); } Err(e) => match e { mpsc::error::TryRecvError::Empty => { break; } // there must be at least one receiver alive _ => unreachable!(), }, } } }
loop { let mut watcher = new_watcher(watcher_sender.clone())?; consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
let receiver_future = async { loop { let maybe_paths = paths_to_watch_receiver.recv().await; add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); } }; let operation_future = error_handler(operation(operation_args.clone())?);
select! { _ = receiver_future => {}, _ = watcher_receiver.recv() => { print_after_restart(); continue; }, _ = operation_future => { // TODO(bartlomieju): print exit code here? info!( "{} {} finished. Restarting on file change...", colors::intense_blue("Watcher"), job_name, ); consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); }, };
let receiver_future = async { loop { let maybe_paths = paths_to_watch_receiver.recv().await; add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); } }; select! { _ = receiver_future => {}, _ = watcher_receiver.recv() => { print_after_restart(); continue; }, }; }}
fn new_watcher( sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,) -> Result<RecommendedWatcher, AnyError> { let watcher = Watcher::new( move |res: Result<NotifyEvent, NotifyError>| { if let Ok(event) = res { if matches!( event.kind, EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) ) { let paths = event .paths .iter() .filter_map(|path| canonicalize_path(path).ok()) .collect(); sender.send(paths).unwrap(); } } }, Default::default(), )?;
Ok(watcher)}
fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) { // Ignore any error e.g. `PathNotFound` for path in paths { let _ = watcher.watch(path, RecursiveMode::Recursive); } log::debug!("Watching paths: {:?}", paths);}
Version Info