melvin_ob/flight_control/
supervisor.rs

1use super::{FlightComputer, FlightState};
2use crate::imaging::CameraController;
3use crate::objective::{BeaconObjective, KnownImgObjective};
4use crate::http_handler::{
5    ZoneType, ImageObjective,
6    http_request::{
7        objective_list_get::ObjectiveListRequest, request_common::NoBodyHTTPRequestType,
8    },
9};
10use crate::{DT_0_STD, error, event, fatal, info, log, warn, obj};
11use chrono::{DateTime, NaiveTime, TimeDelta, TimeZone, Utc};
12use futures::StreamExt;
13use reqwest_eventsource::{Event, EventSource};
14use std::{collections::HashSet, env, sync::Arc, time::Duration};
15use tokio::{
16    sync::{Notify, RwLock, broadcast, mpsc, mpsc::Receiver},
17    time::Instant,
18};
19
20/// The [`Supervisor`] is responsible for high-level management of active operations,
21/// including observation tracking, secret objective handling, daily map uploads,
22/// safe-mode monitoring, and real-time event listening.
23///
24/// It acts as coordinator between observation, event streams, and
25/// objective scheduling, while ensuring asynchronous notifications and channel-based
26/// communication between system components.
27pub struct Supervisor {
28    /// Lock-protected reference to the [`FlightComputer`], used for updating the global observation.
29    f_cont_lock: Arc<RwLock<FlightComputer>>,
30    /// Notifier that signals when a safe-mode transition is detected.
31    safe_mon: Arc<Notify>,
32    /// Channel for sending newly discovered zoned objectives to the main scheduling system.
33    zo_mon: mpsc::Sender<KnownImgObjective>,
34    /// Channel for sending active beacon objectives to the main scheduling system.
35    bo_mon: mpsc::Sender<BeaconObjective>,
36    /// Broadcast channel for relaying real-time mission announcements or telemetry updates.
37    event_hub: broadcast::Sender<(DateTime<Utc>, String)>,
38    /// In-memory buffer of currently known secret imaging objectives that await triggering.
39    current_secret_objectives: RwLock<Vec<ImageObjective>>,
40}
41
42impl Supervisor {
43    /// Constant update interval for observation updates in the `run()` method
44    const OBS_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
45    /// Constant update interval for objective updates in the `run()` method
46    const OBJ_UPDATE_INTERVAL: TimeDelta = TimeDelta::seconds(15);
47    /// Constant minimum time delta to the objective start for sending the objective to `main`
48    const B_O_MIN_DT: TimeDelta = TimeDelta::minutes(20);
49    /// Environment variable used to skip known objectives by ID (comma-separated).
50    const ENV_SKIP_OBJ: &'static str = "SKIP_OBJ";
51
52    /// Creates a new [`Supervisor`] instance and returns associated receivers
53    /// for zoned and beacon objectives.
54    ///
55    /// # Arguments
56    /// * `f_cont_lock` – Shared lock to the flight computer state.
57    ///
58    /// # Returns
59    /// Tuple of ([`Supervisor`], `zo_receiver`, `bo_receiver`)
60    pub(crate) fn new(
61        f_cont_lock: Arc<RwLock<FlightComputer>>,
62    ) -> (
63        Supervisor,
64        Receiver<KnownImgObjective>,
65        Receiver<BeaconObjective>,
66    ) {
67        let (tx_obj, rx_obj) = mpsc::channel(10);
68        let (tx_beac, rx_beac) = mpsc::channel(10);
69        let (event_send, _) = broadcast::channel(10);
70        (
71            Self {
72                f_cont_lock,
73                safe_mon: Arc::new(Notify::new()),
74                zo_mon: tx_obj,
75                bo_mon: tx_beac,
76                event_hub: event_send,
77                current_secret_objectives: RwLock::new(vec![]),
78            },
79            rx_obj,
80            rx_beac,
81        )
82    }
83
84    /// Returns a clone of the safe-mode notifier.
85    pub(crate) fn safe_mon(&self) -> Arc<Notify> { Arc::clone(&self.safe_mon) }
86
87    /// Subscribes to the event hub to receive mission announcement broadcasts.
88    pub(crate) fn subscribe_event_hub(&self) -> broadcast::Receiver<(DateTime<Utc>, String)> {
89        self.event_hub.subscribe()
90    }
91
92    /// Listens to the `/announcements` Event Source endpoint and broadcasts messages to subscribers.
93    ///
94    /// Automatically closes on error and logs termination as fatal.
95    pub(crate) async fn run_announcement_hub(&self) {
96        let url = {
97            let client = self.f_cont_lock.read().await.client();
98            client.url().to_string()
99        };
100        let mut es = EventSource::get(url + "/announcements");
101        while let Some(event) = es.next().await {
102            match event {
103                Ok(Event::Open) => log!("Starting event supervisor loop!"),
104                Ok(Event::Message(msg)) => {
105                    let msg_str = format!("{msg:#?}");
106                    if self.event_hub.send((Utc::now(), msg_str)).is_err() {
107                        event!("No Receiver for: {msg:#?}");
108                    }
109                }
110                Err(err) => {
111                    error!("EventSource error: {err}");
112                    es.close();
113                }
114            }
115        }
116        fatal!("EventSource disconnected!");
117    }
118
119    /// Triggers daily full map export and upload at 22:55 UTC.
120    ///
121    /// This repeats daily and logs errors upon failure.
122    ///
123    /// # Arguments
124    /// * `c_cont` – Shared reference to the `CameraController`.
125    pub(crate) async fn run_daily_map_uploader(&self, c_cont: Arc<CameraController>) {
126        let now = Utc::now();
127        let end_of_day = NaiveTime::from_hms_opt(22, 55, 0).unwrap();
128        let upload_t = now.date_naive().and_time(end_of_day);
129        let mut next_upload_t = Utc.from_utc_datetime(&upload_t);
130        loop {
131            let next_upload_dt = (next_upload_t - Utc::now()).to_std().unwrap_or(DT_0_STD);
132            tokio::time::sleep(next_upload_dt).await;
133            c_cont.export_full_snapshot().await.unwrap_or_else(|e| {
134                error!("Error exporting full snapshot: {e}.");
135            });
136            c_cont.upload_daily_map_png().await.unwrap_or_else(|e| {
137                error!("Error uploading Daily Map: {e}.");
138            });
139            info!("Successfully uploaded Daily Map!");
140            next_upload_t = next_upload_t.checked_add_signed(TimeDelta::days(1)).unwrap();
141        }
142    }
143
144    /// Receive and schedule a secret objective `id` and assigns coordinates to it if valid.
145    /// This is called by the user console when assigning a zone to a secret objective.
146    ///
147    /// Only triggers if the objective is currently active and not expired.
148    ///
149    /// # Arguments
150    /// * `id` – Unique identifier of the objective.
151    /// * `zone` – Assigned coordinates `[x_1, y_1, x_2, y_2]`.
152    pub(crate) async fn schedule_secret_objective(&self, id: usize, zone: [i32; 4]) {
153        let mut secret_obj = self.current_secret_objectives.write().await;
154        if let Some(pos) =
155            secret_obj.iter().position(|obj| obj.id() == id && obj.end() > Utc::now() && obj.start() < Utc::now() + TimeDelta::hours(4))
156        {
157            obj!("Received position instructions for secret objective {id} from console!");
158            let obj = secret_obj.remove(pos);
159            self.zo_mon.send(KnownImgObjective::try_from((obj, zone)).unwrap()).await.unwrap();
160        }
161    }
162
163    /// Main observation loop that:
164    /// - Monitors for safe-mode transitions.
165    /// - Periodically polls objectives from the backend.
166    /// - Filters and sends active objectives to downstream systems.
167    ///
168    /// Includes ID caching, secret filtering, and fail-safe alerts.
169    #[allow(clippy::cast_precision_loss, clippy::too_many_lines)]
170    pub(crate) async fn run_obs_obj_mon(&self) {
171        let mut last_objective_check = Utc::now() - Self::OBJ_UPDATE_INTERVAL;
172        let mut id_list: HashSet<usize> = HashSet::new();
173        Self::prefill_id_list(&mut id_list);
174        log!("Starting obs/obj supervisor loop!");
175        loop {
176            let mut f_cont = self.f_cont_lock.write().await;
177            // Update observation and fetch new position
178            f_cont.update_observation().await;
179            let last_update = Instant::now();
180
181            let is_safe_trans = {
182                let current_state = f_cont.state();
183                let target_state = f_cont.target_state();
184                current_state == FlightState::Transition && target_state.is_none()
185            };
186            if is_safe_trans {
187                warn!("Unplanned Safe Mode Transition Detected! Notifying!");
188                self.safe_mon.notify_one();
189                self.f_cont_lock.write().await.safe_detected();
190            }
191
192            drop(f_cont); // Release the lock early to avoid blocking
193
194            if last_objective_check + Self::OBJ_UPDATE_INTERVAL < Utc::now() {
195                let handle = self.f_cont_lock.read().await.client();
196                let objective_list = ObjectiveListRequest {}.send_request(&handle).await.unwrap();
197                let mut send_img_objs = vec![];
198                let mut send_beac_objs = vec![];
199
200                let mut secret_list = self.current_secret_objectives.write().await;
201                for img_obj in objective_list.img_objectives() {
202                    let obj_on = img_obj.start() < Utc::now() && img_obj.end() > Utc::now();
203                    let is_secret = matches!(img_obj.zone_type(), ZoneType::SecretZone(_));
204                    let is_future = img_obj.start() > Utc::now();
205                    let is_future_short = img_obj.end() < Utc::now() + TimeDelta::hours(5);
206                    if !id_list.contains(&img_obj.id()) {
207                        if is_secret {
208                            secret_list.push(img_obj.clone());
209                            id_list.insert(img_obj.id());
210                        } else if obj_on || (is_future && is_future_short) {
211                            send_img_objs
212                                .push(KnownImgObjective::try_from(img_obj.clone()).unwrap());
213                        }
214                    }
215                }
216                drop(secret_list);
217                for b_o in objective_list.beacon_objectives() {
218                    let obj_on = b_o.start() < Utc::now() && b_o.end() > Utc::now();
219                    if obj_on && !id_list.contains(&b_o.id()) {
220                        send_beac_objs.push(BeaconObjective::from(b_o.clone()));
221                    }
222                }
223                for obj in send_img_objs {
224                    id_list.insert(obj.id());
225                    self.zo_mon.send(obj).await.unwrap();
226                }
227                for beac_obj in send_beac_objs {
228                    id_list.insert(beac_obj.id());
229                    self.bo_mon.send(beac_obj).await.unwrap();
230                }
231                last_objective_check = Utc::now();
232            }
233
234            tokio::time::sleep_until(last_update + Self::OBS_UPDATE_INTERVAL).await;
235        }
236    }
237
238    /// Reads the environment variable `SKIP_OBJ` and adds valid IDs to the internal filter list.
239    ///
240    /// Used to prevent repeat processing of already completed or irrelevant objectives.
241    ///
242    /// # Arguments
243    /// * `id_list` – A mutable reference to the set of objective IDs.
244    fn prefill_id_list(id_list: &mut HashSet<usize>) {
245        let done_ids: Vec<Option<usize>> = env::var(Self::ENV_SKIP_OBJ)
246            .unwrap_or_default()
247            .split(',')
248            .map(str::trim)
249            .filter(|s| !s.is_empty())
250            .map(|s| s.parse::<usize>().ok())
251            .collect();
252        for done_id in done_ids.into_iter().flatten() {
253            info!("Prefilling done obj id list with id: {done_id}");
254            id_list.insert(done_id);
255        }
256    }
257}