1use std::{borrow::Cow, collections::HashMap, mem};
2
3use thedes_async_util::{
4 non_blocking,
5 timer::{TickSession, Timer},
6};
7use thiserror::Error;
8use tokio::task;
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12 audio::device::{AudioDevice, AudioSinkDevice},
13 runtime,
14};
15
16pub mod device;
17
/// Volume level expressed as an integer step; [`Volume::MAX`] (255) is full
/// scale and `0` is silence once converted to a fraction.
pub type Volume = u8;
19
20#[derive(Debug, Error)]
21pub enum PlayNowError {
22 #[error("Device failed to play")]
23 DevicePlayNow(#[from] device::PlayNowError),
24 #[error("Device failed to open sink")]
25 DeviceOpenSink(#[from] device::OpenSinkError),
26 #[error("Device failed to set volume")]
27 DeviceSetVolume(#[from] device::SetVolumeError),
28}
29
30#[derive(Debug, Error)]
31pub enum Error {
32 #[error("Device failed to play")]
33 DevicePlayNow(#[from] device::PlayNowError),
34 #[error("Device failed to set volume")]
35 DeviceSetVolume(#[from] device::SetVolumeError),
36 #[error("Device failed to pause a sink")]
37 DevicePause(#[from] device::PauseSinkError),
38 #[error("Device failed to resume a sink")]
39 DeviceResume(#[from] device::ResumeSinkError),
40 #[error("Device failed to clear a sink")]
41 DeviceClear(#[from] device::ClearSinkError),
42 #[error("Device failed to check play status")]
43 DeviceIsPlaying(#[from] device::CheckPlayStatusError),
44 #[error("Device failed to open sink")]
45 DeviceOpenSink(#[from] device::OpenSinkError),
46}
47
48#[derive(Debug, Error)]
49#[error(transparent)]
50pub struct FlushError {
51 inner: non_blocking::spsc::unbounded::SendError<Vec<Command>>,
52}
53
54impl FlushError {
55 fn new(
56 inner: non_blocking::spsc::unbounded::SendError<Vec<Command>>,
57 ) -> Self {
58 Self { inner }
59 }
60
61 pub fn into_bounced_commands(self) -> Vec<Command> {
62 self.inner.into_message()
63 }
64}
65
66#[derive(Debug)]
67pub enum Command {
68 EnterRepeated(Cow<'static, str>, Cow<'static, [u8]>, PlayOptions),
69 LeaveRepeated(Cow<'static, str>),
70 PlayOnce(Cow<'static, str>, Cow<'static, [u8]>, PlayOptions),
71 Pause(Cow<'static, str>),
72 Resume(Cow<'static, str>),
73 Clear(Cow<'static, str>),
74 SetVolume(Cow<'static, str>, u8),
75}
76
77impl Command {
78 pub fn new_enter_repeated(
79 name: impl Into<Cow<'static, str>>,
80 bytes: impl Into<Cow<'static, [u8]>>,
81 ) -> Self {
82 Self::new_enter_repeated_with(name, bytes, PlayOptions::default())
83 }
84
85 pub fn new_enter_repeated_with(
86 name: impl Into<Cow<'static, str>>,
87 bytes: impl Into<Cow<'static, [u8]>>,
88 options: PlayOptions,
89 ) -> Self {
90 Self::EnterRepeated(name.into(), bytes.into(), options)
91 }
92
93 pub fn new_leave_repeated(name: impl Into<Cow<'static, str>>) -> Self {
94 Self::LeaveRepeated(name.into())
95 }
96
97 pub fn new_play_once(
98 name: impl Into<Cow<'static, str>>,
99 bytes: impl Into<Cow<'static, [u8]>>,
100 ) -> Self {
101 Self::new_play_once_with(name, bytes, PlayOptions::default())
102 }
103
104 pub fn new_play_once_with(
105 name: impl Into<Cow<'static, str>>,
106 bytes: impl Into<Cow<'static, [u8]>>,
107 options: PlayOptions,
108 ) -> Self {
109 Self::PlayOnce(name.into(), bytes.into(), options)
110 }
111
112 pub fn new_pause(name: impl Into<Cow<'static, str>>) -> Self {
113 Self::Pause(name.into())
114 }
115
116 pub fn new_resume(name: impl Into<Cow<'static, str>>) -> Self {
117 Self::Resume(name.into())
118 }
119
120 pub fn new_clear(name: impl Into<Cow<'static, str>>) -> Self {
121 Self::Clear(name.into())
122 }
123
124 pub fn new_set_volume(
125 name: impl Into<Cow<'static, str>>,
126 level: u8,
127 ) -> Self {
128 Self::SetVolume(name.into(), level)
129 }
130}
131
132#[derive(Debug)]
133pub(crate) struct OpenResources {
134 pub device: Box<dyn AudioDevice>,
135 pub timer: Timer,
136 pub cancel_token: CancellationToken,
137}
138
139#[derive(Debug)]
140pub(crate) struct AudioHandles {
141 pub controller: AudioControllerHandle,
142}
143
144#[derive(Debug)]
145pub struct Config {
146 _private: (),
147}
148
149impl Config {
150 pub fn new() -> Self {
151 Self { _private: () }
152 }
153
154 pub(crate) fn open(
155 self,
156 resources: OpenResources,
157 join_set: &mut runtime::JoinSet,
158 ) -> AudioHandles {
159 let (command_sender, command_receiver) =
160 non_blocking::spsc::unbounded::channel();
161
162 join_set.spawn(async move {
163 let mut reactor = Reactor::new(resources, command_receiver);
164 reactor.run().await
165 });
166
167 AudioHandles { controller: AudioControllerHandle::new(command_sender) }
168 }
169}
170
171#[derive(Debug)]
172pub struct AudioControllerHandle {
173 command_sender: non_blocking::spsc::unbounded::Sender<Vec<Command>>,
174 command_queue: Vec<Command>,
175}
176
177impl AudioControllerHandle {
178 fn new(
179 command_sender: non_blocking::spsc::unbounded::Sender<Vec<Command>>,
180 ) -> Self {
181 Self { command_sender, command_queue: Vec::new() }
182 }
183
184 pub fn is_connected(&self) -> bool {
185 self.command_sender.is_connected()
186 }
187
188 pub fn queue<I>(&mut self, commands: I)
189 where
190 I: IntoIterator<Item = Command>,
191 {
192 self.command_queue.extend(commands);
193 }
194
195 pub fn flush(&mut self) -> Result<(), FlushError> {
196 let commands = mem::take(&mut self.command_queue);
197 self.command_sender.send(commands).map_err(FlushError::new)
198 }
199}
200
201#[derive(Debug, Clone)]
202pub struct PlayOptions {
203 pub relative_volume: Volume,
204}
205
206impl Default for PlayOptions {
207 fn default() -> Self {
208 Self { relative_volume: Volume::MAX }
209 }
210}
211
212impl PlayOptions {
213 pub fn apply_playing(
214 &self,
215 sink: &mut dyn AudioSinkDevice,
216 group_float_volume: f32,
217 ) -> Result<(), Error> {
218 let max = self.relative_volume as f32 / Volume::MAX as f32;
219 let compressed_volume = group_float_volume * max;
220 sink.set_volume(compressed_volume)?;
221 Ok(())
222 }
223}
224
225#[derive(Debug, Clone)]
226struct RepeatedSinkFrame {
227 bytes: Cow<'static, [u8]>,
228 options: PlayOptions,
229}
230
231#[derive(Debug)]
232struct RepeatedSink {
233 inner: Box<dyn AudioSinkDevice>,
234 curr: RepeatedSinkFrame,
235 prev: Vec<RepeatedSinkFrame>,
236}
237
238impl RepeatedSink {
239 pub fn play(&mut self, group_float_volume: f32) -> Result<(), Error> {
240 self.apply_options_playing(group_float_volume)?;
241 self.inner.play_now(self.curr.bytes.clone())?;
242 Ok(())
243 }
244
245 pub fn apply_options_playing(
246 &mut self,
247 group_float_volume: f32,
248 ) -> Result<(), Error> {
249 self.curr.options.apply_playing(&mut *self.inner, group_float_volume)
250 }
251}
252
253#[derive(Debug)]
254struct OnceSink {
255 inner: Box<dyn AudioSinkDevice>,
256 options: PlayOptions,
257}
258
259#[derive(Debug)]
260struct AudioSinkGroup {
261 once_sinks: Vec<Option<Box<OnceSink>>>,
262 repeated_sink: Option<Box<RepeatedSink>>,
263 paused: bool,
264 volume: Volume,
265}
266
267impl AudioSinkGroup {
268 fn new() -> Self {
269 Self {
270 once_sinks: Vec::new(),
271 paused: false,
272 repeated_sink: None,
273 volume: 127,
274 }
275 }
276
277 fn float_volume(&self) -> f32 {
278 f32::from(self.volume) / 255.0
279 }
280
281 fn set_volume(&mut self, volume: Volume) -> Result<(), Error> {
282 self.volume = volume;
283 let volume = self.float_volume();
284 if let Some(sink) = &mut self.repeated_sink {
285 sink.apply_options_playing(volume)?;
286 }
287 let once_sinks =
288 self.once_sinks.iter_mut().filter_map(|sink| sink.as_mut());
289 for sink in once_sinks {
290 sink.options.apply_playing(&mut *sink.inner, volume)?;
291 }
292 Ok(())
293 }
294
295 fn enter_repeated(
296 &mut self,
297 device: &mut dyn AudioDevice,
298 bytes: Cow<'static, [u8]>,
299 options: PlayOptions,
300 ) -> Result<(), Error> {
301 let frame = RepeatedSinkFrame { options, bytes };
302 match self.repeated_sink.take() {
303 Some(mut sink) => {
304 let prev = mem::replace(&mut sink.curr, frame);
305 sink.prev.push(prev);
306 sink.inner.clear()?;
307 sink.play(self.float_volume())?;
308 self.repeated_sink = Some(sink);
309 },
310 None => {
311 let inner = device.open_sink()?;
312 let mut sink = Box::new(RepeatedSink {
313 curr: frame,
314 prev: Vec::new(),
315 inner,
316 });
317 sink.play(self.float_volume())?;
318 self.repeated_sink = Some(sink);
319 },
320 };
321 Ok(())
322 }
323
324 pub fn leave_repeated(&mut self) -> Result<(), Error> {
325 if let Some(mut sink) = self.repeated_sink.take() {
326 if let Some(prev) = sink.prev.pop() {
327 sink.curr = prev;
328 sink.play(self.float_volume())?;
329 self.repeated_sink = Some(sink);
330 }
331 }
332 Ok(())
333 }
334
335 fn play_once(
336 &mut self,
337 device: &mut dyn AudioDevice,
338 bytes: Cow<'static, [u8]>,
339 options: PlayOptions,
340 ) -> Result<(), Error> {
341 let mut inner = device.open_sink()?;
342 inner.play_now(bytes.clone())?;
343 options.apply_playing(&mut inner, self.float_volume())?;
344 let sink = OnceSink { inner, options };
345 if let Some(entry) = self
346 .once_sinks
347 .iter_mut()
348 .find(|maybe_device| maybe_device.is_none())
349 {
350 *entry = Some(Box::new(sink));
351 } else {
352 self.once_sinks.push(Some(Box::new(sink)));
353 }
354 if self.paused {
355 self.resume()?;
356 }
357 Ok(())
358 }
359
360 fn pause(&mut self) -> Result<(), Error> {
361 self.paused = true;
362 if let Some(sink) = &mut self.repeated_sink {
363 sink.inner.pause()?;
364 }
365 for maybe_sink in &mut self.once_sinks {
366 if let Some(sink) = maybe_sink {
367 sink.inner.pause()?;
368 }
369 }
370 Ok(())
371 }
372
373 fn resume(&mut self) -> Result<(), Error> {
374 self.paused = false;
375 let float_volume = self.float_volume();
376 if let Some(sink) = &mut self.repeated_sink {
377 sink.inner.resume()?;
378 sink.apply_options_playing(float_volume)?;
379 }
380 for maybe_sink in &mut self.once_sinks {
381 if let Some(sink) = maybe_sink {
382 sink.inner.resume()?;
383 }
384 }
385 Ok(())
386 }
387
388 fn clear(&mut self) -> Result<(), Error> {
389 self.paused = true;
390 if let Some(mut sink) = self.repeated_sink.take() {
391 sink.inner.pause()?;
392 }
393 for maybe_sink in self.once_sinks.drain(..) {
394 if let Some(mut sink) = maybe_sink {
395 sink.inner.pause()?;
396 }
397 }
398 Ok(())
399 }
400
401 fn collect_garbage(&mut self) {
402 if self.paused {
403 return;
404 }
405
406 for maybe_sink in &mut self.once_sinks {
407 if !maybe_sink
408 .as_ref()
409 .is_some_and(|sink| sink.inner.is_playing().is_ok_and(|is| is))
410 {
411 *maybe_sink = None;
412 }
413 }
414 if let Some(last_some) =
415 self.once_sinks.iter().rposition(Option::is_some)
416 {
417 self.once_sinks.drain(last_some + 1 ..);
418 }
419 }
420
421 fn revive_repeated(&mut self) -> Result<(), Error> {
422 let float_volume = self.float_volume();
423 if let Some(sink) = &mut self.repeated_sink {
424 if !sink.inner.is_playing()? {
425 sink.play(float_volume)?;
426 }
427 }
428 Ok(())
429 }
430
431 fn is_empty(&self) -> bool {
432 self.once_sinks.is_empty() && self.repeated_sink.is_none()
433 }
434}
435
436#[derive(Debug)]
437struct Reactor {
438 device: Box<dyn AudioDevice>,
439 cancel_token: CancellationToken,
440 ticker: TickSession,
441 command_receiver: non_blocking::spsc::unbounded::Receiver<Vec<Command>>,
442 sinks: HashMap<Cow<'static, str>, AudioSinkGroup>,
443 groups_to_be_cleared: Vec<Cow<'static, str>>,
444}
445
446impl Reactor {
447 pub fn new(
448 resources: OpenResources,
449 command_receiver: non_blocking::spsc::unbounded::Receiver<Vec<Command>>,
450 ) -> Self {
451 Self {
452 device: resources.device,
453 cancel_token: resources.cancel_token,
454 ticker: resources.timer.new_session(),
455 command_receiver,
456 sinks: HashMap::new(),
457 groups_to_be_cleared: Vec::new(),
458 }
459 }
460
461 pub async fn run(&mut self) -> Result<(), runtime::Error> {
462 let mut commands = Vec::<Command>::new();
463
464 loop {
465 if !self.execute_commands_sent(&mut commands)? {
466 break;
467 }
468
469 tokio::select! {
470 _ = self.ticker.tick() => (),
471 _ = self.cancel_token.cancelled() => {
472 tracing::info!("Audio reactor token cancellation detected");
473 break
474 },
475 }
476
477 self.tick()?;
478
479 tokio::select! {
480 _ = self.ticker.tick() => (),
481 _ = self.cancel_token.cancelled() => {
482 tracing::info!("Audio reactor token cancellation detected");
483 break
484 },
485 }
486 }
487
488 Ok(())
489 }
490
491 fn execute_commands_sent(
492 &mut self,
493 buf: &mut Vec<Command>,
494 ) -> Result<bool, Error> {
495 let Ok(command_iterator) = self.command_receiver.recv_many() else {
496 tracing::info!("Audio reactor command sender disconnected");
497 return Ok(false);
498 };
499 buf.extend(command_iterator.flatten());
500 for command in buf.drain(..) {
501 self.execute_command(command)?;
502 }
503 Ok(true)
504 }
505
506 fn execute_command(&mut self, command: Command) -> Result<(), Error> {
507 task::block_in_place(|| match command {
508 Command::EnterRepeated(name, bytes, options) => {
509 self.enter_repeated(name, bytes, options)
510 },
511 Command::LeaveRepeated(name) => self.leave_repeated(name),
512 Command::PlayOnce(group, bytes, options) => {
513 self.play_once(group, bytes, options)
514 },
515 Command::Pause(group) => self.pause(group),
516 Command::Resume(group) => self.resume(group),
517 Command::Clear(group) => self.clear(group),
518 Command::SetVolume(group, level) => self.set_volume(group, level),
519 })
520 }
521
522 fn enter_repeated(
523 &mut self,
524 group: Cow<'static, str>,
525 bytes: Cow<'static, [u8]>,
526 options: PlayOptions,
527 ) -> Result<(), Error> {
528 self.with_sink_group(&group[..], move |group, device| {
529 group.enter_repeated(device, bytes, options)
530 })
531 }
532
533 fn leave_repeated(
534 &mut self,
535 group: Cow<'static, str>,
536 ) -> Result<(), Error> {
537 self.with_sink_group(&group[..], move |group, _device| {
538 group.leave_repeated()
539 })
540 }
541
542 fn play_once(
543 &mut self,
544 group: Cow<'static, str>,
545 bytes: Cow<'static, [u8]>,
546 options: PlayOptions,
547 ) -> Result<(), Error> {
548 self.with_sink_group(&group[..], move |group, device| {
549 group.play_once(device, bytes, options)
550 })
551 }
552
553 fn set_volume(
554 &mut self,
555 group: Cow<'static, str>,
556 volume: u8,
557 ) -> Result<(), Error> {
558 self.with_sink_group(&group, move |group, _device| {
559 group.set_volume(volume)
560 })
561 }
562
563 fn pause(&mut self, group: Cow<'static, str>) -> Result<(), Error> {
564 self.with_sink_group(&group[..], move |group, _device| group.pause())
565 }
566
567 fn resume(&mut self, group: Cow<'static, str>) -> Result<(), Error> {
568 self.with_sink_group(&group[..], move |group, _device| group.resume())
569 }
570
571 fn clear(&mut self, group: Cow<'static, str>) -> Result<(), Error> {
572 self.with_sink_group(&group[..], move |group, _device| group.clear())
573 }
574
575 fn clear_all(&mut self) {
576 for group in self.sinks.values_mut() {
577 _ = group.clear();
578 }
579 }
580
581 fn tick(&mut self) -> Result<(), Error> {
582 self.groups_to_be_cleared.clear();
583 task::block_in_place(|| {
584 for (key, group) in &mut self.sinks {
585 group.collect_garbage();
586 if group.is_empty() {
587 self.groups_to_be_cleared.push(key.clone());
588 }
589 group.revive_repeated()?;
590 }
591 for key in self.groups_to_be_cleared.drain(..) {
592 self.sinks.remove(&key);
593 }
594 Ok(())
595 })
596 }
597
598 fn with_sink_group<F, T>(&mut self, group: &str, consumer: F) -> T
599 where
600 F: FnOnce(&mut AudioSinkGroup, &mut dyn AudioDevice) -> T,
601 {
602 loop {
603 if let Some(group) = self.sinks.get_mut(group) {
604 break consumer(group, &mut self.device);
605 }
606 self.sinks
607 .insert(Cow::from(group.to_owned()), AudioSinkGroup::new());
608 }
609 }
610}
611
612impl Drop for Reactor {
613 fn drop(&mut self) {
614 if tokio::runtime::Handle::try_current().is_ok() {
615 task::block_in_place(|| self.clear_all());
616 } else {
617 self.clear_all();
618 }
619 }
620}
621
#[cfg(test)]
mod test {
    use std::time::Duration;

    use thedes_async_util::timer::{TickSession, Timer};
    use tokio_util::sync::CancellationToken;

    use crate::{
        audio::{
            Command,
            Config,
            OpenResources,
            PlayOptions,
            device::mock::AudioDeviceMock,
        },
        runtime::JoinSet,
    };

    /// Number of ticks a test waits after flushing commands before it
    /// inspects the mocks.
    const SETTLE_TICKS: usize = 3;

    /// Waits [`SETTLE_TICKS`] ticks so the reactor gets a chance to execute
    /// flushed commands and run its maintenance.
    async fn settle(tick_session: &mut TickSession) {
        for _ in 0 .. SETTLE_TICKS {
            tick_session.tick().await;
        }
    }

    /// A one-shot sound reaches the sink exactly once, with the right bytes.
    #[tokio::test(flavor = "multi_thread")]
    async fn play_does_play() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_play_log();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([Command::new_play_once("Music", &[1, 2, 3])]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        let log = sink_mock.take_play_log().unwrap();
        assert_eq!(log.len(), 1);
        assert_eq!(log[0], &[1_u8, 2, 3] as &[u8]);
    }

    /// Setting the group volume is forwarded to a sink that is playing.
    #[tokio::test(flavor = "multi_thread")]
    async fn set_volume_changes_volume() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_set_volume_log();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_set_volume("Music", 13),
        ]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        // Entry 0 is the volume applied when the sound was started.
        let log = sink_mock.take_set_volume_log().unwrap();
        assert_eq!(log[1], 13.0 / 255.0);
    }

    /// A repeated sound is started again once its sink stops playing.
    #[tokio::test(flavor = "multi_thread")]
    async fn play_repeated_plays_repeatedly() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_play_log();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles
            .controller
            .queue([Command::new_enter_repeated("Music", &[1, 2, 3])]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        sink_mock.force_pause();

        settle(&mut tick_session).await;

        let log = sink_mock.take_play_log().unwrap();
        assert_eq!(log.len(), 2);
        for entry in log {
            assert_eq!(entry, &[1_u8, 2, 3] as &[u8]);
        }
    }

    /// Pausing a group pauses its sink.
    #[tokio::test(flavor = "multi_thread")]
    async fn pause_calls_pause() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([Command::new_play_once("Music", &[1, 2, 3])]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        handles.controller.queue([Command::new_pause("Music")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(!sink_mock.is_playing().unwrap());
    }

    /// Resuming a paused group resumes its sink.
    #[tokio::test(flavor = "multi_thread")]
    async fn resume_calls_resume() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([Command::new_play_once("Music", &[1, 2, 3])]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        handles.controller.queue([Command::new_pause("Music")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(!sink_mock.is_playing().unwrap());

        handles.controller.queue([Command::new_resume("Music")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(sink_mock.is_playing().unwrap());
    }

    /// Clearing a group stops its sink, and a later resume does not bring
    /// the cleared sink back.
    #[tokio::test(flavor = "multi_thread")]
    async fn clear_calls_clear() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([Command::new_play_once("Music", &[1, 2, 3])]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        handles.controller.queue([Command::new_clear("Music")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(!sink_mock.is_playing().unwrap());

        handles.controller.queue([Command::new_resume("Music")]);
        // The resume has to actually reach the reactor; otherwise the
        // assertion below would hold trivially.
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(!sink_mock.is_playing().unwrap());
    }

    /// Reviving a repeated sound only affects the group that owns it.
    #[tokio::test(flavor = "multi_thread")]
    async fn play_repeated_plays_repeatedly_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        music_sink_mock.enable_play_log();
        let fx_sink_mock = device_mock.register_sink();
        fx_sink_mock.enable_play_log();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([
            Command::new_enter_repeated("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[5, 2, 3]),
        ]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        music_sink_mock.force_pause();
        fx_sink_mock.force_pause();

        settle(&mut tick_session).await;

        let music_log = music_sink_mock.take_play_log().unwrap();
        assert_eq!(music_log.len(), 2);
        for entry in music_log {
            assert_eq!(entry, &[1_u8, 2, 3] as &[u8]);
        }

        let fx_log = fx_sink_mock.take_play_log().unwrap();
        assert_eq!(fx_log.len(), 1);
        for entry in fx_log {
            assert_eq!(entry, &[5_u8, 2, 3] as &[u8]);
        }
    }

    /// Pausing one group leaves the other group playing.
    #[tokio::test(flavor = "multi_thread")]
    async fn pause_calls_pause_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        let fx_sink_mock = device_mock.register_sink();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[4, 5, 6]),
        ]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        handles.controller.queue([Command::new_pause("Music")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(!music_sink_mock.is_playing().unwrap());
        assert!(fx_sink_mock.is_playing().unwrap());
    }

    /// Resuming one group leaves the other group paused.
    #[tokio::test(flavor = "multi_thread")]
    async fn resume_calls_resume_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        let fx_sink_mock = device_mock.register_sink();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[4, 5, 6]),
        ]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        handles
            .controller
            .queue([Command::new_pause("Music"), Command::new_pause("FX")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        handles.controller.queue([Command::new_resume("Music")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(music_sink_mock.is_playing().unwrap());
        assert!(!fx_sink_mock.is_playing().unwrap());
    }

    /// Clearing one group leaves the other group playing.
    #[tokio::test(flavor = "multi_thread")]
    async fn clear_calls_clear_on_correct_group() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let music_sink_mock = device_mock.register_sink();
        let fx_sink_mock = device_mock.register_sink();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([
            Command::new_play_once("Music", &[1, 2, 3]),
            Command::new_play_once("FX", &[4, 5, 6]),
        ]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        handles.controller.queue([Command::new_clear("Music")]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        assert!(!music_sink_mock.is_playing().unwrap());
        assert!(fx_sink_mock.is_playing().unwrap());
    }

    /// The relative volume of a sound scales the group volume: 127/255 of
    /// the default group volume (127/255) is roughly 0.25.
    #[tokio::test(flavor = "multi_thread")]
    async fn play_with_relative_volume() {
        let device_mock = AudioDeviceMock::new();
        let device = device_mock.open();
        let sink_mock = device_mock.register_sink();
        sink_mock.enable_set_volume_log();

        let timer = Timer::new(Duration::from_millis(4));
        let mut tick_session = timer.new_session();
        let cancel_token = CancellationToken::new();
        let mut join_set = JoinSet::new();

        let mut handles = Config::new()
            .open(OpenResources { device, timer, cancel_token }, &mut join_set);
        handles.controller.queue([Command::new_play_once_with(
            "Music",
            &[1, 2, 3],
            PlayOptions { relative_volume: 127 },
        )]);
        handles.controller.flush().unwrap();

        settle(&mut tick_session).await;

        let log = sink_mock.take_set_volume_log().unwrap();
        assert_eq!(log.len(), 1);
        assert!(log[0] >= 0.24, "{}", log[0]);
        assert!(log[0] <= 0.26, "{}", log[0]);
    }
}