From 05dde284a74916a8ff07d6090902737f6a4d69ff Mon Sep 17 00:00:00 2001 From: Suyono Date: Fri, 1 Dec 2023 13:26:09 +1100 Subject: [PATCH] wip: ready to test --- src/init/config.rs | 72 ++++++++++---- src/init/daemon.rs | 54 +++++++++- src/init/daemon/starter.rs | 143 ++++++++++++++++++++++++--- src/init/daemon/starter/time_calc.rs | 77 --------------- src/init/error.rs | 6 ++ 5 files changed, 240 insertions(+), 112 deletions(-) delete mode 100644 src/init/daemon/starter/time_calc.rs diff --git a/src/init/config.rs b/src/init/config.rs index 6fee1f2..b300cca 100644 --- a/src/init/config.rs +++ b/src/init/config.rs @@ -3,11 +3,14 @@ use std::env; use std::path::PathBuf; use std::io::{BufReader, BufRead}; use crate::init::error as wingmate_error; +use anyhow::anyhow; use nix::unistd::{access, AccessFlags}; use lazy_static::lazy_static; use regex::Regex; use anyhow::Context; +pub const MAX_TERM_WAIT_TIME_SECS: u64 = 5; + const CRON_REGEX_STR: &'static str = r"^\s*(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S.*\S)\s*$"; const MINUTE: &'static str = "minute"; const HOUR: &'static str = "hour"; @@ -30,8 +33,7 @@ pub enum Command { pub enum CronTimeFieldSpec { Any, Exact(u8), - MultiOccurrence(Vec), - Every(u8) + MultiOccurrence(Vec) } #[derive(Debug)] @@ -124,7 +126,7 @@ impl Config { let mut match_str = cap.name(MINUTE).ok_or::( wingmate_error::CronParseError::FieldMatch { cron_line: String::from(&l), field_name: String::from(MINUTE) } )?; - let minute = Self::to_cron_time_field_spec(&match_str).map_err(|e| { + let minute = Self::to_cron_time_field_spec(&match_str, 60u8).map_err(|e| { wingmate_error::CronParseError::Parse { source: e, cron_line: String::from(&l), @@ -136,7 +138,7 @@ impl Config { match_str = cap.name(HOUR).ok_or::( wingmate_error::CronParseError::FieldMatch { cron_line: String::from(&l), field_name: String::from(HOUR) } )?; - let hour = Self::to_cron_time_field_spec(&match_str).map_err(|e| { + let hour = Self::to_cron_time_field_spec(&match_str, 24u8).map_err(|e| { wingmate_error::CronParseError::Parse { source: e, cron_line: String::from(&l), @@ -148,7 +150,7 @@ impl Config { match_str = cap.name(DAY_OF_MONTH_ABBRV).ok_or::( wingmate_error::CronParseError::FieldMatch { cron_line: String::from(&l), field_name: String::from(DAY_OF_MONTH) } )?; - let dom = Self::to_cron_time_field_spec(&match_str).map_err(|e| { + let dom = Self::to_cron_time_field_spec(&match_str, 31u8).map_err(|e| { wingmate_error::CronParseError::Parse { source: e, cron_line: String::from(&l), @@ -160,7 +162,7 @@ impl Config { match_str = cap.name(MONTH).ok_or::( wingmate_error::CronParseError::FieldMatch { cron_line: String::from(&l), field_name: String::from(MONTH) } )?; - let month = Self::to_cron_time_field_spec(&match_str).map_err(|e| { + let month = Self::to_cron_time_field_spec(&match_str, 12u8).map_err(|e| { wingmate_error::CronParseError::Parse { source: e, cron_line: String::from(&l), @@ -172,7 +174,7 @@ impl Config { match_str = cap.name(DAY_OF_WEEK_ABBRV).ok_or::( wingmate_error::CronParseError::FieldMatch { cron_line: String::from(&l), field_name: String::from(DAY_OF_WEEK) } )?; - let dow = Self::to_cron_time_field_spec(&match_str).map_err(|e| { + let dow = Self::to_cron_time_field_spec(&match_str, 7u8).map_err(|e| { wingmate_error::CronParseError::Parse { source: e, cron_line: String::from(&l), @@ -200,26 +202,41 @@ impl Config { Ok(ret_vec) } - fn to_cron_time_field_spec(match_str: ®ex::Match) -> Result { + fn to_cron_time_field_spec(match_str: ®ex::Match, max: u8) -> Result { let field = match_str.as_str(); if field == "*" { return Ok(CronTimeFieldSpec::Any); } else if field.starts_with("*/") { let every = field[2..].parse::().context("parsing on field matching \"every\" pattern")?; - return Ok(CronTimeFieldSpec::Every(every)); + if every >= max { + return Err(anyhow!("invalid value {}", every)); + } + let mut next_value = every; + let mut multi: Vec = Vec::new(); + while next_value < max { + multi.push(next_value); + next_value += every; + } + return Ok(CronTimeFieldSpec::MultiOccurrence(multi)); } else if field.contains(",") { let multi: Vec<&str> = field.split(",").collect(); let mut multi_occurrence: Vec = Vec::new(); for m in multi { let ur = m.parse::().context("parsing on field matching \"multi occurrence\" pattern")?; + if ur >= max { + return Err(anyhow!("invalid value {}", field)); + } multi_occurrence.push(ur); } return Ok(CronTimeFieldSpec::MultiOccurrence(multi_occurrence)); } else { let n = field.parse::().context("parsing on field matching \"exact\" pattern")?; + if n >= max { + return Err(anyhow!("invalid value {}", n)); + } return Ok(CronTimeFieldSpec::Exact(n)); } } @@ -300,20 +317,35 @@ impl Clone for Crontab { } } +impl CronTimeFieldSpec { + pub fn is_match(&self, current: u8) -> bool { + match self { + Self::Any => { return true; }, + Self::Exact(x) => { return *x == current; }, + Self::MultiOccurrence(v) => { + for i in v { + if *i == current { + return true; + } + } + } + } + false + } +} + impl Clone for CronTimeFieldSpec { fn clone(&self) -> Self { match self { - CronTimeFieldSpec::Any => CronTimeFieldSpec::Any, - CronTimeFieldSpec::Every(x) => CronTimeFieldSpec::Every(*x), - CronTimeFieldSpec::Exact(x) => CronTimeFieldSpec::Exact(*x), - CronTimeFieldSpec::MultiOccurrence(x) => { - CronTimeFieldSpec::MultiOccurrence(x.clone()) + Self::Any => Self::Any, + Self::Exact(x) => Self::Exact(*x), + Self::MultiOccurrence(x) => { + Self::MultiOccurrence(x.clone()) }, } } } - struct CronFieldCmpHelper<'a>(u8, u8, Option<&'a Vec>); impl PartialEq for CronTimeFieldSpec { fn eq(&self, other: &Self) -> bool { @@ -321,16 +353,14 @@ impl PartialEq for CronTimeFieldSpec { let rhs: CronFieldCmpHelper; match self { CronTimeFieldSpec::Any => { lhs = CronFieldCmpHelper(0, 0, None); } - CronTimeFieldSpec::Every(x) => { lhs = CronFieldCmpHelper(1, *x, None); } - CronTimeFieldSpec::Exact(x) => { lhs = CronFieldCmpHelper(2, *x, None); } - CronTimeFieldSpec::MultiOccurrence(v) => { lhs = CronFieldCmpHelper(3, 0, Some(v)); } + CronTimeFieldSpec::Exact(x) => { lhs = CronFieldCmpHelper(1, *x, None); } + CronTimeFieldSpec::MultiOccurrence(v) => { lhs = CronFieldCmpHelper(1, 0, Some(v)); } } match other { CronTimeFieldSpec::Any => { rhs = CronFieldCmpHelper(0, 0, None); } - CronTimeFieldSpec::Every(x) => { rhs = CronFieldCmpHelper(1, *x, None); } - CronTimeFieldSpec::Exact(x) => { rhs = CronFieldCmpHelper(2, *x, None); } - CronTimeFieldSpec::MultiOccurrence(v) => { rhs = CronFieldCmpHelper(3, 0, Some(v)); } + CronTimeFieldSpec::Exact(x) => { rhs = CronFieldCmpHelper(1, *x, None); } + CronTimeFieldSpec::MultiOccurrence(v) => { rhs = CronFieldCmpHelper(2, 0, Some(v)); } } if lhs.0 == rhs.0 { diff --git a/src/init/daemon.rs b/src/init/daemon.rs index a8324a7..3b7acba 100644 --- a/src/init/daemon.rs +++ b/src/init/daemon.rs @@ -3,9 +3,13 @@ mod waiter; mod starter; mod constants; +use tokio::{select, pin}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use std::sync::{Arc, Mutex}; +use std::time::{Duration,Instant}; +use nix::sys::signal::{Signal, kill}; +use nix::unistd::Pid; use crate::init::config; use crate::init::error as wmerr; use crate::init::error::WingmateInitError; @@ -16,12 +20,18 @@ pub async fn start(cfg: config::Config) -> Result<(), WingmateInitError> { let sighandler_cancel = CancellationToken::new(); let waiter_cancel_sighandler = sighandler_cancel.clone(); + let signal_pump_stop = sighandler_cancel.clone(); let cancel = CancellationToken::new(); let starter_service_cancel = cancel.clone(); let starter_cron_cancel = cancel.clone(); + let signal_pump_start = cancel.clone(); let mut set: JoinSet> = JoinSet::new(); + set.spawn(async move { + signal_pump(signal_pump_start, signal_pump_stop).await + }); + set.spawn(async move { sighandler::sighandler(sig_sync_flag, cancel, sighandler_cancel).await }); @@ -48,7 +58,6 @@ pub async fn start(cfg: config::Config) -> Result<(), WingmateInitError> { return Err(ev); } } - // return Err(ev as Box); } }, Err(e) => { @@ -58,5 +67,48 @@ pub async fn start(cfg: config::Config) -> Result<(), WingmateInitError> { } } + Ok(()) +} + +async fn signal_pump(start: CancellationToken, stop: CancellationToken) -> Result<(), WingmateInitError> { + const TERM_MODE: u8 = 0; + const KILL_MODE: u8 = 1; + const ALL_CHILDREN_PID: i32 = -1; + + start.cancelled().await; + + let stop_time = Instant::now(); + let mut wait_time_millis: u64 = 100; + let mut mode = TERM_MODE; + + 'signal: loop { + let stop = stop.clone(); + let s = tokio::time::sleep(Duration::from_millis(wait_time_millis)); + pin!(s); + + select! { + () = &mut s => { + if mode == TERM_MODE { + if let Err(e) = kill(Pid::from_raw(ALL_CHILDREN_PID), Signal::SIGTERM) { + eprintln!("daemon: sending TERM signal got {}", e); + } + } else { + if let Err(e) = kill(Pid::from_raw(ALL_CHILDREN_PID), Signal::SIGKILL) { + eprintln!("daemon: sending KILL signal got {}", e); + } + } + + let time_peek = Instant::now(); + if time_peek.saturating_duration_since(stop_time).as_secs() >= config::MAX_TERM_WAIT_TIME_SECS && mode == TERM_MODE { + wait_time_millis = 10; + mode = KILL_MODE; + } + } + _ = stop.cancelled() => { + break 'signal; + } + } + } + Ok(()) } \ No newline at end of file diff --git a/src/init/daemon/starter.rs b/src/init/daemon/starter.rs index 9aed693..59038c3 100644 --- a/src/init/daemon/starter.rs +++ b/src/init/daemon/starter.rs @@ -1,22 +1,22 @@ -mod time_calc; - use tokio::task::JoinSet; use tokio::process::{Command, Child}; use tokio_util::sync::CancellationToken; use tokio::select; use tokio::io::Result as tokio_result; -use tokio::time::sleep; +use tokio::time::{sleep, interval}; use std::time::Duration; use std::process::ExitStatus; use nix::sys::signal::{kill, Signal}; use nix::errno::Errno; use nix::unistd::Pid; -use anyhow::Context; -use time::OffsetDateTime; -use crate::init::config::{self, CronTimeFieldSpec}; +use anyhow::{Context, anyhow}; +use time::{OffsetDateTime, Duration as TimeDur, Weekday}; +use crate::init::config; use crate::init::error::{WingmateInitError, CronConfigError}; +const CRON_TRIGGER_WAIT_SECS: u64 = 20; + pub fn start_services(ts: &mut JoinSet>, cfg: &config::Config, cancel: CancellationToken) -> Result<(), WingmateInitError> { @@ -54,7 +54,7 @@ pub fn start_services(ts: &mut JoinSet>, cfg: &con match kill(Pid::from_raw(id as i32), Some(Signal::SIGTERM)) { Ok(_) => { select! { - _ = sleep(Duration::from_secs(5)) => { + _ = sleep(Duration::from_secs(config::MAX_TERM_WAIT_TIME_SECS)) => { child.kill().await.expect("failed to kill process"); }, result = child.wait() => { @@ -112,9 +112,8 @@ pub fn start_cron(ts: &mut JoinSet>, cfg: &config: -> Result<(), WingmateInitError> { for c_ in cfg.get_cron_iter() { - let shell = cfg.get_shell().ok_or::(WingmateInitError::NoShellAvailable)?; let cron = c_.clone(); - let cancel = cancel.clone(); + let in_loop_cancel = cancel.clone(); ts.spawn(async move { if cron.day_of_month != config::CronTimeFieldSpec::Any @@ -123,19 +122,137 @@ pub fn start_cron(ts: &mut JoinSet>, cfg: &config: } // let cron = cron.clone(); + let mut cron_interval = interval(Duration::from_secs(CRON_TRIGGER_WAIT_SECS)); + let mut cron_procs: JoinSet> = JoinSet::new(); let mut last_running: Option = None; 'continuous: loop { let cron = cron.clone(); + let cron_proc_cancel = in_loop_cancel.clone(); - time_calc::wait_calc(cron.clone(), &last_running); - select! { - _ = cancel.cancelled() => { - break 'continuous; + let mut flag = true; + + if let Ok(local_time) = OffsetDateTime::now_local() { + if let Some(last) = last_running { + if local_time - last < TimeDur::minutes(1) { + flag = false; + } else { + flag = flag && cron.minute.is_match(local_time.minute()) && + cron.hour.is_match(local_time.hour()) && + cron.day_of_month.is_match(local_time.day()) && + cron.day_of_week.is_match(weekday_map(local_time.weekday())); + } + } + + if flag { + last_running = Some(local_time); + cron_procs.spawn(async move { + run_cron_command(cron.command.clone(), cron_proc_cancel).await + }); + } + } + + if cron_procs.is_empty() { + select! { + _ = in_loop_cancel.cancelled() => { + break 'continuous; + }, + _ = cron_interval.tick() => {}, + } + } else { + 'task: while !cron_procs.is_empty() { + select! { + opt_res = cron_procs.join_next() => { + if let Some(res) = opt_res { + if let Err(e) = res { + eprintln!("running cron got problem {:?}", e); + } + } + }, + _ = in_loop_cancel.cancelled() => { + while let Some(res) = cron_procs.join_next().await { + if let Err(e) = res { + eprintln!("running cron got problem {:?}", e); + } + } + break 'continuous; + }, + _ = cron_interval.tick() => { + break 'task; + }, + } } } } Ok(()) }); } + Ok(()) +} + +fn weekday_map(wd: Weekday) -> u8 { + match wd { + Weekday::Sunday => 0, + Weekday::Monday => 1, + Weekday::Tuesday => 2, + Weekday::Wednesday => 3, + Weekday::Thursday => 4, + Weekday::Friday => 5, + Weekday::Saturday => 6 + } +} + +async fn run_cron_command(command: String, cancel: CancellationToken) -> Result<(), WingmateInitError> { + let mut args: Vec<&str> = Vec::new(); + for part in command.split(' ') { + if part.len() > 0 { + args.push(part); + } + } + + if args.is_empty() { + return Err(WingmateInitError::Other { source: anyhow!("parsed as empty: {}", command) }); + } + + let cmd = args.swap_remove(0); + let mut child: Child; + if args.is_empty() { + child = Command::new(cmd).spawn().map_err(|e| { + WingmateInitError::SpawnError { source: e, message: command } + })?; + } else { + child = Command::new(cmd).args(args.as_slice()).spawn().map_err(|e| { + WingmateInitError::SpawnError { source: e, message: command } + })?; + } + + select! { + _ = cancel.cancelled() => { + if let Some(id) = child.id() { + match kill(Pid::from_raw(id as i32), Some(Signal::SIGTERM)) { + Ok(_) => { + if let Err(e) = result_match(child.wait().await) { + return Err(WingmateInitError::ChildExit { source: e }); + } + }, + Err(e) => { + match e { + Errno::ESRCH => { + return Err(WingmateInitError::ChildNotFound); + }, + _ => { + return Err(WingmateInitError::FromNix { source: e }); + } + } + } + } + } + }, + result = child.wait() => { + if let Err(e) = result_match(result) { + return Err(WingmateInitError::ChildExit { source: e }); + } + } + } + Ok(()) } \ No newline at end of file diff --git a/src/init/daemon/starter/time_calc.rs b/src/init/daemon/starter/time_calc.rs deleted file mode 100644 index 714541d..0000000 --- a/src/init/daemon/starter/time_calc.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::time::Duration; -use anyhow::Context; -use time::OffsetDateTime; -use crate::init::config::{Crontab,CronTimeFieldSpec}; -use crate::init::error; - -const MINUTE: i8 = 0; -const HOUR: i8 = 1; -const DAY_OF_MONTH: i8 = 2; -const MONTH: i8 = 3; -const DAY_OF_WEEK: i8 = 4; - -struct CronField { - spec: CronTimeFieldSpec, - tag: i8, -} - -fn convert_cron_spec_to_vec(cron: Crontab) -> Vec { - let mut res: Vec = Vec::with_capacity(5); - - res.push(CronField { spec: cron.minute, tag: MINUTE }); - res.push(CronField { spec: cron.hour, tag: HOUR }); - res.push(CronField { spec: cron.day_of_month, tag: DAY_OF_MONTH }); - res.push(CronField { spec: cron.month, tag: MONTH }); - res.push(CronField { spec: cron.day_of_week, tag: DAY_OF_WEEK }); - res -} - -pub fn wait_calc(cron: Crontab, last_running: &Option) -> Result { - let local_clock = OffsetDateTime::now_local() - .context("getting current time in local timezone") - .map_err(|e| { error::CronConfigError::Other { source: e } })?; - - match last_running { - Some(t) => { - let vec_cron = convert_cron_spec_to_vec(cron); - for vc in vec_cron { - - } - - // match cron.minute { - // CronTimeFieldSpec::Any => {}, - // CronTimeFieldSpec::Every(x) => {}, - // CronTimeFieldSpec::Exact(x) => {}, - // CronTimeFieldSpec::MultiOccurrence(v) => {} - // } - // match cron.hour { - // CronTimeFieldSpec::Any => {}, - // CronTimeFieldSpec::Every(x) => {}, - // CronTimeFieldSpec::Exact(x) => {}, - // CronTimeFieldSpec::MultiOccurrence(v) => {} - // } - // match cron.day_of_month { - // CronTimeFieldSpec::Any => {}, - // CronTimeFieldSpec::Every(x) => {}, - // CronTimeFieldSpec::Exact(x) => {}, - // CronTimeFieldSpec::MultiOccurrence(v) => {} - // } - // match cron.month { - // CronTimeFieldSpec::Any => {}, - // CronTimeFieldSpec::Every(x) => {}, - // CronTimeFieldSpec::Exact(x) => {}, - // CronTimeFieldSpec::MultiOccurrence(v) => {} - // } - // match cron.day_of_week { - // CronTimeFieldSpec::Any => {}, - // CronTimeFieldSpec::Every(x) => {}, - // CronTimeFieldSpec::Exact(x) => {}, - // CronTimeFieldSpec::MultiOccurrence(v) => {} - // } - }, - None => { - } - } - - Ok(Duration::from_secs(1)) //PLACEHOLDER -} \ No newline at end of file diff --git a/src/init/error.rs b/src/init/error.rs index 8cdf1a4..2634896 100644 --- a/src/init/error.rs +++ b/src/init/error.rs @@ -57,6 +57,12 @@ pub enum WingmateInitError { source: CronConfigError, }, + #[error("from nix")] + FromNix { + #[source] + source: nix::Error, + }, + #[error("tripped over")] Other { #[source]