thedes_tui_core/
input.rs

1use std::time::Duration;
2
3use device::InputDevice;
4use thedes_async_util::non_blocking::{self, spsc::watch::MessageBox};
5use thiserror::Error;
6
7use crate::{
8    event::{Event, InternalEvent},
9    geometry::CoordPair,
10    runtime,
11    status::Status,
12};
13
14pub mod device;
15
16#[derive(Debug, Error)]
17pub enum Error {
18    #[error("Failed to read event from input device")]
19    Device(
20        #[from]
21        #[source]
22        device::Error,
23    ),
24}
25
26#[derive(Debug, Error)]
27#[error("Failed to read value from event reactor")]
28pub struct ReadError {
29    #[source]
30    inner: non_blocking::spsc::bounded::RecvError,
31}
32
33impl ReadError {
34    fn new(inner: non_blocking::spsc::bounded::RecvError) -> Self {
35        Self { inner }
36    }
37}
38
39#[derive(Debug, Error)]
40#[error("Terminal size watch publisher disconnected")]
41pub struct TermSizeWatchError {
42    #[source]
43    inner: non_blocking::spsc::watch::RecvError,
44}
45
46impl TermSizeWatchError {
47    fn new(inner: non_blocking::spsc::watch::RecvError) -> Self {
48        Self { inner }
49    }
50}
51
/// Receiving ends of the input channels, as returned by [`Config::open`].
#[derive(Debug)]
pub(crate) struct InputHandles {
    /// Reader for the external events forwarded by the reactor.
    pub events: EventReader,
    /// Watch for the terminal sizes carried by resize events.
    pub term_size: TermSizeWatch,
}
57
/// Resources handed to [`Config::open`] in order to start the input reactor.
#[derive(Debug)]
pub(crate) struct OpenResources {
    /// Device the reactor reads events from.
    pub device: Box<dyn InputDevice>,
    /// Status handle; while it reports blocked, the reactor does not forward
    /// external events.
    pub status: Status,
}
63
64#[derive(Debug, Clone)]
65pub struct Config {
66    poll_timeout: Duration,
67    buf_size: usize,
68}
69
70impl Config {
71    pub fn new() -> Self {
72        Self { poll_timeout: Duration::from_millis(160), buf_size: 16 }
73    }
74
75    pub fn with_poll_timeout(self, duration: Duration) -> Self {
76        Self { poll_timeout: duration, ..self }
77    }
78
79    pub fn with_buf_size(self, buf_size: usize) -> Self {
80        Self { buf_size, ..self }
81    }
82
83    pub(crate) fn open(
84        self,
85        resources: OpenResources,
86        join_set: &mut runtime::JoinSet,
87    ) -> InputHandles {
88        let (event_sender, event_receiver) =
89            non_blocking::spsc::bounded::channel(self.buf_size);
90        let (term_size_sender, term_size_receiver) =
91            non_blocking::spsc::watch::channel();
92
93        let mut reactor =
94            Reactor::new(self, resources, term_size_sender, event_sender);
95        join_set.spawn_blocking(move || reactor.run());
96
97        let event_reader = EventReader::new(event_receiver);
98        let term_size_watch = TermSizeWatch::new(term_size_receiver);
99        InputHandles { events: event_reader, term_size: term_size_watch }
100    }
101}
102
103#[derive(Debug)]
104pub struct TermSizeWatch {
105    receiver: non_blocking::spsc::watch::Receiver<MessageBox<CoordPair>>,
106}
107
108impl TermSizeWatch {
109    pub(crate) fn new(
110        receiver: non_blocking::spsc::watch::Receiver<MessageBox<CoordPair>>,
111    ) -> Self {
112        Self { receiver }
113    }
114
115    pub fn is_connected(&self) -> bool {
116        self.receiver.is_connected()
117    }
118
119    pub fn recv(&mut self) -> Result<Option<CoordPair>, TermSizeWatchError> {
120        self.receiver.recv().map_err(TermSizeWatchError::new)
121    }
122}
123
124#[derive(Debug)]
125pub struct EventReader {
126    receiver: non_blocking::spsc::bounded::Receiver<Event>,
127}
128
129impl EventReader {
130    fn new(receiver: non_blocking::spsc::bounded::Receiver<Event>) -> Self {
131        Self { receiver }
132    }
133
134    pub fn is_connected(&self) -> bool {
135        self.receiver.is_connected()
136    }
137
138    pub fn read_one(&mut self) -> Result<Option<Event>, ReadError> {
139        self.receiver.recv_one().map_err(ReadError::new)
140    }
141
142    pub fn read_until_now<'a>(
143        &'a mut self,
144    ) -> Result<ReadUntilThen<'a>, ReadError> {
145        self.receiver
146            .recv_many()
147            .map(ReadUntilThen::new)
148            .map_err(ReadError::new)
149    }
150}
151
152#[derive(Debug)]
153pub struct ReadUntilThen<'a> {
154    inner: non_blocking::spsc::bounded::RecvMany<'a, Event>,
155}
156
157impl<'a> ReadUntilThen<'a> {
158    fn new(inner: non_blocking::spsc::bounded::RecvMany<'a, Event>) -> Self {
159        Self { inner }
160    }
161}
162
163impl<'a> Iterator for ReadUntilThen<'a> {
164    type Item = Event;
165
166    fn next(&mut self) -> Option<Self::Item> {
167        self.inner.next()
168    }
169}
170
171#[derive(Debug)]
172struct Reactor {
173    device: Box<dyn InputDevice>,
174    poll_timeout: Duration,
175    term_size_sender: non_blocking::spsc::watch::Sender<MessageBox<CoordPair>>,
176    external_event_sender: non_blocking::spsc::bounded::Sender<Event>,
177    status: Status,
178}
179
180impl Reactor {
181    fn new(
182        config: Config,
183        resources: OpenResources,
184        resize_sender: non_blocking::spsc::watch::Sender<MessageBox<CoordPair>>,
185        external_event_sender: non_blocking::spsc::bounded::Sender<Event>,
186    ) -> Self {
187        Self {
188            device: resources.device,
189            poll_timeout: config.poll_timeout,
190            term_size_sender: resize_sender,
191            external_event_sender,
192            status: resources.status,
193        }
194    }
195
196    pub fn run(&mut self) -> Result<(), runtime::Error> {
197        loop {
198            if let Some(event) = self
199                .device
200                .blocking_read(self.poll_timeout)
201                .map_err(Error::Device)?
202            {
203                match event {
204                    InternalEvent::Resize(event) => {
205                        let _ = self.term_size_sender.send(event.size);
206                    },
207                    InternalEvent::External(event) => {
208                        if !self.status.is_blocked() {
209                            let _ = self.external_event_sender.send(event);
210                        }
211                    },
212                }
213            }
214
215            if !self.term_size_sender.is_connected() {
216                tracing::info!("Terminal size receiver disconnected");
217                break;
218            }
219            if !self.external_event_sender.is_connected() {
220                tracing::info!("External event receiver disconnected");
221                break;
222            }
223        }
224
225        Ok(())
226    }
227}
228
229#[cfg(test)]
230mod test {
231    use std::time::Duration;
232
233    use tokio::{io, time::timeout};
234
235    use crate::{
236        event::{Event, InternalEvent, Key, KeyEvent, ResizeEvent},
237        geometry::CoordPair,
238        input::{
239            InputHandles,
240            OpenResources,
241            device::{self, mock::InputDeviceMock},
242        },
243        runtime::JoinSet,
244        status::Status,
245    };
246
247    use super::Config;
248
249    #[tokio::test]
250    async fn send_correct_events() {
251        let device_mock = InputDeviceMock::new();
252        let device = device_mock.open();
253        let mut join_set = JoinSet::new();
254        let resources = OpenResources { device, status: Status::new() };
255
256        device_mock.publish_ok([
257            InternalEvent::External(Event::Key(KeyEvent {
258                alt: false,
259                ctrl: false,
260                shift: false,
261                main_key: Key::Esc,
262            })),
263            InternalEvent::Resize(ResizeEvent {
264                size: CoordPair { y: 30, x: 100 },
265            }),
266            InternalEvent::External(Event::Key(KeyEvent {
267                alt: false,
268                ctrl: true,
269                shift: false,
270                main_key: Key::Enter,
271            })),
272        ]);
273
274        let mut handles = Config::new()
275            .with_poll_timeout(Duration::from_millis(1))
276            .open(resources, &mut join_set);
277
278        let max_tries = 10;
279        let mut esc_event = None;
280        for _ in 0 .. max_tries {
281            esc_event = handles.events.read_one().unwrap();
282            if esc_event.is_some() {
283                break;
284            }
285            tokio::time::sleep(Duration::from_millis(1)).await;
286        }
287        let esc_event = esc_event.unwrap();
288
289        assert_eq!(
290            esc_event,
291            Event::Key(KeyEvent {
292                alt: false,
293                ctrl: false,
294                shift: false,
295                main_key: Key::Esc,
296            })
297        );
298
299        let enter_event = handles.events.read_one().unwrap().unwrap();
300        assert_eq!(
301            enter_event,
302            Event::Key(KeyEvent {
303                alt: false,
304                ctrl: true,
305                shift: false,
306                main_key: Key::Enter,
307            })
308        );
309
310        assert_eq!(handles.events.read_one().unwrap(), None);
311
312        let resize_event = handles.term_size.recv().unwrap().unwrap();
313        assert_eq!(resize_event, CoordPair { y: 30, x: 100 },);
314
315        assert_eq!(handles.term_size.recv().unwrap(), None);
316
317        drop(handles);
318
319        let results = timeout(Duration::from_millis(200), join_set.join_all())
320            .await
321            .unwrap();
322        for result in results {
323            result.unwrap();
324        }
325    }
326
327    #[tokio::test]
328    async fn block_if_blocked() {
329        let device_mock = InputDeviceMock::new();
330        device_mock.enable_timeout_log();
331
332        let device = device_mock.open();
333        let mut join_set = JoinSet::new();
334        let status = Status::new();
335        let resources = OpenResources { device, status: status.clone() };
336
337        status.set_blocked(true);
338
339        device_mock.publish_ok([
340            InternalEvent::External(Event::Key(KeyEvent {
341                alt: false,
342                ctrl: false,
343                shift: false,
344                main_key: Key::Esc,
345            })),
346            InternalEvent::Resize(ResizeEvent {
347                size: CoordPair { y: 30, x: 100 },
348            }),
349            InternalEvent::External(Event::Key(KeyEvent {
350                alt: false,
351                ctrl: true,
352                shift: false,
353                main_key: Key::Enter,
354            })),
355        ]);
356
357        let handles = Config::new()
358            .with_poll_timeout(Duration::from_millis(1))
359            .open(resources, &mut join_set);
360
361        let InputHandles { mut events, term_size } = handles;
362        drop(term_size);
363
364        let results = timeout(Duration::from_millis(200), join_set.join_all())
365            .await
366            .unwrap();
367        for result in results {
368            result.unwrap();
369        }
370
371        events.read_until_now().unwrap_err();
372    }
373
374    #[tokio::test]
375    async fn use_configured_timeout() {
376        let device_mock = InputDeviceMock::new();
377        device_mock.enable_timeout_log();
378
379        let device = device_mock.open();
380        let mut join_set = JoinSet::new();
381        let resources = OpenResources { device, status: Status::new() };
382
383        device_mock.publish_ok([
384            InternalEvent::External(Event::Key(KeyEvent {
385                alt: false,
386                ctrl: false,
387                shift: false,
388                main_key: Key::Esc,
389            })),
390            InternalEvent::Resize(ResizeEvent {
391                size: CoordPair { y: 30, x: 100 },
392            }),
393            InternalEvent::External(Event::Key(KeyEvent {
394                alt: false,
395                ctrl: true,
396                shift: false,
397                main_key: Key::Enter,
398            })),
399        ]);
400
401        let handles = Config::new()
402            .with_poll_timeout(Duration::from_millis(1))
403            .open(resources, &mut join_set);
404
405        drop(handles);
406
407        let results = timeout(Duration::from_millis(200), join_set.join_all())
408            .await
409            .unwrap();
410        for result in results {
411            result.unwrap();
412        }
413
414        let timeout_log = device_mock.take_timeout_log().unwrap();
415        for timeout in timeout_log {
416            assert_eq!(timeout, Duration::from_millis(1));
417        }
418    }
419
420    #[tokio::test]
421    async fn stop_on_error() {
422        let device_mock = InputDeviceMock::new();
423        let device = device_mock.open();
424        let mut join_set = JoinSet::new();
425        let resources = OpenResources { device, status: Status::new() };
426
427        let error = io::ErrorKind::Unsupported.into();
428        device_mock.publish([
429            Err(device::Error::Io(error)),
430            Ok(InternalEvent::External(Event::Key(KeyEvent {
431                alt: false,
432                ctrl: true,
433                shift: false,
434                main_key: Key::Enter,
435            }))),
436        ]);
437
438        let handles = Config::new()
439            .with_poll_timeout(Duration::from_millis(1))
440            .open(resources, &mut join_set);
441
442        drop(handles);
443
444        let results = timeout(Duration::from_millis(200), join_set.join_all())
445            .await
446            .unwrap();
447        assert!(
448            results.iter().any(|result| result.is_err()),
449            "results: {results:#?}",
450        );
451    }
452
453    #[tokio::test]
454    async fn stops_if_event_listener_disconnects() {
455        let device_mock = InputDeviceMock::new();
456        let device = device_mock.open();
457        let mut join_set = JoinSet::new();
458        let resources = OpenResources { device, status: Status::new() };
459
460        device_mock.publish_ok([
461            InternalEvent::External(Event::Key(KeyEvent {
462                alt: false,
463                ctrl: false,
464                shift: false,
465                main_key: Key::Esc,
466            })),
467            InternalEvent::Resize(ResizeEvent {
468                size: CoordPair { y: 30, x: 100 },
469            }),
470            InternalEvent::External(Event::Key(KeyEvent {
471                alt: false,
472                ctrl: true,
473                shift: false,
474                main_key: Key::Enter,
475            })),
476        ]);
477
478        let handles = Config::new()
479            .with_poll_timeout(Duration::from_millis(1))
480            .open(resources, &mut join_set);
481
482        let InputHandles { events, term_size } = handles;
483
484        drop(events);
485
486        let results = timeout(Duration::from_millis(200), join_set.join_all())
487            .await
488            .unwrap();
489        for result in results {
490            result.unwrap();
491        }
492
493        drop(term_size);
494    }
495
496    #[tokio::test]
497    async fn stops_if_term_size_watch_disconnects() {
498        let device_mock = InputDeviceMock::new();
499        let device = device_mock.open();
500        let mut join_set = JoinSet::new();
501        let resources = OpenResources { device, status: Status::new() };
502
503        device_mock.publish_ok([
504            InternalEvent::External(Event::Key(KeyEvent {
505                alt: false,
506                ctrl: false,
507                shift: false,
508                main_key: Key::Esc,
509            })),
510            InternalEvent::Resize(ResizeEvent {
511                size: CoordPair { y: 30, x: 100 },
512            }),
513            InternalEvent::External(Event::Key(KeyEvent {
514                alt: false,
515                ctrl: true,
516                shift: false,
517                main_key: Key::Enter,
518            })),
519        ]);
520
521        let handles = Config::new()
522            .with_poll_timeout(Duration::from_millis(1))
523            .open(resources, &mut join_set);
524
525        let InputHandles { events, term_size } = handles;
526
527        drop(term_size);
528
529        let results = timeout(Duration::from_millis(200), join_set.join_all())
530            .await
531            .unwrap();
532        for result in results {
533            result.unwrap();
534        }
535
536        drop(events);
537    }
538}