corelib_rust/utils/
include_exclude_cron.rs1use chrono::{DateTime, Duration, Utc};
9use cron::Schedule;
10use std::str::FromStr;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration as StdDuration;
15
/// Handle to a background cron job started by [`include_exclude_cron`].
///
/// The handle shares an atomic stop flag with the worker thread. Calling
/// [`CronJobHandle::stop`] raises that flag; the worker checks it at the top of
/// its loop and exits the next time it gets there.
///
/// NOTE(review): no `Drop` impl is visible in this file, so dropping the handle
/// presumably leaves the job running. Call `stop` explicitly to end it.
#[derive(Debug)]
pub struct CronJobHandle {
    /// Set to `true` to ask the worker thread to exit.
    stop_flag: Arc<AtomicBool>,
}

impl CronJobHandle {
    /// Requests that the background job stop.
    ///
    /// This only raises the flag and returns immediately; it does not wait for
    /// the worker thread to finish. Calling it more than once is harmless.
    pub fn stop(&self) {
        // `Relaxed` suffices: the flag is a standalone signal and no other
        // memory is published through it.
        self.stop_flag.store(true, Ordering::Relaxed);
    }
}
30
31pub fn include_exclude_cron<F>(
45 include_exprs: Vec<String>,
46 exclude_exprs: Vec<String>,
47 handler: F,
48) -> CronJobHandle
49where
50 F: Fn() + Send + Sync + 'static,
51{
52 let handler = Arc::new(handler);
53
54 let include_schedules: Vec<Schedule> = include_exprs
56 .into_iter()
57 .map(|expr| Schedule::from_str(&expr).expect("invalid include cron expression"))
58 .collect();
59
60 let exclude_schedules: Vec<Schedule> = exclude_exprs
62 .into_iter()
63 .map(|expr| Schedule::from_str(&expr).expect("invalid exclude cron expression"))
64 .collect();
65
66 let stop_flag = Arc::new(AtomicBool::new(false));
67 let stop_flag_clone = Arc::clone(&stop_flag);
68 let handler_clone = Arc::clone(&handler);
69
70 thread::spawn(move || {
72 let mut last_fire_ts = 0;
73 loop {
74 if stop_flag_clone.load(Ordering::Relaxed) {
75 break;
76 }
77
78 let now: DateTime<Utc> = Utc::now();
79 let now_ts = now.timestamp();
80
81 if now_ts <= last_fire_ts {
82 let next_sec_ts = last_fire_ts + 1;
84 let next_sec = DateTime::from_timestamp(next_sec_ts, 0).expect("invalid timestamp");
85 let sleep_dur = next_sec - Utc::now();
86 if sleep_dur > Duration::zero() {
87 thread::sleep(StdDuration::from_millis(sleep_dur.num_milliseconds() as u64));
88 } else {
89 thread::sleep(StdDuration::from_millis(10));
90 }
91 continue;
92 }
93
94 let test_time = now - Duration::seconds(1);
95
96 let included = include_schedules.iter().any(|schedule| {
98 let mut upcoming = schedule.after(&test_time);
99 upcoming
100 .next()
101 .is_some_and(|next| next.timestamp() == now_ts)
102 });
103
104 if !included {
105 thread::sleep(StdDuration::from_millis(100));
108 continue;
109 }
110
111 let excluded = exclude_schedules.iter().any(|schedule| {
113 let mut upcoming = schedule.after(&test_time);
114 upcoming
115 .next()
116 .is_some_and(|next| next.timestamp() == now_ts)
117 });
118
119 if !excluded {
120 (handler_clone)();
121 }
122
123 last_fire_ts = now_ts;
124 }
125 });
126
127 CronJobHandle { stop_flag }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Cron expression that matches every second.
    const EVERY_SECOND: &str = "* * * * * * *";

    /// Converts borrowed expressions into the owned form the API takes.
    fn owned(exprs: &[&str]) -> Vec<String> {
        exprs.iter().map(|expr| expr.to_string()).collect()
    }

    /// Starts a job whose handler increments a counter, returning the counter
    /// together with the job handle.
    fn spawn_counting_job(
        include: &[&str],
        exclude: &[&str],
    ) -> (Arc<AtomicUsize>, CronJobHandle) {
        let hits = Arc::new(AtomicUsize::new(0));
        let sink = Arc::clone(&hits);
        let job = include_exclude_cron(owned(include), owned(exclude), move || {
            sink.fetch_add(1, Ordering::SeqCst);
        });
        (hits, job)
    }

    #[test]
    fn every_second_include_no_exclude_handler_is_called() {
        let (hits, job) = spawn_counting_job(&[EVERY_SECOND], &[]);

        thread::sleep(Duration::from_secs(4));
        job.stop();

        // Timing-dependent: roughly one call per elapsed second.
        let calls = hits.load(Ordering::SeqCst);
        assert!((3..=5).contains(&calls));
    }

    #[test]
    fn exclude_blocks_execution() {
        // The exclude expression matches every second the include one does.
        let (hits, job) = spawn_counting_job(&[EVERY_SECOND], &[EVERY_SECOND]);

        thread::sleep(Duration::from_secs(3));
        job.stop();

        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn fires_only_on_specific_seconds() {
        let (hits, job) = spawn_counting_job(&["*/2 * * * * * *"], &[]);

        thread::sleep(Duration::from_secs(6));
        job.stop();

        let calls = hits.load(Ordering::SeqCst);
        assert!(
            (2..=4).contains(&calls),
            "Should have fired ~3 times in 6 seconds (every 2s), got {}",
            calls
        );
    }

    #[test]
    fn empty_include_never_runs() {
        let (hits, job) = spawn_counting_job(&[], &[]);

        thread::sleep(Duration::from_secs(3));
        job.stop();

        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn stop_prevents_further_execution() {
        let (hits, job) = spawn_counting_job(&[EVERY_SECOND], &[]);

        thread::sleep(Duration::from_secs(2));
        job.stop();
        let before = hits.load(Ordering::SeqCst);

        // The count must not move once stop has been requested.
        thread::sleep(Duration::from_secs(3));
        assert_eq!(before, hits.load(Ordering::SeqCst));
    }

    #[test]
    #[should_panic(expected = "invalid include cron expression")]
    fn invalid_include_cron_panics() {
        let _ = include_exclude_cron(vec![String::from("invalid")], Vec::new(), || {});
    }

    #[test]
    #[should_panic(expected = "invalid exclude cron expression")]
    fn invalid_exclude_cron_panics() {
        let include = vec![String::from(EVERY_SECOND)];
        let exclude = vec![String::from("bad")];
        let _ = include_exclude_cron(include, exclude, || {});
    }
}