1use super::*;
2use crate::{
3 flat_tree::FlatTree,
4 report::{Message, Report, State},
5};
6use flume::Receiver;
7use std::collections::BTreeSet;
8use Payload::*;
9
10pub(crate) fn spawn<C: Consume>(rx: Receiver<Payload>, mut consumer: C) {
11 let debounce = consumer.debounce();
12
13 let mut controller = Controller::default();
14 let mut chgd_buf = BTreeSet::new();
15 let mut last = Instant::now();
16
17 loop {
18 if rx.is_disconnected() {
19 break; }
21
22 let x = if debounce.is_zero() {
24 rx.recv().ok()
25 } else {
26 rx.recv_timeout(debounce).ok()
27 };
28
29 if let Some(x) = x.and_then(|x| controller.process(x)) {
30 chgd_buf.insert(x);
31 }
32
33 if last.elapsed() >= debounce {
34 while let Some(id) = chgd_buf.pop_first() {
37 if let Some(Progress_ {
38 rpt,
39 children: _,
40 parent,
41 started: _,
42 }) = controller.ps.get(&id)
43 {
44 consumer.rpt(rpt, id, *parent, &controller);
45 } else {
46 consumer.closed(id);
47 }
48 }
49
50 last = Instant::now();
51 }
52 }
53}
54
55#[derive(Default)]
57pub struct Controller {
58 ps: FlatTree<Id, Progress_>,
59 last: Option<Id>,
60 cancelled: bool,
61 nextid: Id,
62}
63
64impl Controller {
65 fn next_id(&mut self) -> Id {
66 let id = self.nextid;
67 self.nextid = self.nextid.wrapping_add(1);
68 id
69 }
70
71 fn process(&mut self, payload: Payload) -> Option<Id> {
72 match payload {
73 AddReport(None, tx) => {
74 let id = match self.last {
75 Some(parent) => self.add_child(parent),
76 None => self.add_root(),
77 };
78
79 tx.send(id).ok();
80 Some(id)
81 }
82
83 AddReport(Some(parent), tx) => {
84 let id = self.add_child(parent);
85 tx.send(id).ok();
86 Some(id)
87 }
88
89 AddRootReport(tx) => {
90 let id = self.add_root();
91 tx.send(id).ok();
92 Some(id)
93 }
94
95 Fetch(tx) => {
96 tx.send(self.build_progress_tree()).ok();
97 None
98 }
99
100 SetLabel(id, label) => {
101 self.set(id, |x, _| x.label = label);
102 Some(id)
103 }
104
105 SetDesc(id, d) => {
106 self.set(id, |x, _| x.desc = d);
107 Some(id)
108 }
109
110 SetLen(id, len) => {
111 self.set(id, |x, _| x.set_len(len));
112 Some(id)
113 }
114
115 Inc(id, by) => {
116 self.set(id, |x, e| x.inc_pos(by, e));
117 Some(id)
118 }
119
120 SetPos(id, pos) => {
121 self.set(id, |x, e| x.update_pos(pos, e));
122 Some(id)
123 }
124
125 SetFmtBytes(id, y) => {
126 self.set(id, |x, _| x.set_fmt_as_bytes(y));
127 Some(id)
128 }
129
130 Accum(id, severity, msg) => {
131 self.set(id, |x, _| x.accums.push(Message { severity, msg }));
132 Some(id)
133 }
134
135 Finish(id) => {
136 self.set(id, |x, e| {
137 x.state = State::Completed {
138 duration: e.as_secs_f32(),
139 }
140 });
141
142 if self.last == Some(id) {
144 self.last = None;
145 }
146
147 Some(id)
148 }
149
150 Close(id) => {
151 self.ps.remove(&id);
152
153 if self.last == Some(id) {
154 self.last = None;
155 }
156
157 Some(id)
158 }
159
160 Cancel => {
161 self.cancelled = true;
162 None
163 }
164
165 Cancelled(tx) => {
166 tx.send(self.cancelled).ok();
167 None
168 }
169
170 Reset => {
171 *self = Self::default();
172 None
173 }
174 }
175 }
176
177 fn add_root(&mut self) -> Id {
178 let id = self.next_id();
179 self.ps.insert_root(
180 id,
181 Progress_ {
182 parent: None,
183 ..Progress_::root()
184 },
185 );
186 self.last = Some(id);
187 id
188 }
189
190 fn add_child(&mut self, parent: Id) -> Id {
191 let id = self.next_id();
192 match self.ps.get_mut(&parent) {
193 Some(p) => {
194 p.children.push(id);
195 self.ps.insert(
196 id,
197 Progress_ {
198 parent: Some(parent),
199 ..Progress_::root()
200 },
201 );
202 }
203 None => {
204 self.ps.insert_root(id, Progress_::root());
205 }
206 }
207
208 self.last = Some(id);
209 id
210 }
211
212 fn set<F: FnOnce(&mut Report, Duration)>(&mut self, id: Id, f: F) {
213 if let Some(x) = self.ps.get_mut(&id) {
214 f(&mut x.rpt, x.started.elapsed())
215 }
216 }
217
218 pub fn build_progress_tree(&self) -> Vec<Progress> {
222 self.ps
223 .roots()
224 .filter_map(|(id, _)| self.build_public_prg_(id))
225 .collect()
226 }
227
228 fn build_public_prg_(&self, id: &Id) -> Option<Progress> {
229 self.ps.get(id).map(
230 |Progress_ {
231 rpt,
232 children,
233 parent: _,
234 started: _,
235 }| {
236 let children = children
237 .iter()
238 .filter_map(|id| self.build_public_prg_(id))
239 .collect();
240
241 Progress {
242 report: rpt.clone(),
243 children,
244 }
245 },
246 )
247 }
248}
249
250struct Progress_ {
251 rpt: Report,
252 children: Vec<Id>,
253 parent: Option<Id>,
254 started: Instant,
255}
256
257impl Progress_ {
258 fn root() -> Self {
259 Self {
260 rpt: Default::default(),
261 children: Default::default(),
262 parent: None,
263 started: Instant::now(),
264 }
265 }
266}
267
268impl Report {
269 fn set_len(&mut self, len_: Option<u64>) {
270 if let State::InProgress { len, .. } = &mut self.state {
271 *len = len_
272 }
273 }
274
275 fn set_fmt_as_bytes(&mut self, x: bool) {
276 if let State::InProgress { bytes, .. } = &mut self.state {
277 *bytes = x
278 }
279 }
280
281 fn inc_pos(&mut self, ticks: u64, elapsed: Duration) {
282 if let State::InProgress { pos, .. } = &self.state {
283 self.update_pos(pos.saturating_add(ticks), elapsed)
284 }
285 }
286
287 fn update_pos(&mut self, pos_: u64, elapsed: Duration) {
288 if let State::InProgress {
289 len,
290 pos,
291 remaining,
292 ..
293 } = &mut self.state
294 {
295 *pos = len.map(|len| len.min(pos_)).unwrap_or(pos_);
296
297 if let Some(len) = *len {
298 let rate = elapsed.as_secs_f32() / *pos as f32;
299 *remaining = (len - *pos) as f32 * rate;
300 }
301 }
302 }
303}