1use std::time::Duration;
2
3use device::InputDevice;
4use thedes_async_util::non_blocking::{self, spsc::watch::MessageBox};
5use thiserror::Error;
6
7use crate::{
8 event::{Event, InternalEvent},
9 geometry::CoordPair,
10 runtime,
11 status::Status,
12};
13
14pub mod device;
15
16#[derive(Debug, Error)]
17pub enum Error {
18 #[error("Failed to read event from input device")]
19 Device(
20 #[from]
21 #[source]
22 device::Error,
23 ),
24}
25
/// Error produced when an [`EventReader`] fails to receive from its event
/// channel.
#[derive(Debug, Error)]
#[error("Failed to read value from event reactor")]
pub struct ReadError {
    /// Underlying receive error reported by the bounded channel.
    #[source]
    inner: non_blocking::spsc::bounded::RecvError,
}
32
33impl ReadError {
34 fn new(inner: non_blocking::spsc::bounded::RecvError) -> Self {
35 Self { inner }
36 }
37}
38
/// Error produced when a [`TermSizeWatch`] fails to receive from its watch
/// channel.
#[derive(Debug, Error)]
#[error("Terminal size watch publisher disconnected")]
pub struct TermSizeWatchError {
    /// Underlying receive error reported by the watch channel.
    #[source]
    inner: non_blocking::spsc::watch::RecvError,
}
45
46impl TermSizeWatchError {
47 fn new(inner: non_blocking::spsc::watch::RecvError) -> Self {
48 Self { inner }
49 }
50}
51
/// Consumer-side handles returned by [`Config::open`].
#[derive(Debug)]
pub(crate) struct InputHandles {
    /// Reader for the external events forwarded by the reactor.
    pub events: EventReader,
    /// Watch for the terminal sizes forwarded by the reactor.
    pub term_size: TermSizeWatch,
}
57
/// Resources handed over to [`Config::open`] to start the input reactor.
#[derive(Debug)]
pub(crate) struct OpenResources {
    /// Device the reactor reads events from.
    pub device: Box<dyn InputDevice>,
    /// Status the reactor consults before forwarding external events.
    pub status: Status,
}
63
/// Settings used to open the input reactor.
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout passed to every blocking read on the input device.
    poll_timeout: Duration,
    /// Size passed to the bounded channel that carries external events.
    buf_size: usize,
}
69
70impl Config {
71 pub fn new() -> Self {
72 Self { poll_timeout: Duration::from_millis(160), buf_size: 16 }
73 }
74
75 pub fn with_poll_timeout(self, duration: Duration) -> Self {
76 Self { poll_timeout: duration, ..self }
77 }
78
79 pub fn with_buf_size(self, buf_size: usize) -> Self {
80 Self { buf_size, ..self }
81 }
82
83 pub(crate) fn open(
84 self,
85 resources: OpenResources,
86 join_set: &mut runtime::JoinSet,
87 ) -> InputHandles {
88 let (event_sender, event_receiver) =
89 non_blocking::spsc::bounded::channel(self.buf_size);
90 let (term_size_sender, term_size_receiver) =
91 non_blocking::spsc::watch::channel();
92
93 let mut reactor =
94 Reactor::new(self, resources, term_size_sender, event_sender);
95 join_set.spawn_blocking(move || reactor.run());
96
97 let event_reader = EventReader::new(event_receiver);
98 let term_size_watch = TermSizeWatch::new(term_size_receiver);
99 InputHandles { events: event_reader, term_size: term_size_watch }
100 }
101}
102
/// Receiving side of the terminal size watch channel fed by the reactor.
#[derive(Debug)]
pub struct TermSizeWatch {
    /// Receiver of the watch channel carrying terminal sizes.
    receiver: non_blocking::spsc::watch::Receiver<MessageBox<CoordPair>>,
}
107
108impl TermSizeWatch {
109 pub(crate) fn new(
110 receiver: non_blocking::spsc::watch::Receiver<MessageBox<CoordPair>>,
111 ) -> Self {
112 Self { receiver }
113 }
114
115 pub fn is_connected(&self) -> bool {
116 self.receiver.is_connected()
117 }
118
119 pub fn recv(&mut self) -> Result<Option<CoordPair>, TermSizeWatchError> {
120 self.receiver.recv().map_err(TermSizeWatchError::new)
121 }
122}
123
/// Receiving side of the bounded channel carrying external events forwarded
/// by the reactor.
#[derive(Debug)]
pub struct EventReader {
    /// Receiver of the bounded event channel.
    receiver: non_blocking::spsc::bounded::Receiver<Event>,
}
128
129impl EventReader {
130 fn new(receiver: non_blocking::spsc::bounded::Receiver<Event>) -> Self {
131 Self { receiver }
132 }
133
134 pub fn is_connected(&self) -> bool {
135 self.receiver.is_connected()
136 }
137
138 pub fn read_one(&mut self) -> Result<Option<Event>, ReadError> {
139 self.receiver.recv_one().map_err(ReadError::new)
140 }
141
142 pub fn read_until_now<'a>(
143 &'a mut self,
144 ) -> Result<ReadUntilThen<'a>, ReadError> {
145 self.receiver
146 .recv_many()
147 .map(ReadUntilThen::new)
148 .map_err(ReadError::new)
149 }
150}
151
/// Iterator of events returned by [`EventReader::read_until_now`].
#[derive(Debug)]
pub struct ReadUntilThen<'a> {
    /// Batch receive operation of the bounded channel that yields the events.
    inner: non_blocking::spsc::bounded::RecvMany<'a, Event>,
}
156
157impl<'a> ReadUntilThen<'a> {
158 fn new(inner: non_blocking::spsc::bounded::RecvMany<'a, Event>) -> Self {
159 Self { inner }
160 }
161}
162
163impl<'a> Iterator for ReadUntilThen<'a> {
164 type Item = Event;
165
166 fn next(&mut self) -> Option<Self::Item> {
167 self.inner.next()
168 }
169}
170
/// Worker that reads events from the input device and forwards them to the
/// channels consumed through [`InputHandles`].
#[derive(Debug)]
struct Reactor {
    /// Device read for events.
    device: Box<dyn InputDevice>,
    /// Timeout passed to every blocking read on the device.
    poll_timeout: Duration,
    /// Sending half of the terminal size watch channel.
    term_size_sender: non_blocking::spsc::watch::Sender<MessageBox<CoordPair>>,
    /// Sending half of the bounded channel for external events.
    external_event_sender: non_blocking::spsc::bounded::Sender<Event>,
    /// Status consulted to decide whether external events are forwarded.
    status: Status,
}
179
180impl Reactor {
181 fn new(
182 config: Config,
183 resources: OpenResources,
184 resize_sender: non_blocking::spsc::watch::Sender<MessageBox<CoordPair>>,
185 external_event_sender: non_blocking::spsc::bounded::Sender<Event>,
186 ) -> Self {
187 Self {
188 device: resources.device,
189 poll_timeout: config.poll_timeout,
190 term_size_sender: resize_sender,
191 external_event_sender,
192 status: resources.status,
193 }
194 }
195
196 pub fn run(&mut self) -> Result<(), runtime::Error> {
197 loop {
198 if let Some(event) = self
199 .device
200 .blocking_read(self.poll_timeout)
201 .map_err(Error::Device)?
202 {
203 match event {
204 InternalEvent::Resize(event) => {
205 let _ = self.term_size_sender.send(event.size);
206 },
207 InternalEvent::External(event) => {
208 if !self.status.is_blocked() {
209 let _ = self.external_event_sender.send(event);
210 }
211 },
212 }
213 }
214
215 if !self.term_size_sender.is_connected() {
216 tracing::info!("Terminal size receiver disconnected");
217 break;
218 }
219 if !self.external_event_sender.is_connected() {
220 tracing::info!("External event receiver disconnected");
221 break;
222 }
223 }
224
225 Ok(())
226 }
227}
228
#[cfg(test)]
mod test {
    use std::time::Duration;

