wip: initial test

This commit is contained in:
Suyono 2023-11-22 13:23:39 +11:00
parent ce9df6364f
commit 3cf595f08e
7 changed files with 130 additions and 30 deletions

26
Cargo.lock generated
View File

@ -71,6 +71,18 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "futures-core"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-sink"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.28.0" version = "0.28.0"
@ -289,6 +301,19 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-util"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.12" version = "1.0.12"
@ -373,4 +398,5 @@ version = "0.1.0"
dependencies = [ dependencies = [
"nix", "nix",
"tokio", "tokio",
"tokio-util",
] ]

View File

@ -8,3 +8,4 @@ edition = "2021"
[dependencies] [dependencies]
nix = {version = "0.27.1", features = ["process", "signal"]} nix = {version = "0.27.1", features = ["process", "signal"]}
tokio = { version = "1.34.0", features = ["full"] } tokio = { version = "1.34.0", features = ["full"] }
tokio-util = "0.7.10"

View File

@ -1,29 +1,37 @@
mod sighandler; mod sighandler;
mod waiter; mod waiter;
mod starter; mod starter;
mod constants;
use std::error; use std::error;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::sync::watch; use tokio_util::sync::CancellationToken;
use std::sync::{Arc, Mutex};
pub async fn start() -> Result<(), Box<dyn error::Error>> { pub async fn start() -> Result<(), Box<dyn error::Error>> {
let (tx, mut _rx) = watch::channel::<i32>(1); let sync_flag = Arc::new(Mutex::new(false));
let sig_sync_flag = sync_flag.clone();
let sighandler_cancel = CancellationToken::new();
let waiter_cancel_sighandler = sighandler_cancel.clone();
let cancel = CancellationToken::new();
let starter_cancel = cancel.clone();
let mut set: JoinSet<Result<(), Box<dyn error::Error + Send + Sync>>> = JoinSet::new(); let mut set: JoinSet<Result<(), Box<dyn error::Error + Send + Sync>>> = JoinSet::new();
set.spawn(async move { set.spawn(async move {
sighandler::sighandler(tx).await sighandler::sighandler(sig_sync_flag, cancel, sighandler_cancel).await
}); });
//TODO: start the process starter //TODO: start the process starter
starter::start_process(&mut set); starter::start_process(&mut set, starter_cancel);
//TODO: spawn_blocking for waiter //TODO: spawn_blocking for waiter
set.spawn_blocking(move || { set.spawn_blocking(move || {
waiter::wait_all(); waiter::wait_all(sync_flag, waiter_cancel_sighandler);
Ok(()) Ok(())
}); });
//TODO: we can't just return error when we got an error from a task
while let Some(res) = set.join_next().await { while let Some(res) = set.join_next().await {
match res { match res {
Ok(v) => { Ok(v) => {

View File

View File

@ -1,9 +1,10 @@
use std::error; use std::error;
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use tokio::select; use tokio::select;
use tokio::sync::watch::Sender; use std::sync::{Arc, Mutex};
use tokio_util::sync::CancellationToken;
pub async fn sighandler(s: Sender<i32>) -> Result<(), Box<dyn error::Error + Send + Sync>> { pub async fn sighandler(flag: Arc<Mutex<bool>>, cancel: CancellationToken, exit: CancellationToken) -> Result<(), Box<dyn error::Error + Send + Sync>> {
let mut sigint = signal(SignalKind::interrupt())?; let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?; let mut sigterm = signal(SignalKind::terminate())?;
let mut sigchld = signal(SignalKind::child())?; let mut sigchld = signal(SignalKind::child())?;
@ -12,20 +13,28 @@ pub async fn sighandler(s: Sender<i32>) -> Result<(), Box<dyn error::Error + Sen
select! { select! {
_ = sigint.recv() => { _ = sigint.recv() => {
println!("got SIGINT"); println!("got SIGINT");
drop(s); initiate_stop(flag.clone(), cancel.clone());
break 'signal;
}, },
_ = sigterm.recv() => { _ = sigterm.recv() => {
println!("got SIGTERM"); println!("got SIGTERM");
drop(s); initiate_stop(flag.clone(), cancel.clone());
break 'signal;
}, },
_ = sigchld.recv() => { _ = sigchld.recv() => {
// do nothing intentionally // do nothing intentionally
// return Err(())
}, },
_ = exit.cancelled() => {
break 'signal;
}
} }
} }
Ok(()) Ok(())
} }
fn initiate_stop(flag: Arc<Mutex<bool>>, cancel: CancellationToken) {
{
let mut fl = flag.lock().unwrap();
*fl = true;
}
cancel.cancel();
}

View File

@ -1,31 +1,60 @@
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::process::Command; use tokio::process::Command;
use tokio_util::sync::CancellationToken;
use tokio::select;
use tokio::io::Result as tokio_result;
use tokio::time::sleep;
use std::time::Duration;
use std::process::ExitStatus;
use std::error; use std::error;
use nix::sys::signal::{kill, Signal};
use nix::errno::Errno;
use nix::unistd::Pid;
pub fn start_process(ts: &mut JoinSet<Result<(), Box<dyn error::Error + Send + Sync>>>) {
pub fn start_process(ts: &mut JoinSet<Result<(), Box<dyn error::Error + Send + Sync>>>, cancel: CancellationToken) {
for _j in 0..5 { for _j in 0..5 {
let cancel = cancel.clone();
ts.spawn(async move { ts.spawn(async move {
for _i in 0..5 { 'autorestart: loop {
let mut child = Command::new("sleep").arg("1") let mut child = Command::new("sleep").arg("1")
.spawn() .spawn()
.expect("failed to spawn"); .expect("failed to spawn");
match child.wait().await {
Ok(status) => { select! {
println!("starter: sleep exited: {}", status); _ = cancel.cancelled() => {
if let Some(id) = child.id() {
match kill(Pid::from_raw(id as i32), Some(Signal::SIGTERM)) {
Ok(_) => {
select! {
_ = sleep(Duration::from_secs(5)) => {
child.kill().await.expect("failed to kill process");
},
result = child.wait() => {
if let Err(e) = result_match(result) {
return Err(e);
}
break 'autorestart;
}
}
}, },
Err(e) => { Err(e) => {
if let Some(eos) = e.raw_os_error() { if e != Errno::ESRCH {
if eos != nix::Error::ECHILD as i32 {
return Err(e.into()); return Err(e.into());
}
} else { } else {
return Err(e.into()); break 'autorestart;
}
}
}
}
},
result = child.wait() => {
if let Err(e) = result_match(result) {
return Err(e);
} }
}, },
} }
// let status = child.wait().await?;
// println!("starter: sleep exited: {}", status);
} }
println!("starter: task completed"); println!("starter: task completed");
Ok(()) Ok(())
@ -34,3 +63,20 @@ pub fn start_process(ts: &mut JoinSet<Result<(), Box<dyn error::Error + Send + S
} }
println!("starter: spawning completed"); println!("starter: spawning completed");
} }
fn result_match(result: tokio_result<ExitStatus>) -> Result<(), Box<dyn error::Error + Send + Sync>> {
if let Err(e) = result {
if let Some(eos) = e.raw_os_error() {
if eos != nix::Error::ECHILD as i32 {
return Err(e.into());
}
} else {
return Err(e.into());
}
}
//TODO: remove me! this is for debug + tracing purpose
println!("starter: sleep exited");
Ok(())
}

View File

@ -1,8 +1,11 @@
use nix::errno::Errno; use nix::errno::Errno;
use nix::sys::wait::{self, WaitStatus}; use nix::sys::wait::{self, WaitStatus};
use nix::unistd::Pid; use nix::unistd::Pid;
use std::sync::{Mutex, Arc};
use std::{thread, time};
use tokio_util::sync::CancellationToken;
pub fn wait_all() { pub fn wait_all(flag: Arc<Mutex<bool>>, stop_sighandler: CancellationToken) {
'wait: loop { 'wait: loop {
match wait::waitpid(Pid::from_raw(-1), None) { match wait::waitpid(Pid::from_raw(-1), None) {
Ok(x) => { Ok(x) => {
@ -21,7 +24,14 @@ pub fn wait_all() {
dbg!(err); dbg!(err);
match err { match err {
Errno::ECHILD => { Errno::ECHILD => {
let fl = flag.lock().unwrap();
if *fl {
stop_sighandler.cancel();
break 'wait; break 'wait;
} else {
drop(fl);
thread::sleep(time::Duration::from_millis(100));
}
}, },
_ => {} _ => {}
} }