mcaptcha/
survey.rs

1// Copyright (C) 2022  Aravinth Manivannan <realaravinth@batsense.net>
2// SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
3//
4// SPDX-License-Identifier: AGPL-3.0-or-later
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::RwLock;
8use std::time::Duration;
9
10use reqwest::Client;
11use serde::{Deserialize, Serialize};
12use tokio::sync::oneshot;
13use tokio::task::JoinHandle;
14use tokio::time::sleep;
15
16use crate::errors::*;
17use crate::settings::Settings;
18use crate::AppData;
19use crate::V1_API_ROUTES;
20
21#[async_trait::async_trait]
22pub trait SurveyClientTrait {
23    async fn start_job(&self) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)>;
24    async fn schedule_upload_job(&self) -> ServiceResult<()>;
25    async fn is_online(&self) -> ServiceResult<bool>;
26    async fn register(&self) -> ServiceResult<()>;
27}
28
/// In-memory string-to-string map guarded by a read/write lock.
///
/// The map lives behind an [`Arc`], so cloning a `SecretsStore` yields another
/// handle onto the same entries rather than a copy of them.
#[derive(Clone, Debug, Default)]
pub struct SecretsStore {
    store: Arc<RwLock<HashMap<String, String>>>,
}

impl SecretsStore {
    /// Looks up `key` and returns an owned copy of its value, or `None` when
    /// the key is not present.
    ///
    /// # Panics
    ///
    /// Panics if the lock has been poisoned.
    pub fn get(&self, key: &str) -> Option<String> {
        self.store.read().unwrap().get(key).cloned()
    }

    /// Deletes the entry stored under `key`; a missing key is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if the lock has been poisoned.
    pub fn rm(&self, key: &str) {
        // The write guard is a temporary and is released at the end of the statement.
        self.store.write().unwrap().remove(key);
    }

    /// Stores `value` under `key`, replacing any previous value.
    ///
    /// # Panics
    ///
    /// Panics if the lock has been poisoned.
    pub fn set(&self, key: String, value: String) {
        // The write guard is a temporary and is released at the end of the statement.
        self.store.write().unwrap().insert(key, value);
    }
}
52
53#[derive(Clone)]
54pub struct Survey {
55    client: Client,
56    app_ctx: AppData,
57}
58impl Survey {
59    pub fn new(app_ctx: AppData) -> Self {
60        if app_ctx.settings.survey.is_none() {
61            panic!("Survey uploader shouldn't be initialized it isn't configured, please report this bug")
62        }
63        Survey {
64            client: Client::new(),
65            app_ctx,
66        }
67    }
68}
69
70#[async_trait::async_trait]
71impl SurveyClientTrait for Survey {
72    async fn start_job(&self) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)> {
73        fn can_run(rx: &mut oneshot::Receiver<()>) -> bool {
74            match rx.try_recv() {
75                Err(oneshot::error::TryRecvError::Empty) => true,
76                _ => false,
77            }
78        }
79
80        let (tx, mut rx) = oneshot::channel();
81        let this = self.clone();
82        let mut register = false;
83        let fut = async move {
84            loop {
85                if !can_run(&mut rx) {
86                    log::info!("Stopping survey uploads");
87                    break;
88                }
89
90                if !register {
91                    loop {
92                        if this.is_online().await.unwrap() {
93                            this.register().await.unwrap();
94                            register = true;
95                            break;
96                        } else {
97                            sleep(Duration::new(1, 0)).await;
98                        }
99                    }
100                }
101
102                for i in 0..this.app_ctx.settings.survey.as_ref().unwrap().rate_limit {
103                    if !can_run(&mut rx) {
104                        log::info!("Stopping survey uploads");
105                        break;
106                    }
107                    sleep(Duration::new(1, 0)).await;
108                }
109                let _ = this.schedule_upload_job().await;
110
111                // for url in this.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() {
112                //     if !can_run(&mut rx) {
113                //         log::info!("Stopping survey uploads");
114                //         break;
115                //     }
116                //     log::info!("Uploading to survey instance {}", url);
117                // }
118            }
119        };
120        let handle = tokio::spawn(fut);
121        Ok((tx, handle))
122    }
123    async fn is_online(&self) -> ServiceResult<bool> {
124        let res = self
125            .client
126            .get(format!(
127                "http://{}{}",
128                self.app_ctx.settings.server.get_ip(),
129                V1_API_ROUTES.meta.health
130            ))
131            .send()
132            .await
133            .unwrap();
134        Ok(res.status() == 200)
135    }
136
137    async fn schedule_upload_job(&self) -> ServiceResult<()> {
138        log::debug!("Running upload job");
139        #[derive(Serialize)]
140        struct Secret {
141            secret: String,
142        }
143        let mut page = 0;
144        loop {
145            let psuedo_ids = self.app_ctx.db.analytics_get_all_psuedo_ids(page).await?;
146            if psuedo_ids.is_empty() {
147                log::debug!("upload job complete, no more IDs to upload");
148                break;
149            }
150            for id in psuedo_ids {
151                for url in self.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() {
152                    if let Some(secret) = self.app_ctx.survey_secrets.get(url.as_str()) {
153                        let payload = Secret { secret };
154
155                        log::info!("Uploading to survey instance {} campaign {id}", url);
156                        let mut url = url.clone();
157                        url.set_path(&format!("/mcaptcha/api/v1/{id}/upload"));
158                        let resp =
159                            self.client.post(url).json(&payload).send().await.unwrap();
160                        println!("{}", resp.text().await.unwrap());
161                    }
162                }
163            }
164            page += 1;
165        }
166        Ok(())
167    }
168
169    async fn register(&self) -> ServiceResult<()> {
170        #[derive(Serialize)]
171        struct MCaptchaInstance {
172            url: url::Url,
173            auth_token: String,
174        }
175
176        let this_instance_url = self
177            .app_ctx
178            .settings
179            .survey
180            .as_ref()
181            .unwrap()
182            .instance_root_url
183            .clone();
184        for url in self.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() {
185            // mCaptcha/survey must send this token while uploading secret to authenticate itself
186            // this token must be sent to mCaptcha/survey with the registration payload
187            let secret_upload_auth_token = crate::api::v1::mcaptcha::get_random(20);
188
189            let payload = MCaptchaInstance {
190                url: this_instance_url.clone(),
191                auth_token: secret_upload_auth_token.clone(),
192            };
193
194            // SecretsStore will store auth tokens generated by both mCaptcha/mCaptcha and
195            // mCaptcha/survey
196            //
197            // Storage schema:
198            // - mCaptcha/mCaptcha generated auth token: (<auth_token>, <survey_instance_url>)
199            // - mCaptcha/survey generated auth token (<survey_instance_url>, <auth_token)
200            self.app_ctx
201                .survey_secrets
202                .set(secret_upload_auth_token, url.to_string());
203            let mut url = url.clone();
204            url.set_path("/mcaptcha/api/v1/register");
205            let resp = self.client.post(url).json(&payload).send().await.unwrap();
206        }
207        Ok(())
208    }
209}