    use tokio::{io, time::timeout};

    use crate::{
        event::{Event, InternalEvent, Key, KeyEvent, ResizeEvent},
        geometry::CoordPair,
        input::{
            InputHandles,
            OpenResources,
            device::{self, mock::InputDeviceMock},
        },
        runtime::JoinSet,
        status::Status,
    };

    use super::{Config, EventReader};

    /// Plain Esc key press used as the first published event.
    fn esc_key() -> Event {
        Event::Key(KeyEvent {
            alt: false,
            ctrl: false,
            shift: false,
            main_key: Key::Esc,
        })
    }

    /// Ctrl+Enter key press used as the last published event.
    fn ctrl_enter_key() -> Event {
        Event::Key(KeyEvent {
            alt: false,
            ctrl: true,
            shift: false,
            main_key: Key::Enter,
        })
    }

    /// Terminal size carried by the published resize event.
    fn term_size() -> CoordPair {
        CoordPair { y: 30, x: 100 }
    }

    /// Event sequence shared by most tests: Esc, resize, Ctrl+Enter.
    fn sample_events() -> [InternalEvent; 3] {
        [
            InternalEvent::External(esc_key()),
            InternalEvent::Resize(ResizeEvent { size: term_size() }),
            InternalEvent::External(ctrl_enter_key()),
        ]
    }

