Use tokio in asusctl

This commit is contained in:
Luke D. Jones
2022-09-22 22:36:16 +12:00
parent 3b9cf474a7
commit 9608d190b9
15 changed files with 148 additions and 209 deletions
+43 -97
View File
@@ -23,12 +23,10 @@ use std::time::Duration;
use crate::error::RogError;
use async_trait::async_trait;
use config::Config;
use inotify::{Inotify, WatchMask};
use log::warn;
use logind_zbus::manager::ManagerProxy;
use smol::{stream::StreamExt, Executor, Timer};
use zbus::{Connection, SignalContext};
use tokio::time;
use zbus::{export::futures_util::StreamExt, Connection, SignalContext};
use zvariant::ObjectPath;
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -65,6 +63,8 @@ pub trait ZbusAdd {
/// - `notify_<name>(SignalContext, SomeValue)`
///
/// In most cases if `SomeValue` is stored in a config then `<name>()` getter is expected to update it.
/// The getter should *never* write back to the path or attribute that is being watched or an
/// infinite loop will occur.
///
/// # Example
///
@@ -78,28 +78,24 @@ pub trait ZbusAdd {
macro_rules! task_watch_item {
($name:ident $self_inner:ident) => {
concat_idents::concat_idents!(fn_name = watch_, $name {
async fn fn_name<'a>(
async fn fn_name(
&self,
executor: &mut Executor<'a>,
signal_ctxt: SignalContext<'a>,
signal_ctxt: SignalContext<'static>,
) -> Result<(), RogError> {
use zbus::export::futures_util::StreamExt;
let ctrl = self.clone();
concat_idents::concat_idents!(watch_fn = monitor_, $name {
let mut watch = self.$self_inner.watch_fn()?;
executor
.spawn(async move {
let mut buffer = [0; 1024];
tokio::spawn(async move {
let mut buffer = [0; 32];
watch.event_stream(&mut buffer).unwrap().for_each(|_| async {
let value = ctrl.$name();
dbg!(value);
concat_idents::concat_idents!(notif_fn = notify_, $name {
Self::notif_fn(&signal_ctxt, value).await.unwrap();
});
}).await;
})
.detach();
});
});
Ok(())
}
@@ -112,59 +108,23 @@ macro_rules! task_watch_item {
pub trait CtrlTask {
/// Implement to set up various tasks that may be required, using the `Executor`.
/// No blocking loops are allowed, or they must be run on a separate thread.
async fn create_tasks<'a>(
&self,
executor: &mut Executor<'a>,
signal: SignalContext<'a>,
) -> Result<(), RogError>;
/// Free method to run a task when the path is modified
///
/// Not very useful if you need to also do a zbus notification.
fn create_tasks_inotify(
&self,
executor: &mut Executor,
path: &str,
mut task: impl FnMut() + Send + 'static,
) -> Result<(), RogError> {
let mut inotify = Inotify::init()?;
inotify.add_watch(path, WatchMask::MODIFY)?;
let mut buffer = [0; 1024];
executor
.spawn(async move {
loop {
if let Ok(events) = inotify.read_events_blocking(&mut buffer) {
for _ in events {
task()
}
}
}
})
.detach();
Ok(())
}
async fn create_tasks(&self, signal: SignalContext<'static>) -> Result<(), RogError>;
/// Create a timed repeating task
async fn repeating_task(
&self,
millis: u64,
executor: &mut Executor,
mut task: impl FnMut() + Send + 'static,
) {
let timer = Timer::interval(Duration::from_millis(millis));
executor
.spawn(async move {
timer.for_each(|_| task()).await;
})
.detach();
async fn repeating_task(&self, millis: u64, mut task: impl FnMut() + Send + 'static) {
let mut timer = time::interval(Duration::from_millis(millis));
tokio::spawn(async move {
timer.tick().await;
task();
});
}
/// Free helper method to create tasks to run on: sleep, wake, shutdown, boot
///
/// The closures can potentially block, so execution time should be the minimal possible
/// such as save a variable.
async fn create_sys_event_tasks(
&self,
executor: &mut Executor,
mut on_sleep: impl FnMut() + Send + 'static,
mut on_wake: impl FnMut() + Send + 'static,
mut on_shutdown: impl FnMut() + Send + 'static,
@@ -178,54 +138,40 @@ pub trait CtrlTask {
.await
.expect("Controller could not create ManagerProxy");
executor
.spawn(async move {
if let Ok(notif) = manager.receive_prepare_for_sleep().await {
notif
.for_each(|event| {
if let Ok(args) = event.args() {
if args.start {
on_sleep();
} else if !args.start() {
on_wake();
}
}
})
.await;
tokio::spawn(async move {
if let Ok(mut notif) = manager.receive_prepare_for_sleep().await {
while let Some(event) = notif.next().await {
if let Ok(args) = event.args() {
if args.start {
on_sleep();
} else if !args.start() {
on_wake();
}
}
}
})
.detach();
}
});
let manager = ManagerProxy::new(&connection)
.await
.expect("Controller could not create ManagerProxy");
executor
.spawn(async move {
if let Ok(notif) = manager.receive_prepare_for_shutdown().await {
notif
.for_each(|event| {
if let Ok(args) = event.args() {
if args.start {
on_shutdown();
} else if !args.start() {
on_boot();
}
}
})
.await;
tokio::spawn(async move {
if let Ok(mut notif) = manager.receive_prepare_for_shutdown().await {
while let Some(event) = notif.next().await {
if let Ok(args) = event.args() {
if args.start {
on_shutdown();
} else if !args.start() {
on_boot();
}
}
}
})
.detach();
}
});
}
}
pub trait CtrlTaskComplex {
type A;
fn do_task(&mut self, config: &mut Config, event: Self::A);
}
pub trait GetSupported {
type A;