corelib_rust/utils/
include_exclude_cron.rs

1// =============================================
2// FILE: rust/src/utils/include_exclude_cron.rs
3// PURPOSE: Dedicated file for the include/exclude cron helper
4// (moved out of utils.rs to follow the request).
5// Exact behavioural mirror of ts-core/src/utils/cron.ts.
6// =============================================
7
8use chrono::{DateTime, Duration, Utc};
9use cron::Schedule;
10use std::str::FromStr;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration as StdDuration;
15
16/// Handle returned by `include_exclude_cron`.
17///
18/// Allows the caller to gracefully stop the background cron thread.
19/// The thread will exit on the next tick after `stop()` is called.
20pub struct CronJobHandle {
21    stop_flag: Arc<AtomicBool>,
22}
23
24impl CronJobHandle {
25    /// Stops the cron job. The background thread will exit cleanly on the next tick.
26    pub fn stop(&self) {
27        self.stop_flag.store(true, Ordering::Relaxed);
28    }
29}
30
31/// Creates and starts a background cron job that ticks every second.
32///
33/// The provided `handler` is executed **only** when:
34/// - **ANY** of the `include_exprs` matches the current time, **AND**
35/// - **NONE** of the `exclude_exprs` matches the current time.
36///
37/// - Crons are pre-parsed once for maximum performance.
38/// - Supports second-level precision (7-field cron format).
39/// - Runs in a dedicated `std::thread` (no Tokio or async runtime required).
40/// - Returns a `CronJobHandle` that can be used to stop the job.
41///
42/// # Panics
43/// Panics (with a clear message) if any cron expression is invalid – same behaviour as the TS version.
44pub fn include_exclude_cron<F>(
45    include_exprs: Vec<String>,
46    exclude_exprs: Vec<String>,
47    handler: F,
48) -> CronJobHandle
49where
50    F: Fn() + Send + Sync + 'static,
51{
52    let handler = Arc::new(handler);
53
54    // Pre-parse all include crons (fast path)
55    let include_schedules: Vec<Schedule> = include_exprs
56        .into_iter()
57        .map(|expr| Schedule::from_str(&expr).expect("invalid include cron expression"))
58        .collect();
59
60    // Pre-parse all exclude crons
61    let exclude_schedules: Vec<Schedule> = exclude_exprs
62        .into_iter()
63        .map(|expr| Schedule::from_str(&expr).expect("invalid exclude cron expression"))
64        .collect();
65
66    let stop_flag = Arc::new(AtomicBool::new(false));
67    let stop_flag_clone = Arc::clone(&stop_flag);
68    let handler_clone = Arc::clone(&handler);
69
70    // Background tick thread
71    thread::spawn(move || {
72        let mut last_fire_ts = 0;
73        loop {
74            if stop_flag_clone.load(Ordering::Relaxed) {
75                break;
76            }
77
78            let now: DateTime<Utc> = Utc::now();
79            let now_ts = now.timestamp();
80
81            if now_ts <= last_fire_ts {
82                // Already handled this second, sleep until next one
83                let next_sec_ts = last_fire_ts + 1;
84                let next_sec = DateTime::from_timestamp(next_sec_ts, 0).expect("invalid timestamp");
85                let sleep_dur = next_sec - Utc::now();
86                if sleep_dur > Duration::zero() {
87                    thread::sleep(StdDuration::from_millis(sleep_dur.num_milliseconds() as u64));
88                } else {
89                    thread::sleep(StdDuration::from_millis(10));
90                }
91                continue;
92            }
93
94            let test_time = now - Duration::seconds(1);
95
96            // ANY include matches?
97            let included = include_schedules.iter().any(|schedule| {
98                let mut upcoming = schedule.after(&test_time);
99                upcoming
100                    .next()
101                    .is_some_and(|next| next.timestamp() == now_ts)
102            });
103
104            if !included {
105                // Not a fire second for us, sleep a bit but check again soon-ish
106                // to stay aligned with the start of seconds.
107                thread::sleep(StdDuration::from_millis(100));
108                continue;
109            }
110
111            // ANY exclude matches?
112            let excluded = exclude_schedules.iter().any(|schedule| {
113                let mut upcoming = schedule.after(&test_time);
114                upcoming
115                    .next()
116                    .is_some_and(|next| next.timestamp() == now_ts)
117            });
118
119            if !excluded {
120                (handler_clone)();
121            }
122
123            last_fire_ts = now_ts;
124        }
125    });
126
127    CronJobHandle { stop_flag }
128}
129
130// =============================================
131// EXHAUSTIVE TESTS
132// =============================================
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137    use std::sync::atomic::{AtomicUsize, Ordering};
138    use std::time::Duration;
139
140    #[test]
141    fn every_second_include_no_exclude_handler_is_called() {
142        let counter = Arc::new(AtomicUsize::new(0));
143        let c = Arc::clone(&counter);
144
145        let handle = include_exclude_cron(vec!["* * * * * * *".to_string()], vec![], move || {
146            c.fetch_add(1, Ordering::SeqCst);
147        });
148
149        thread::sleep(Duration::from_secs(4));
150        handle.stop();
151
152        let calls = counter.load(Ordering::SeqCst);
153        assert!(calls >= 3 && calls <= 5);
154    }
155
156    #[test]
157    fn exclude_blocks_execution() {
158        let counter = Arc::new(AtomicUsize::new(0));
159        let c = Arc::clone(&counter);
160
161        let handle = include_exclude_cron(
162            vec!["* * * * * * *".to_string()],
163            vec!["* * * * * * *".to_string()],
164            move || {
165                c.fetch_add(1, Ordering::SeqCst);
166            },
167        );
168
169        thread::sleep(Duration::from_secs(3));
170        handle.stop();
171
172        assert_eq!(counter.load(Ordering::SeqCst), 0);
173    }
174
175    #[test]
176    fn fires_only_on_specific_seconds() {
177        let counter = Arc::new(AtomicUsize::new(0));
178        let c = Arc::clone(&counter);
179
180        // Fire every 2 seconds
181        let handle = include_exclude_cron(vec!["*/2 * * * * * *".to_string()], vec![], move || {
182            c.fetch_add(1, Ordering::SeqCst);
183        });
184
185        thread::sleep(Duration::from_secs(6));
186        handle.stop();
187
188        let calls = counter.load(Ordering::SeqCst);
189        // In 6 seconds, every 2 seconds should fire ~3 times
190        assert!(
191            calls >= 2 && calls <= 4,
192            "Should have fired ~3 times in 6 seconds (every 2s), got {}",
193            calls
194        );
195    }
196
197    #[test]
198    fn empty_include_never_runs() {
199        let counter = Arc::new(AtomicUsize::new(0));
200        let c = Arc::clone(&counter);
201
202        let handle = include_exclude_cron(vec![], vec![], move || {
203            c.fetch_add(1, Ordering::SeqCst);
204        });
205
206        thread::sleep(Duration::from_secs(3));
207        handle.stop();
208
209        assert_eq!(counter.load(Ordering::SeqCst), 0);
210    }
211
212    #[test]
213    fn stop_prevents_further_execution() {
214        let counter = Arc::new(AtomicUsize::new(0));
215        let c = Arc::clone(&counter);
216
217        let handle = include_exclude_cron(vec!["* * * * * * *".to_string()], vec![], move || {
218            c.fetch_add(1, Ordering::SeqCst);
219        });
220
221        thread::sleep(Duration::from_secs(2));
222        handle.stop();
223        let before = counter.load(Ordering::SeqCst);
224
225        thread::sleep(Duration::from_secs(3));
226        assert_eq!(before, counter.load(Ordering::SeqCst));
227    }
228
229    #[test]
230    #[should_panic(expected = "invalid include cron expression")]
231    fn invalid_include_cron_panics() {
232        let _ = include_exclude_cron(vec!["invalid".to_string()], vec![], || {});
233    }
234
235    #[test]
236    #[should_panic(expected = "invalid exclude cron expression")]
237    fn invalid_exclude_cron_panics() {
238        let _ = include_exclude_cron(
239            vec!["* * * * * * *".to_string()],
240            vec!["bad".to_string()],
241            || {},
242        );
243    }
244}