    /// Reads one event, retrying a bounded number of times with a short sleep
    /// in between. The reactor runs on a separate blocking task, so an event
    /// may not have been forwarded yet when the reader is first polled.
    async fn read_one_with_retry(events: &mut EventReader) -> Option<Event> {
        const MAX_TRIES: usize = 10;
        for _ in 0 .. MAX_TRIES {
            if let Some(event) = events.read_one().unwrap() {
                return Some(event);
            }
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        None
    }

    /// External events reach the event reader in order and the resize event
    /// reaches the terminal size watch.
    #[tokio::test]
    async fn send_correct_events() {
        let device_mock = InputDeviceMock::new();
        let device = device_mock.open();
        let mut join_set = JoinSet::new();
        let resources = OpenResources { device, status: Status::new() };

        device_mock.publish_ok(sample_events());

        let mut handles = Config::new()
            .with_poll_timeout(Duration::from_millis(1))
            .open(resources, &mut join_set);

        let esc_event = read_one_with_retry(&mut handles.events)
            .await
            .expect("Esc key event should be forwarded");
        assert_eq!(esc_event, esc_key());

        // The second key event is forwarded by the reactor concurrently, so
        // it needs the same retry as the first one to avoid a race.
        let enter_event = read_one_with_retry(&mut handles.events)
            .await
            .expect("Ctrl+Enter key event should be forwarded");
        assert_eq!(enter_event, ctrl_enter_key());

        assert_eq!(handles.events.read_one().unwrap(), None);

        // The resize event was published before Ctrl+Enter, so the reactor
        // has already handled it by the time Ctrl+Enter is observed.
        let resize_event = handles.term_size.recv().unwrap().unwrap();
        assert_eq!(resize_event, term_size());

        assert_eq!(handles.term_size.recv().unwrap(), None);

        drop(handles);

        let results = timeout(Duration::from_millis(200), join_set.join_all())
            .await
            .unwrap();
        for result in results {
            result.unwrap();
        }
    }

    /// While the status is blocked, no external event is forwarded.
    #[tokio::test]
    async fn block_if_blocked() {
        let device_mock = InputDeviceMock::new();
        device_mock.enable_timeout_log();

        let device = device_mock.open();
        let mut join_set = JoinSet::new();
        let status = Status::new();
        let resources = OpenResources { device, status: status.clone() };

        status.set_blocked(true);

        device_mock.publish_ok(sample_events());

        let handles = Config::new()
            .with_poll_timeout(Duration::from_millis(1))
            .open(resources, &mut join_set);

        let InputHandles { mut events, term_size } = handles;
        // Dropping the terminal size watch makes the reactor stop.
        drop(term_size);

        let results = timeout(Duration::from_millis(200), join_set.join_all())
            .await
            .unwrap();
        for result in results {
            result.unwrap();
        }

        events.read_until_now().unwrap_err();
    }

    /// Every timeout logged by the mock equals the configured poll timeout.
    #[tokio::test]
    async fn use_configured_timeout() {
        let device_mock = InputDeviceMock::new();
        device_mock.enable_timeout_log();

        let device = device_mock.open();
        let mut join_set = JoinSet::new();
        let resources = OpenResources { device, status: Status::new() };

        device_mock.publish_ok(sample_events());

        let handles = Config::new()
            .with_poll_timeout(Duration::from_millis(1))
            .open(resources, &mut join_set);

        drop(handles);

        let results = timeout(Duration::from_millis(200), join_set.join_all())
            .await
            .unwrap();
        for result in results {
            result.unwrap();
        }

        let timeout_log = device_mock.take_timeout_log().unwrap();
        for logged_timeout in timeout_log {
            assert_eq!(logged_timeout, Duration::from_millis(1));
        }
    }

    /// A device error makes the reactor task finish with an error.
    #[tokio::test]
    async fn stop_on_error() {
        let device_mock = InputDeviceMock::new();
        let device = device_mock.open();
        let mut join_set = JoinSet::new();
        let resources = OpenResources { device, status: Status::new() };

        let error = io::ErrorKind::Unsupported.into();
        device_mock.publish([
            Err(device::Error::Io(error)),
            Ok(InternalEvent::External(ctrl_enter_key())),
        ]);

        let handles = Config::new()
            .with_poll_timeout(Duration::from_millis(1))
            .open(resources, &mut join_set);

        drop(handles);

        let results = timeout(Duration::from_millis(200), join_set.join_all())
            .await
            .unwrap();
        assert!(
            results.iter().any(|result| result.is_err()),
            "results: {results:#?}",
        );
    }

    /// Dropping only the event reader is enough to stop the reactor.
    #[tokio::test]
    async fn stops_if_event_listener_disconnects() {
        let device_mock = InputDeviceMock::new();
        let device = device_mock.open();
        let mut join_set = JoinSet::new();
        let resources = OpenResources { device, status: Status::new() };

        device_mock.publish_ok(sample_events());

        let handles = Config::new()
            .with_poll_timeout(Duration::from_millis(1))
            .open(resources, &mut join_set);

        let InputHandles { events, term_size } = handles;

        drop(events);

        let results = timeout(Duration::from_millis(200), join_set.join_all())
            .await
            .unwrap();
        for result in results {
            result.unwrap();
        }

        // Kept alive until here so only the event reader was disconnected.
        drop(term_size);
    }

    /// Dropping only the terminal size watch is enough to stop the reactor.
    #[tokio::test]
    async fn stops_if_term_size_watch_disconnects() {
        let device_mock = InputDeviceMock::new();
        let device = device_mock.open();
        let mut join_set = JoinSet::new();
        let resources = OpenResources { device, status: Status::new() };

        device_mock.publish_ok(sample_events());

        let handles = Config::new()
            .with_poll_timeout(Duration::from_millis(1))
            .open(resources, &mut join_set);

        let InputHandles { events, term_size } = handles;

        drop(term_size);

        let results = timeout(Duration::from_millis(200), join_set.join_all())
            .await
            .unwrap();
        for result in results {
            result.unwrap();
        }

        // Kept alive until here so only the size watch was disconnected.
        drop(events);
    }
}