howudoin/
rx.rs

1use super::*;
2use crate::{
3    flat_tree::FlatTree,
4    report::{Message, Report, State},
5};
6use flume::Receiver;
7use std::collections::BTreeSet;
8use Payload::*;
9
10pub(crate) fn spawn<C: Consume>(rx: Receiver<Payload>, mut consumer: C) {
11    let debounce = consumer.debounce();
12
13    let mut controller = Controller::default();
14    let mut chgd_buf = BTreeSet::new();
15    let mut last = Instant::now();
16
17    loop {
18        if rx.is_disconnected() {
19            break; // static tx dropped, exit receiver loop
20        }
21
22        // use a timeout to avoid thrashing the loop
23        let x = if debounce.is_zero() {
24            rx.recv().ok()
25        } else {
26            rx.recv_timeout(debounce).ok()
27        };
28
29        if let Some(x) = x.and_then(|x| controller.process(x)) {
30            chgd_buf.insert(x);
31        }
32
33        if last.elapsed() >= debounce {
34            // debounce duration has occurred; can update the consumer with any changes
35
36            while let Some(id) = chgd_buf.pop_first() {
37                if let Some(Progress_ {
38                    rpt,
39                    children: _,
40                    parent,
41                    started: _,
42                }) = controller.ps.get(&id)
43                {
44                    consumer.rpt(rpt, id, *parent, &controller);
45                } else {
46                    consumer.closed(id);
47                }
48            }
49
50            last = Instant::now();
51        }
52    }
53}
54
55/// The progress consumer loop controller.
56#[derive(Default)]
57pub struct Controller {
58    ps: FlatTree<Id, Progress_>,
59    last: Option<Id>,
60    cancelled: bool,
61    nextid: Id,
62}
63
64impl Controller {
65    fn next_id(&mut self) -> Id {
66        let id = self.nextid;
67        self.nextid = self.nextid.wrapping_add(1);
68        id
69    }
70
71    fn process(&mut self, payload: Payload) -> Option<Id> {
72        match payload {
73            AddReport(None, tx) => {
74                let id = match self.last {
75                    Some(parent) => self.add_child(parent),
76                    None => self.add_root(),
77                };
78
79                tx.send(id).ok();
80                Some(id)
81            }
82
83            AddReport(Some(parent), tx) => {
84                let id = self.add_child(parent);
85                tx.send(id).ok();
86                Some(id)
87            }
88
89            AddRootReport(tx) => {
90                let id = self.add_root();
91                tx.send(id).ok();
92                Some(id)
93            }
94
95            Fetch(tx) => {
96                tx.send(self.build_progress_tree()).ok();
97                None
98            }
99
100            SetLabel(id, label) => {
101                self.set(id, |x, _| x.label = label);
102                Some(id)
103            }
104
105            SetDesc(id, d) => {
106                self.set(id, |x, _| x.desc = d);
107                Some(id)
108            }
109
110            SetLen(id, len) => {
111                self.set(id, |x, _| x.set_len(len));
112                Some(id)
113            }
114
115            Inc(id, by) => {
116                self.set(id, |x, e| x.inc_pos(by, e));
117                Some(id)
118            }
119
120            SetPos(id, pos) => {
121                self.set(id, |x, e| x.update_pos(pos, e));
122                Some(id)
123            }
124
125            SetFmtBytes(id, y) => {
126                self.set(id, |x, _| x.set_fmt_as_bytes(y));
127                Some(id)
128            }
129
130            Accum(id, severity, msg) => {
131                self.set(id, |x, _| x.accums.push(Message { severity, msg }));
132                Some(id)
133            }
134
135            Finish(id) => {
136                self.set(id, |x, e| {
137                    x.state = State::Completed {
138                        duration: e.as_secs_f32(),
139                    }
140                });
141
142                // if finished, do not keep around as a parent
143                if self.last == Some(id) {
144                    self.last = None;
145                }
146
147                Some(id)
148            }
149
150            Close(id) => {
151                self.ps.remove(&id);
152
153                if self.last == Some(id) {
154                    self.last = None;
155                }
156
157                Some(id)
158            }
159
160            Cancel => {
161                self.cancelled = true;
162                None
163            }
164
165            Cancelled(tx) => {
166                tx.send(self.cancelled).ok();
167                None
168            }
169
170            Reset => {
171                *self = Self::default();
172                None
173            }
174        }
175    }
176
177    fn add_root(&mut self) -> Id {
178        let id = self.next_id();
179        self.ps.insert_root(
180            id,
181            Progress_ {
182                parent: None,
183                ..Progress_::root()
184            },
185        );
186        self.last = Some(id);
187        id
188    }
189
190    fn add_child(&mut self, parent: Id) -> Id {
191        let id = self.next_id();
192        match self.ps.get_mut(&parent) {
193            Some(p) => {
194                p.children.push(id);
195                self.ps.insert(
196                    id,
197                    Progress_ {
198                        parent: Some(parent),
199                        ..Progress_::root()
200                    },
201                );
202            }
203            None => {
204                self.ps.insert_root(id, Progress_::root());
205            }
206        }
207
208        self.last = Some(id);
209        id
210    }
211
212    fn set<F: FnOnce(&mut Report, Duration)>(&mut self, id: Id, f: F) {
213        if let Some(x) = self.ps.get_mut(&id) {
214            f(&mut x.rpt, x.started.elapsed())
215        }
216    }
217
218    /// Build the progress tree.
219    ///
220    /// This is utilised by [`fetch`].
221    pub fn build_progress_tree(&self) -> Vec<Progress> {
222        self.ps
223            .roots()
224            .filter_map(|(id, _)| self.build_public_prg_(id))
225            .collect()
226    }
227
228    fn build_public_prg_(&self, id: &Id) -> Option<Progress> {
229        self.ps.get(id).map(
230            |Progress_ {
231                 rpt,
232                 children,
233                 parent: _,
234                 started: _,
235             }| {
236                let children = children
237                    .iter()
238                    .filter_map(|id| self.build_public_prg_(id))
239                    .collect();
240
241                Progress {
242                    report: rpt.clone(),
243                    children,
244                }
245            },
246        )
247    }
248}
249
250struct Progress_ {
251    rpt: Report,
252    children: Vec<Id>,
253    parent: Option<Id>,
254    started: Instant,
255}
256
257impl Progress_ {
258    fn root() -> Self {
259        Self {
260            rpt: Default::default(),
261            children: Default::default(),
262            parent: None,
263            started: Instant::now(),
264        }
265    }
266}
267
268impl Report {
269    fn set_len(&mut self, len_: Option<u64>) {
270        if let State::InProgress { len, .. } = &mut self.state {
271            *len = len_
272        }
273    }
274
275    fn set_fmt_as_bytes(&mut self, x: bool) {
276        if let State::InProgress { bytes, .. } = &mut self.state {
277            *bytes = x
278        }
279    }
280
281    fn inc_pos(&mut self, ticks: u64, elapsed: Duration) {
282        if let State::InProgress { pos, .. } = &self.state {
283            self.update_pos(pos.saturating_add(ticks), elapsed)
284        }
285    }
286
287    fn update_pos(&mut self, pos_: u64, elapsed: Duration) {
288        if let State::InProgress {
289            len,
290            pos,
291            remaining,
292            ..
293        } = &mut self.state
294        {
295            *pos = len.map(|len| len.min(pos_)).unwrap_or(pos_);
296
297            if let Some(len) = *len {
298                let rate = elapsed.as_secs_f32() / *pos as f32;
299                *remaining = (len - *pos) as f32 * rate;
300            }
301        }
302    }
303}