Skip to main content

thedes_tui_core/
audio.rs

1use std::{borrow::Cow, collections::HashMap, mem};
2
3use thedes_async_util::{
4    non_blocking,
5    timer::{TickSession, Timer},
6};
7use thiserror::Error;
8use tokio::task;
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12    audio::device::{AudioDevice, AudioSinkDevice},
13    runtime,
14};
15
16pub mod device;
17
/// Integer volume level, ranging from `0` up to `Volume::MAX`.
///
/// Levels are converted into a fraction of `Volume::MAX` before they are
/// handed to a sink.
pub type Volume = u8;
19
20#[derive(Debug, Error)]
21pub enum PlayNowError {
22    #[error("Device failed to play")]
23    DevicePlayNow(#[from] device::PlayNowError),
24    #[error("Device failed to open sink")]
25    DeviceOpenSink(#[from] device::OpenSinkError),
26    #[error("Device failed to set volume")]
27    DeviceSetVolume(#[from] device::SetVolumeError),
28}
29
30#[derive(Debug, Error)]
31pub enum Error {
32    #[error("Device failed to play")]
33    DevicePlayNow(#[from] device::PlayNowError),
34    #[error("Device failed to set volume")]
35    DeviceSetVolume(#[from] device::SetVolumeError),
36    #[error("Device failed to pause a sink")]
37    DevicePause(#[from] device::PauseSinkError),
38    #[error("Device failed to resume a sink")]
39    DeviceResume(#[from] device::ResumeSinkError),
40    #[error("Device failed to clear a sink")]
41    DeviceClear(#[from] device::ClearSinkError),
42    #[error("Device failed to check play status")]
43    DeviceIsPlaying(#[from] device::CheckPlayStatusError),
44    #[error("Device failed to open sink")]
45    DeviceOpenSink(#[from] device::OpenSinkError),
46}
47
48#[derive(Debug, Error)]
49#[error(transparent)]
50pub struct FlushError {
51    inner: non_blocking::spsc::unbounded::SendError<Vec<Command>>,
52}
53
54impl FlushError {
55    fn new(
56        inner: non_blocking::spsc::unbounded::SendError<Vec<Command>>,
57    ) -> Self {
58        Self { inner }
59    }
60
61    pub fn into_bounced_commands(self) -> Vec<Command> {
62        self.inner.into_message()
63    }
64}
65
66#[derive(Debug)]
67pub enum Command {
68    EnterRepeated(Cow<'static, str>, Cow<'static, [u8]>, PlayOptions),
69    LeaveRepeated(Cow<'static, str>),
70    PlayOnce(Cow<'static, str>, Cow<'static, [u8]>, PlayOptions),
71    Pause(Cow<'static, str>),
72    Resume(Cow<'static, str>),
73    Clear(Cow<'static, str>),
74    SetVolume(Cow<'static, str>, u8),
75}
76
77impl Command {
78    pub fn new_enter_repeated(
79        name: impl Into<Cow<'static, str>>,
80        bytes: impl Into<Cow<'static, [u8]>>,
81    ) -> Self {
82        Self::new_enter_repeated_with(name, bytes, PlayOptions::default())
83    }
84
85    pub fn new_enter_repeated_with(
86        name: impl Into<Cow<'static, str>>,
87        bytes: impl Into<Cow<'static, [u8]>>,
88        options: PlayOptions,
89    ) -> Self {
90        Self::EnterRepeated(name.into(), bytes.into(), options)
91    }
92
93    pub fn new_leave_repeated(name: impl Into<Cow<'static, str>>) -> Self {
94        Self::LeaveRepeated(name.into())
95    }
96
97    pub fn new_play_once(
98        name: impl Into<Cow<'static, str>>,
99        bytes: impl Into<Cow<'static, [u8]>>,
100    ) -> Self {
101        Self::new_play_once_with(name, bytes, PlayOptions::default())
102    }
103
104    pub fn new_play_once_with(
105        name: impl Into<Cow<'static, str>>,
106        bytes: impl Into<Cow<'static, [u8]>>,
107        options: PlayOptions,
108    ) -> Self {
109        Self::PlayOnce(name.into(), bytes.into(), options)
110    }
111
112    pub fn new_pause(name: impl Into<Cow<'static, str>>) -> Self {
113        Self::Pause(name.into())
114    }
115
116    pub fn new_resume(name: impl Into<Cow<'static, str>>) -> Self {
117        Self::Resume(name.into())
118    }
119
120    pub fn new_clear(name: impl Into<Cow<'static, str>>) -> Self {
121        Self::Clear(name.into())
122    }
123
124    pub fn new_set_volume(
125        name: impl Into<Cow<'static, str>>,
126        level: u8,
127    ) -> Self {
128        Self::SetVolume(name.into(), level)
129    }
130}
131
132#[derive(Debug)]
133pub(crate) struct OpenResources {
134    pub device: Box<dyn AudioDevice>,
135    pub timer: Timer,
136    pub cancel_token: CancellationToken,
137}
138
139#[derive(Debug)]
140pub(crate) struct AudioHandles {
141    pub controller: AudioControllerHandle,
142}
143
144#[derive(Debug)]
145pub struct Config {
146    _private: (),
147}
148
149impl Config {
150    pub fn new() -> Self {
151        Self { _private: () }
152    }
153
154    pub(crate) fn open(
155        self,
156        resources: OpenResources,
157        join_set: &mut runtime::JoinSet,
158    ) -> AudioHandles {
159        let (command_sender, command_receiver) =
160            non_blocking::spsc::unbounded::channel();
161
162        join_set.spawn(async move {
163            let mut reactor = Reactor::new(resources, command_receiver);
164            reactor.run().await
165        });
166
167        AudioHandles { controller: AudioControllerHandle::new(command_sender) }
168    }
169}
170
171#[derive(Debug)]
172pub struct AudioControllerHandle {
173    command_sender: non_blocking::spsc::unbounded::Sender<Vec<Command>>,
174    command_queue: Vec<Command>,
175}
176
177impl AudioControllerHandle {
178    fn new(
179        command_sender: non_blocking::spsc::unbounded::Sender<Vec<Command>>,
180    ) -> Self {
181        Self { command_sender, command_queue: Vec::new() }
182    }
183
184    pub fn is_connected(&self) -> bool {
185        self.command_sender.is_connected()
186    }
187
188    pub fn queue<I>(&mut self, commands: I)
189    where
190        I: IntoIterator<Item = Command>,
191    {
192        self.command_queue.extend(commands);
193    }
194
195    pub fn flush(&mut self) -> Result<(), FlushError> {
196        let commands = mem::take(&mut self.command_queue);
197        self.command_sender.send(commands).map_err(FlushError::new)
198    }
199}
200
201#[derive(Debug, Clone)]
202pub struct PlayOptions {
203    pub relative_volume: Volume,
204}
205
206impl Default for PlayOptions {
207    fn default() -> Self {
208        Self { relative_volume: Volume::MAX }
209    }
210}
211
212impl PlayOptions {
213    pub fn apply_playing(
214        &self,
215        sink: &mut dyn AudioSinkDevice,
216        group_float_volume: f32,
217    ) -> Result<(), Error> {
218        let max = self.relative_volume as f32 / Volume::MAX as f32;
219        let compressed_volume = group_float_volume * max;
220        sink.set_volume(compressed_volume)?;
221        Ok(())
222    }
223}
224
225#[derive(Debug, Clone)]
226struct RepeatedSinkFrame {
227    bytes: Cow<'static, [u8]>,
228    options: PlayOptions,
229}
230
231#[derive(Debug)]
232struct RepeatedSink {
233    inner: Box<dyn AudioSinkDevice>,
234    curr: RepeatedSinkFrame,
235    prev: Vec<RepeatedSinkFrame>,
236}
237
238impl RepeatedSink {
239    pub fn play(&mut self, group_float_volume: f32) -> Result<(), Error> {
240        self.apply_options_playing(group_float_volume)?;
241        self.inner.play_now(self.curr.bytes.clone())?;
242        Ok(())
243    }
244
245    pub fn apply_options_playing(
246        &mut self,
247        group_float_volume: f32,
248    ) -> Result<(), Error> {
249        self.curr.options.apply_playing(&mut *self.inner, group_float_volume)
250    }
251}
252
253#[derive(Debug)]
254struct OnceSink {
255    inner: Box<dyn AudioSinkDevice>,
256    options: PlayOptions,
257}
258
259#[derive(Debug)]
260struct AudioSinkGroup {
261    once_sinks: Vec<Option<Box<OnceSink>>>,
262    repeated_sink: Option<Box<RepeatedSink>>,
263    paused: bool,
264    volume: Volume,
265}
266
267impl AudioSinkGroup {
268    fn new() -> Self {
269        Self {
270            once_sinks: Vec::new(),
271            paused: false,
272            repeated_sink: None,
273            volume: 127,
274        }
275    }
276
277    fn float_volume(&self) -> f32 {
278        f32::from(self.volume) / 255.0
279    }
280
281    fn set_volume(&mut self, volume: Volume) -> Result<(), Error> {
282        self.volume = volume;
283        let volume = self.float_volume();
284        if let Some(sink) = &mut self.repeated_sink {
285            sink.apply_options_playing(volume)?;
286        }
287        let once_sinks =
288            self.once_sinks.iter_mut().filter_map(|sink| sink.as_mut());
289        for sink in once_sinks {
290            sink.options.apply_playing(&mut *sink.inner, volume)?;
291        }
292        Ok(())
293    }
294
295    fn enter_repeated(
296        &mut self,
297        device: &mut dyn AudioDevice,
298        bytes: Cow<'static, [u8]>,
299        options: PlayOptions,
300    ) -> Result<(), Error> {
301        let frame = RepeatedSinkFrame { options, bytes };
302        match self.repeated_sink.take() {
303            Some(mut sink) => {
304                let prev = mem::replace(&mut sink.curr, frame);
305                sink.prev.push(prev);
306                sink.inner.clear()?;
307                sink.play(self.float_volume())?;
308                self.repeated_sink = Some(sink);
309            },
310            None => {
311                let inner = device.open_sink()?;
312                let mut sink = Box::new(RepeatedSink {
313                    curr: frame,
314                    prev: Vec::new(),
315                    inner,
316                });
317                sink.play(self.float_volume())?;
318                self.repeated_sink = Some(sink);
319            },
320        };
321        Ok(())
322    }
323
324    pub fn leave_repeated(&mut self) -> Result<(), Error> {
325        if let Some(mut sink) = self.repeated_sink.take() {
326            if let Some(prev) = sink.prev.pop() {
327                sink.curr = prev;
328                sink.play(self.float_volume())?;
329                self.repeated_sink = Some(sink);
330            }
331        }
332        Ok(())
333    }
334
335    fn play_once(
336        &mut self,
337        device: &mut dyn AudioDevice,
338        bytes: Cow<'static, [u8]>,
339        options: PlayOptions,
340    ) -> Result<(), Error> {
341        let mut inner = device.open_sink()?;
342        inner.play_now(bytes.clone())?;
343        options.apply_playing(&mut inner, self.float_volume())?;
344        let sink = OnceSink { inner, options };
345        if let Some(entry) = self
346            .once_sinks
347            .iter_mut()
348            .find(|maybe_device| maybe_device.is_none())
349        {
350            *entry = Some(Box::new(sink));
351        } else {
352            self.once_sinks.push(Some(Box::new(sink)));
353        }
354        if self.paused {
355            self.resume()?;
356        }
357        Ok(())
358    }
359
360    fn pause(&mut self) -> Result<(), Error> {
361        self.paused = true;
362        if let Some(sink) = &mut self.repeated_sink {
363            sink.inner.pause()?;
364        }
365        for maybe_sink in &mut self.once_sinks {
366            if let Some(sink) = maybe_sink {
367                sink.inner.pause()?;
368            }
369        }
370        Ok(())
371    }
372
373    fn resume(&mut self) -> Result<(), Error> {
374        self.paused = false;
375        let float_volume = self.float_volume();
376        if let Some(sink) = &mut self.repeated_sink {
377            sink.inner.resume()?;
378            sink.apply_options_playing(float_volume)?;
379        }
380        for maybe_sink in &mut self.once_sinks {
381            if let Some(sink) = maybe_sink {
382                sink.inner.resume()?;
383            }
384        }
385        Ok(())
386    }
387
388    fn clear(&mut self) -> Result<(), Error> {
389        self.paused = true;
390        if let Some(mut sink) = self.repeated_sink.take() {
391            sink.inner.pause()?;
392        }
393        for maybe_sink in self.once_sinks.drain(..) {
394            if let Some(mut sink) = maybe_sink {
395                sink.inner.pause()?;
396            }
397        }
398        Ok(())
399    }
400
401    fn collect_garbage(&mut self) {
402        if self.paused {
403            return;
404        }
405
406        for maybe_sink in &mut self.once_sinks {
407            if !maybe_sink
408                .as_ref()
409                .is_some_and(|sink| sink.inner.is_playing().is_ok_and(|is| is))
410            {
411                *maybe_sink = None;
412            }
413        }
414        if let Some(last_some) =
415            self.once_sinks.iter().rposition(Option::is_some)
416        {
417            self.once_sinks.drain(last_some + 1 ..);
418        }
419    }
420
421    fn revive_repeated(&mut self) -> Result<(), Error> {
422        let float_volume = self.float_volume();
423        if let Some(sink) = &mut self.repeated_sink {
424            if !sink.inner.is_playing()? {
425                sink.play(float_volume)?;
426            }
427        }
428        Ok(())
429    }
430
431    fn is_empty(&self) -> bool {
432        self.once_sinks.is_empty() && self.repeated_sink.is_none()
433    }
434}
435
436#[derive(Debug)]
437struct Reactor {
438    device: Box<dyn AudioDevice>,
439    cancel_token: CancellationToken,
440    ticker: TickSession,
441    command_receiver: non_blocking::spsc::unbounded::Receiver<Vec<Command>>,
442    sinks: HashMap<Cow<'static, str>, AudioSinkGroup>,
443    groups_to_be_cleared: Vec<Cow<'static, str>>,
444}
445
446impl Reactor {
447    pub fn new(
448        resources: OpenResources,
449        command_receiver: non_blocking::spsc::unbounded::Receiver<Vec<Command>>,
450    ) -> Self {
451        Self {
452            device: resources.device,
453            cancel_token: resources.cancel_token,
454            ticker: resources.timer.new_session(),
455            command_receiver,
456            sinks: HashMap::new(),
457            groups_to_be_cleared: Vec::new(),
458        }
459    }
460
461    pub async fn run(&mut self) -> Result<(), runtime::Error> {
462        let mut commands = Vec::<Command>::new();
463
464        loop {
465            if !self.execute_commands_sent(&mut commands)? {
466                break;
467            }
468
469            tokio::select! {
470                _ = self.ticker.tick() => (),
471                _ = self.cancel_token.cancelled() => {
472                    tracing::info!("Audio reactor token cancellation detected");
473                    break
474                },
475            }
476
477            self.tick()?;
478
479            tokio::select! {
480                _ = self.ticker.tick() => (),
481                _ = self.cancel_token.cancelled() => {
482                    tracing::info!("Audio reactor token cancellation detected");
483                    break
484                },
485            }
486        }
487
488        Ok(())
489    }
490
491    fn execute_commands_sent(
492        &mut self,
493        buf: &mut Vec<Command>,
494    ) -> Result<bool, Error> {
495        let Ok(command_iterator) = self.command_receiver.recv_many() else {
496            tracing::info!("Audio reactor command sender disconnected");
497            return Ok(false);
498        };
499        buf.extend(command_iterator.flatten());
500        for command in buf.drain(..) {
501            self.execute_command(command)?;
502        }
503        Ok(true)
504    }
505
506    fn execute_command(&mut self, command: Command) -> Result<(), Error> {
507        task::block_in_place(|| match command {
508            Command::EnterRepeated(name, bytes, options) => {
509                self.enter_repeated(name, bytes, options)
510            },
511            Command::LeaveRepeated(name) => self.leave_repeated(name),
512            Command::PlayOnce(group, bytes, options) => {
513                self.play_once(group, bytes, options)
514            },
515            Command::Pause(group) => self.pause(group),
516            Command::Resume(group) => self.resume(group),
517            Command::Clear(group) => self.clear(group),
518            Command::SetVolume(group, level) => self.set_volume(group, level),
519        })
520    }
521
522    fn enter_repeated(
523        &mut self,
524        group: Cow<'static, str>,
525        bytes: Cow<'static, [u8]>,
526        options: PlayOptions,
527    ) -> Result<(), Error> {
528        self.with_sink_group(&group[..], move |group, device| {
529            group.enter_repeated(device, bytes, options)
530        })
531    }
532
533    fn leave_repeated(
534        &mut self,
535        group: Cow<'static, str>,
536    ) -> Result<(), Error> {
537        self.with_sink_group(&group[..], move |group, _device| {
538            group.leave_repeated()
539        })
540    }
541
542    fn play_once(
543        &mut self,
544        group: Cow<'static, str>,
545        bytes: Cow<'static, [u8]>,
546        options: PlayOptions,
547    ) -> Result<(), Error> {
548        self.with_sink_group(&group[..], move |group, device| {
549            group.play_once(device, bytes, options)
550        })
551    }
552
553    fn set_volume(
554        &mut self,
555        group: Cow<'static, str>,
556        volume: u8,
557    ) -> Result<(), Error> {
558        self.with_sink_group(&group, move |group, _device| {
559            group.set_volume(volume)
560        })
561    }
562
563    fn pause(&mut self, group: Cow<'static, str>) -> Result<(), Error> {
564        self.with_sink_group(&group[..], move |group, _device| group.pause())
565    }
566
567    fn resume(&mut self, group: Cow<'static, str>) -> Result<(), Error> {
568        self.with_sink_group(&group[..], move |group, _device| group.resume())
569    }
570
571    fn clear(&mut self, group: Cow<'static, str>) -> Result<(), Error> {
572        self.with_sink_group(&group[..], move |group, _device| group.clear())
573    }
574
575    fn clear_all(&mut self) {
576        for group in self.sinks.values_mut() {
577            _ = group.clear();
578        }
579    }
580
581    fn tick(&mut self) -> Result<(), Error> {
582        self.groups_to_be_cleared.clear();
583        task::block_in_place(|| {
584            for (key, group) in &mut self.sinks {
585                group.collect_garbage();
586                if group.is_empty() {
587                    self.groups_to_be_cleared.push(key.clone());
588                }
589                group.revive_repeated()?;
590            }
591            for key in self.groups_to_be_cleared.drain(..) {
592                self.sinks.remove(&key);
593            }
594            Ok(())
595        })
596    }
597
598    fn with_sink_group<F, T>(&mut self, group: &str, consumer: F) -> T
599    where
600        F: FnOnce(&mut AudioSinkGroup, &mut dyn AudioDevice) -> T,
601    {
602        loop {
603            if let Some(group) = self.sinks.get_mut(group) {
604                break consumer(group, &mut self.device);
605            }
606            self.sinks
607                .insert(Cow::from(group.to_owned()), AudioSinkGroup::new());
608        }
609    }
610}
611
612impl Drop for Reactor {
613    fn drop(&mut self) {
614        if tokio::runtime::Handle::try_current().is_ok() {
615            task::block_in_place(|| self.clear_all());
616        } else {
617            self.clear_all();
618        }
619    }
620}
621
#[cfg(test)]
mod test {
    use std::time::Duration;

    use thedes_async_util::timer::{TickSession, Timer};
    use tokio_util::sync::CancellationToken;

    use crate::{
        audio::{
            AudioHandles,
            Command,
            Config,
            OpenResources,
            PlayOptions,
            device::{AudioDevice, mock::AudioDeviceMock},
        },
        runtime::JoinSet,
    };

    /// Tick period of the timer shared by a test and the audio task.
    const TICK_PERIOD: Duration = Duration::from_millis(4);

    /// Number of ticks a test waits for the audio task to react.
    const SETTLE_TICKS: usize = 3;

    /// Audio subsystem opened over a mock device, plus what a test needs to
    /// drive it.
    ///
    /// Fields are declared in the order they must be dropped: the handles
    /// first, then the join set, then the tick session.
    struct Harness {
        handles: AudioHandles,
        /// Never read; kept so the join set lives as long as the test.
        _join_set: JoinSet,
        tick_session: TickSession,
    }

    impl Harness {
        /// Opens the audio subsystem over `device`.
        fn start(device: Box<dyn AudioDevice>) -> Self {
            let timer = Timer::new(TICK_PERIOD);
            let tick_session = timer.new_session();
            let cancel_token = CancellationToken::new();
            let mut join_set = JoinSet::new();
            let handles = Config::new().open(
                OpenResources { device, timer, cancel_token },
                &mut join_set,
            );
            Self { handles, _join_set: join_set, tick_session }
        }

        /// Queues the commands and flushes them right away.
        fn send<I>(&mut self, commands: I)
        where
            I: IntoIterator<Item = Command>,
        {
            self.handles.controller.queue(commands);
            self.handles.controller.flush().unwrap();
        }

        /// Waits long enough for the audio task to process what was sent.
        async fn settle(&mut self) {
            for _ in 0 .. SETTLE_TICKS {
                self.tick_session.tick().await;
            }
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn play_does_play() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_play_log();

        let mut harness = Harness::start(device);
        harness.send([Command::new_play_once("Music", &[1, 2, 3])]);
        harness.settle().await;

        let log = sink_mock.take_play_log().unwrap();
        assert_eq!(log.len(), 1);
        assert_eq!(log[0], &[1_u8, 2, 3] as &[u8]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn set_volume_changes_volume() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_set_volume_log();

        let mut harness = Harness::start(device);
        harness.send([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_set_volume("Music", 13),
        ]);
        harness.settle().await;

        // Entry 0 comes from playing the sound, entry 1 from the command.
        let log = sink_mock.take_set_volume_log().unwrap();
        assert_eq!(log[1], 13.0 / 255.0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn play_repeated_plays_repeatedly() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_play_log();

        let mut harness = Harness::start(device);
        harness.send([Command::new_enter_repeated("Music", &[1, 2, 3])]);
        harness.settle().await;

        // Makes the sink report it is not playing, as if the sound ended.
        sink_mock.force_pause();
        harness.settle().await;

        let log = sink_mock.take_play_log().unwrap();
        assert_eq!(log.len(), 2);
        for entry in log {
            assert_eq!(entry, &[1_u8, 2, 3] as &[u8]);
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pause_calls_pause() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();

        let mut harness = Harness::start(device);
        harness.send([Command::new_play_once("Music", &[1, 2, 3])]);
        harness.settle().await;

        harness.send([Command::new_pause("Music")]);
        harness.settle().await;

        assert!(!sink_mock.is_playing().unwrap());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn resume_calls_resume() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();

        let mut harness = Harness::start(device);
        harness.send([Command::new_play_once("Music", &[1, 2, 3])]);
        harness.settle().await;

        harness.send([Command::new_pause("Music")]);
        harness.settle().await;

        assert!(!sink_mock.is_playing().unwrap());

        harness.send([Command::new_resume("Music")]);
        harness.settle().await;

        assert!(sink_mock.is_playing().unwrap());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn clear_calls_clear() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();

        let mut harness = Harness::start(device);
        harness.send([Command::new_play_once("Music", &[1, 2, 3])]);
        harness.settle().await;

        harness.send([Command::new_clear("Music")]);
        harness.settle().await;

        assert!(!sink_mock.is_playing().unwrap());

        // The resume command has to actually reach the audio task,
        // otherwise the assertion below checks nothing new.
        harness.send([Command::new_resume("Music")]);
        harness.settle().await;

        assert!(!sink_mock.is_playing().unwrap());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn play_repeated_plays_repeatedly_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        music_sink_mock.enable_play_log();
        let fx_sink_mock = device_mock.register_sink();
        fx_sink_mock.enable_play_log();

        let mut harness = Harness::start(device);
        harness.send([
            Command::new_enter_repeated("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[5, 2, 3]),
        ]);
        harness.settle().await;

        music_sink_mock.force_pause();
        fx_sink_mock.force_pause();
        harness.settle().await;

        let music_log = music_sink_mock.take_play_log().unwrap();
        assert_eq!(music_log.len(), 2);
        for entry in music_log {
            assert_eq!(entry, &[1_u8, 2, 3] as &[u8]);
        }

        let fx_log = fx_sink_mock.take_play_log().unwrap();
        assert_eq!(fx_log.len(), 1);
        for entry in fx_log {
            assert_eq!(entry, &[5_u8, 2, 3] as &[u8]);
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pause_calls_pause_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        let fx_sink_mock = device_mock.register_sink();

        let mut harness = Harness::start(device);
        harness.send([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[4, 5, 6]),
        ]);
        harness.settle().await;

        harness.send([Command::new_pause("Music")]);
        harness.settle().await;

        assert!(!music_sink_mock.is_playing().unwrap());
        assert!(fx_sink_mock.is_playing().unwrap());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn resume_calls_resume_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        let fx_sink_mock = device_mock.register_sink();

        let mut harness = Harness::start(device);
        harness.send([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[4, 5, 6]),
        ]);
        harness.settle().await;

        harness.send([Command::new_pause("Music"), Command::new_pause("FX")]);
        harness.settle().await;

        harness.send([Command::new_resume("Music")]);
        harness.settle().await;

        assert!(music_sink_mock.is_playing().unwrap());
        assert!(!fx_sink_mock.is_playing().unwrap());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn clear_calls_clear_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        let fx_sink_mock = device_mock.register_sink();

        let mut harness = Harness::start(device);
        harness.send([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[4, 5, 6]),
        ]);
        harness.settle().await;

        harness.send([Command::new_clear("Music")]);
        harness.settle().await;

        assert!(!music_sink_mock.is_playing().unwrap());
        assert!(fx_sink_mock.is_playing().unwrap());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn play_with_relative_volume() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_set_volume_log();

        let mut harness = Harness::start(device);
        harness.send([Command::new_play_once_with(
            "Music",
            &[1, 2, 3],
            PlayOptions { relative_volume: 127 },
        )]);
        harness.settle().await;

        // Default group volume (127 / 255) times relative volume
        // (127 / 255) is roughly 0.25.
        let log = sink_mock.take_set_volume_log().unwrap();
        assert_eq!(log.len(), 1);
        assert!(log[0] >= 0.24, "{}", log[0]);
        assert!(log[0] <= 0.26, "{}", log[0]);
    }
}