melvin_ob/flight_control/
supervisor.rs1use super::{FlightComputer, FlightState};
2use crate::imaging::CameraController;
3use crate::objective::{BeaconObjective, KnownImgObjective};
4use crate::http_handler::{
5 ZoneType, ImageObjective,
6 http_request::{
7 objective_list_get::ObjectiveListRequest, request_common::NoBodyHTTPRequestType,
8 },
9};
10use crate::{DT_0_STD, error, event, fatal, info, log, warn, obj};
11use chrono::{DateTime, NaiveTime, TimeDelta, TimeZone, Utc};
12use futures::StreamExt;
13use reqwest_eventsource::{Event, EventSource};
14use std::{collections::HashSet, env, sync::Arc, time::Duration};
15use tokio::{
16 sync::{Notify, RwLock, broadcast, mpsc, mpsc::Receiver},
17 time::Instant,
18};
19
20pub struct Supervisor {
28 f_cont_lock: Arc<RwLock<FlightComputer>>,
30 safe_mon: Arc<Notify>,
32 zo_mon: mpsc::Sender<KnownImgObjective>,
34 bo_mon: mpsc::Sender<BeaconObjective>,
36 event_hub: broadcast::Sender<(DateTime<Utc>, String)>,
38 current_secret_objectives: RwLock<Vec<ImageObjective>>,
40}
41
42impl Supervisor {
43 const OBS_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
45 const OBJ_UPDATE_INTERVAL: TimeDelta = TimeDelta::seconds(15);
47 const B_O_MIN_DT: TimeDelta = TimeDelta::minutes(20);
49 const ENV_SKIP_OBJ: &'static str = "SKIP_OBJ";
51
52 pub(crate) fn new(
61 f_cont_lock: Arc<RwLock<FlightComputer>>,
62 ) -> (
63 Supervisor,
64 Receiver<KnownImgObjective>,
65 Receiver<BeaconObjective>,
66 ) {
67 let (tx_obj, rx_obj) = mpsc::channel(10);
68 let (tx_beac, rx_beac) = mpsc::channel(10);
69 let (event_send, _) = broadcast::channel(10);
70 (
71 Self {
72 f_cont_lock,
73 safe_mon: Arc::new(Notify::new()),
74 zo_mon: tx_obj,
75 bo_mon: tx_beac,
76 event_hub: event_send,
77 current_secret_objectives: RwLock::new(vec![]),
78 },
79 rx_obj,
80 rx_beac,
81 )
82 }
83
84 pub(crate) fn safe_mon(&self) -> Arc<Notify> { Arc::clone(&self.safe_mon) }
86
87 pub(crate) fn subscribe_event_hub(&self) -> broadcast::Receiver<(DateTime<Utc>, String)> {
89 self.event_hub.subscribe()
90 }
91
92 pub(crate) async fn run_announcement_hub(&self) {
96 let url = {
97 let client = self.f_cont_lock.read().await.client();
98 client.url().to_string()
99 };
100 let mut es = EventSource::get(url + "/announcements");
101 while let Some(event) = es.next().await {
102 match event {
103 Ok(Event::Open) => log!("Starting event supervisor loop!"),
104 Ok(Event::Message(msg)) => {
105 let msg_str = format!("{msg:#?}");
106 if self.event_hub.send((Utc::now(), msg_str)).is_err() {
107 event!("No Receiver for: {msg:#?}");
108 }
109 }
110 Err(err) => {
111 error!("EventSource error: {err}");
112 es.close();
113 }
114 }
115 }
116 fatal!("EventSource disconnected!");
117 }
118
119 pub(crate) async fn run_daily_map_uploader(&self, c_cont: Arc<CameraController>) {
126 let now = Utc::now();
127 let end_of_day = NaiveTime::from_hms_opt(22, 55, 0).unwrap();
128 let upload_t = now.date_naive().and_time(end_of_day);
129 let mut next_upload_t = Utc.from_utc_datetime(&upload_t);
130 loop {
131 let next_upload_dt = (next_upload_t - Utc::now()).to_std().unwrap_or(DT_0_STD);
132 tokio::time::sleep(next_upload_dt).await;
133 c_cont.export_full_snapshot().await.unwrap_or_else(|e| {
134 error!("Error exporting full snapshot: {e}.");
135 });
136 c_cont.upload_daily_map_png().await.unwrap_or_else(|e| {
137 error!("Error uploading Daily Map: {e}.");
138 });
139 info!("Successfully uploaded Daily Map!");
140 next_upload_t = next_upload_t.checked_add_signed(TimeDelta::days(1)).unwrap();
141 }
142 }
143
144 pub(crate) async fn schedule_secret_objective(&self, id: usize, zone: [i32; 4]) {
153 let mut secret_obj = self.current_secret_objectives.write().await;
154 if let Some(pos) =
155 secret_obj.iter().position(|obj| obj.id() == id && obj.end() > Utc::now() && obj.start() < Utc::now() + TimeDelta::hours(4))
156 {
157 obj!("Received position instructions for secret objective {id} from console!");
158 let obj = secret_obj.remove(pos);
159 self.zo_mon.send(KnownImgObjective::try_from((obj, zone)).unwrap()).await.unwrap();
160 }
161 }
162
163 #[allow(clippy::cast_precision_loss, clippy::too_many_lines)]
170 pub(crate) async fn run_obs_obj_mon(&self) {
171 let mut last_objective_check = Utc::now() - Self::OBJ_UPDATE_INTERVAL;
172 let mut id_list: HashSet<usize> = HashSet::new();
173 Self::prefill_id_list(&mut id_list);
174 log!("Starting obs/obj supervisor loop!");
175 loop {
176 let mut f_cont = self.f_cont_lock.write().await;
177 f_cont.update_observation().await;
179 let last_update = Instant::now();
180
181 let is_safe_trans = {
182 let current_state = f_cont.state();
183 let target_state = f_cont.target_state();
184 current_state == FlightState::Transition && target_state.is_none()
185 };
186 if is_safe_trans {
187 warn!("Unplanned Safe Mode Transition Detected! Notifying!");
188 self.safe_mon.notify_one();
189 self.f_cont_lock.write().await.safe_detected();
190 }
191
192 drop(f_cont); if last_objective_check + Self::OBJ_UPDATE_INTERVAL < Utc::now() {
195 let handle = self.f_cont_lock.read().await.client();
196 let objective_list = ObjectiveListRequest {}.send_request(&handle).await.unwrap();
197 let mut send_img_objs = vec![];
198 let mut send_beac_objs = vec![];
199
200 let mut secret_list = self.current_secret_objectives.write().await;
201 for img_obj in objective_list.img_objectives() {
202 let obj_on = img_obj.start() < Utc::now() && img_obj.end() > Utc::now();
203 let is_secret = matches!(img_obj.zone_type(), ZoneType::SecretZone(_));
204 let is_future = img_obj.start() > Utc::now();
205 let is_future_short = img_obj.end() < Utc::now() + TimeDelta::hours(5);
206 if !id_list.contains(&img_obj.id()) {
207 if is_secret {
208 secret_list.push(img_obj.clone());
209 id_list.insert(img_obj.id());
210 } else if obj_on || (is_future && is_future_short) {
211 send_img_objs
212 .push(KnownImgObjective::try_from(img_obj.clone()).unwrap());
213 }
214 }
215 }
216 drop(secret_list);
217 for b_o in objective_list.beacon_objectives() {
218 let obj_on = b_o.start() < Utc::now() && b_o.end() > Utc::now();
219 if obj_on && !id_list.contains(&b_o.id()) {
220 send_beac_objs.push(BeaconObjective::from(b_o.clone()));
221 }
222 }
223 for obj in send_img_objs {
224 id_list.insert(obj.id());
225 self.zo_mon.send(obj).await.unwrap();
226 }
227 for beac_obj in send_beac_objs {
228 id_list.insert(beac_obj.id());
229 self.bo_mon.send(beac_obj).await.unwrap();
230 }
231 last_objective_check = Utc::now();
232 }
233
234 tokio::time::sleep_until(last_update + Self::OBS_UPDATE_INTERVAL).await;
235 }
236 }
237
238 fn prefill_id_list(id_list: &mut HashSet<usize>) {
245 let done_ids: Vec<Option<usize>> = env::var(Self::ENV_SKIP_OBJ)
246 .unwrap_or_default()
247 .split(',')
248 .map(str::trim)
249 .filter(|s| !s.is_empty())
250 .map(|s| s.parse::<usize>().ok())
251 .collect();
252 for done_id in done_ids.into_iter().flatten() {
253 info!("Prefilling done obj id list with id: {done_id}");
254 id_list.insert(done_id);
255 }
256 }
257}