melvin_ob/objective/
beacon_controller.rs1use super::{BeaconObjective, BeaconMeas, beacon_objective_done::BeaconObjectiveDone};
2use crate::flight_control::FlightComputer;
3use crate::http_handler::http_client::HTTPClient;
4use crate::util::logger::JsonDump;
5use crate::{event, obj, warn};
6use chrono::{DateTime, TimeDelta, Utc};
7use regex::Regex;
8use std::{collections::HashMap, sync::{Arc, LazyLock}, time::Duration};
9use tokio::{time::interval, sync::{mpsc::Receiver, Mutex, RwLock, watch}};
10
11pub struct BeaconController {
22 active_bo: RwLock<HashMap<usize, BeaconObjective>>,
24 done_bo: RwLock<HashMap<usize, BeaconObjectiveDone>>,
26 beacon_rx: Mutex<Receiver<BeaconObjective>>,
28 state_rx: watch::Sender<BeaconControllerState>,
30}
31
32#[derive(Copy, Clone)]
34pub enum BeaconControllerState {
35 ActiveBeacons,
37 NoActiveBeacons,
39}
40
41static BO_REGEX: LazyLock<Regex> = LazyLock::new(|| {
44 Regex::new(r"(?i)ID[_, ]?(\d+).*?DISTANCE[_, ]?(([0-9]*[.])?[0-9]+)").unwrap()
45});
46
47impl BeaconController {
48 const TIME_TO_NEXT_PASSIVE_CHECK: Duration = Duration::from_secs(30);
50 const MAX_ESTIMATE_GUESSES: usize = 5;
52
53 pub fn new(
61 rx_beac: Receiver<BeaconObjective>,
62 ) -> (Self, watch::Receiver<BeaconControllerState>) {
63 let (tx, rx) = watch::channel(BeaconControllerState::NoActiveBeacons);
64 (
65 Self {
66 active_bo: RwLock::new(HashMap::new()),
67 done_bo: RwLock::new(HashMap::new()),
68 beacon_rx: Mutex::new(rx_beac),
69 state_rx: tx,
70 },
71 rx,
72 )
73 }
74
75 pub async fn run(self: Arc<Self>, handler: Arc<HTTPClient>) {
84 let mut approaching_end_interval = interval(Self::TIME_TO_NEXT_PASSIVE_CHECK);
85 let mut beac_rx_locked = self.beacon_rx.lock().await;
86 loop {
87 tokio::select! {
88 _ = approaching_end_interval.tick() =>
89 {self.check_approaching_end(&handler).await}
90
91 Some(beac_obj) = beac_rx_locked.recv() => {
92 self.add_beacon(beac_obj).await;
93 }
94 }
95 }
96 }
97
98 pub async fn last_active_beac_end(&self) -> Option<DateTime<Utc>> {
103 self.active_bo.read().await.values().map(BeaconObjective::end).max()
104 }
105
106 fn extract_id_and_d(input: &str) -> Option<(usize, f64)> {
114 if let Some(captures) = BO_REGEX.captures(input) {
116 if let (Some(beacon_id), Some(d_noisy)) = (captures.get(1), captures.get(2)) {
118 let id: usize = beacon_id.as_str().parse().unwrap();
119 let d_n: f64 = d_noisy.as_str().parse().unwrap();
120 return Some((id, d_n));
121 }
122 }
123 None }
125
126 pub async fn handle_poss_bo_ping(
134 &self,
135 msg: (DateTime<Utc>, String),
136 f_cont: Arc<RwLock<FlightComputer>>,
137 ) {
138 let (t, val) = msg;
139 if let Some((id, d_noisy)) = Self::extract_id_and_d(val.as_str()) {
140 let f_cont_lock = f_cont.read().await;
141 let pos = f_cont_lock.current_pos();
142
143 let msg_delay = Utc::now() - t;
144 let meas = BeaconMeas::new(id, pos, d_noisy, msg_delay);
145 obj!("Received BO measurement at {pos} for ID {id} with distance {d_noisy}.");
146 let mut active_lock = self.active_bo.write().await;
147 if let Some(obj) = active_lock.get_mut(&id) {
148 obj!("Updating BO {id} measurement list!");
149 obj.append_measurement(meas);
150 } else {
151 warn!("Unknown BO ID {id}. Ignoring!");
152 }
153 } else {
154 event!("Message has unknown format {val:#?}. Ignoring.");
155 }
156 }
157
158 async fn add_beacon(&self, obj: BeaconObjective) {
165 obj!(
166 "The Beacon {}-'{}' is lit! Gondor calls for Aid! Available Timeframe {} - {}.",
167 obj.id(),
168 obj.name(),
169 obj.start().format("%d %H:%M:%S").to_string(),
170 obj.end().format("%d %H:%M:%S").to_string()
171 );
172 let empty = self.active_bo.read().await.is_empty();
173 self.active_bo.write().await.insert(obj.id(), obj);
174 if empty {
175 self.state_rx.send(BeaconControllerState::ActiveBeacons).expect("Failed to send state");
176 }
177 }
178
179 async fn move_to_done(&self, finished: HashMap<usize, BeaconObjective>) {
186 let mut done_bo = self.done_bo.write().await;
187 for (id, beacon) in finished {
188 beacon.dump_json();
189 let done_beacon = BeaconObjectiveDone::from(beacon);
190 let guesses = done_beacon.guesses().len();
191 obj!("Finished Beacon objective: ID {id} with {guesses} guesses.");
192 done_bo.insert(done_beacon.id(), done_beacon.clone());
193 }
194 }
195
196 async fn check_approaching_end(&self, handler: &Arc<HTTPClient>) {
205 let mut finished = HashMap::new();
206 let deadline = Utc::now() + Self::TIME_TO_NEXT_PASSIVE_CHECK + TimeDelta::seconds(10);
207 let no_more_beacons = {
208 let mut active_beacon_tasks = self.active_bo.write().await;
209 active_beacon_tasks.retain(|id, beacon: &mut BeaconObjective| {
210 let finished_cond = beacon
211 .measurements()
212 .is_some_and(|b| b.guess_estimate() < Self::MAX_ESTIMATE_GUESSES);
213 let deadline_cond = beacon.end() < deadline;
214 if deadline_cond || finished_cond {
215 obj!(
216 "Active BO end is less than {} s away: ID {id}. Submitting this now!",
217 Self::TIME_TO_NEXT_PASSIVE_CHECK.as_secs(),
218 );
219 finished.insert(*id, beacon.clone());
220 false
221 } else {
222 true
223 }
224 });
225 active_beacon_tasks.is_empty()
226 };
227 self.move_to_done(finished).await;
228 if no_more_beacons {
229 self.state_rx
230 .send(BeaconControllerState::NoActiveBeacons)
231 .expect("Failed to send state");
232 }
233 self.handle_beacon_submission(handler).await;
234 }
235
236 async fn handle_beacon_submission(&self, handler: &Arc<HTTPClient>) {
243 let mut done_beacons = self.done_bo.write().await;
244 for beacon in done_beacons.values_mut() {
245 if !beacon.submitted() {
246 beacon.set_submitted();
247 if beacon.guesses().is_empty() {
248 beacon.randomize_no_meas_guesses(Arc::clone(handler)).await;
249 } else {
250 beacon.guess_max(Arc::clone(handler)).await;
251 }
252 }
253 }
254 }
255}