1use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use parking_lot::RwLock;
10use tracing::{debug, info, warn};
11
12use super::health::HealthState;
13use super::strategy::{FailoverStrategy, FallbackBehavior, StrategySelector};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub struct CaId(pub String);
18
19impl std::fmt::Display for CaId {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 f.write_str(&self.0)
22 }
23}
24
25impl std::borrow::Borrow<str> for CaId {
26 fn borrow(&self) -> &str {
27 &self.0
28 }
29}
30
31impl std::borrow::Borrow<String> for CaId {
32 fn borrow(&self) -> &String {
33 &self.0
34 }
35}
36
37impl From<String> for CaId {
38 fn from(s: String) -> Self {
39 Self(s)
40 }
41}
42
43impl From<&str> for CaId {
44 fn from(s: &str) -> Self {
45 Self(s.to_owned())
46 }
47}
48
49#[derive(Debug, Clone)]
51pub struct CaStatus {
52 pub health: HealthState,
54 pub consecutive_failures: u32,
56 pub last_success: Option<Instant>,
58 pub circuit_open_since: Option<Instant>,
60 pub latency_ema_ms: f64,
62 pub last_latency: Option<Duration>,
64}
65
66impl CaStatus {
67 fn new() -> Self {
68 Self {
69 health: HealthState::Healthy,
70 consecutive_failures: 0,
71 last_success: None,
72 circuit_open_since: None,
73 latency_ema_ms: 0.0,
74 last_latency: None,
75 }
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct CaConnection {
82 pub id: CaId,
84 pub endpoint: String,
86 pub weight: u32,
88 pub priority: u32,
90}
91
/// Tuning knobs for the per-CA circuit breaker.
#[derive(Clone, Debug)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures at which the breaker trips and the CA is marked
    /// unavailable.
    pub failure_threshold: u32,
    /// How long the breaker must have been open before
    /// `CaPool::should_reprobe` reports `true`.
    pub cooldown: Duration,
}
100
101impl Default for CircuitBreakerConfig {
102 fn default() -> Self {
103 Self {
104 failure_threshold: 3,
105 cooldown: Duration::from_secs(60),
106 }
107 }
108}
109
110#[derive(Debug, Clone)]
112pub struct PoolConfig {
113 pub strategy: FailoverStrategy,
115 pub fallback: FallbackBehavior,
117 pub circuit_breaker: CircuitBreakerConfig,
119}
120
121pub struct CaPool {
127 connections: Vec<CaConnection>,
129 statuses: RwLock<HashMap<CaId, CaStatus>>,
131 config: PoolConfig,
133 selector: StrategySelector,
135}
136
137impl CaPool {
138 pub fn new(connections: Vec<CaConnection>, config: PoolConfig) -> Self {
140 let mut statuses = HashMap::new();
141 for conn in &connections {
142 statuses.insert(conn.id.clone(), CaStatus::new());
143 }
144 let selector = StrategySelector::new(config.strategy.clone());
145
146 Self {
147 connections,
148 statuses: RwLock::new(statuses),
149 config,
150 selector,
151 }
152 }
153
154 pub fn select(&self) -> Option<CaConnection> {
159 let statuses = self.statuses.read();
160 let healthy: Vec<&CaConnection> = self
161 .connections
162 .iter()
163 .filter(|c| {
164 statuses
165 .get(&c.id)
166 .map(|s| s.health.is_available())
167 .unwrap_or(false)
168 })
169 .collect();
170
171 if healthy.is_empty() {
172 warn!("no healthy CA backends available");
173 return match self.config.fallback {
174 FallbackBehavior::Reject => None,
175 FallbackBehavior::QueueAndRetry => {
176 warn!("queue-and-retry fallback not yet implemented; rejecting");
179 None
180 }
181 };
182 }
183
184 let snapshot: Vec<(&CaConnection, &CaStatus)> = healthy
185 .iter()
186 .filter_map(|c| statuses.get(&c.id).map(|s| (*c, s)))
187 .collect();
188
189 self.selector.select(&snapshot)
190 }
191
192 pub fn record_success(&self, id: &CaId, latency: Duration) {
194 let mut statuses = self.statuses.write();
195 if let Some(status) = statuses.get_mut(id) {
196 status.consecutive_failures = 0;
197 status.last_success = Some(Instant::now());
198 status.circuit_open_since = None;
199 status.last_latency = Some(latency);
200
201 let ms = latency.as_secs_f64() * 1000.0;
202 status.latency_ema_ms = status.latency_ema_ms * 0.7 + ms * 0.3;
204
205 if status.health != HealthState::Healthy {
206 info!(ca = %id, "CA recovered, marking healthy");
207 status.health = HealthState::Healthy;
208 }
209 }
210 }
211
212 pub fn record_failure(&self, id: &CaId) {
214 let mut statuses = self.statuses.write();
215 if let Some(status) = statuses.get_mut(id) {
216 status.consecutive_failures += 1;
217 debug!(
218 ca = %id,
219 failures = status.consecutive_failures,
220 "CA request failed"
221 );
222
223 if status.consecutive_failures >= self.config.circuit_breaker.failure_threshold {
224 if status.health != HealthState::Unavailable {
225 warn!(
226 ca = %id,
227 failures = status.consecutive_failures,
228 "circuit breaker tripped, marking CA unavailable"
229 );
230 status.health = HealthState::Unavailable;
231 status.circuit_open_since = Some(Instant::now());
232 }
233 } else if status.health == HealthState::Healthy {
234 status.health = HealthState::Degraded;
235 }
236 }
237 }
238
239 pub fn should_reprobe(&self, id: &CaId) -> bool {
241 let statuses = self.statuses.read();
242 statuses.get(id).is_some_and(|s| {
243 s.circuit_open_since
244 .is_some_and(|opened| opened.elapsed() >= self.config.circuit_breaker.cooldown)
245 })
246 }
247
248 pub fn set_health(&self, id: &CaId, state: HealthState) {
250 let mut statuses = self.statuses.write();
251 if let Some(status) = statuses.get_mut(id) {
252 let prev = status.health.clone();
253 status.health = state.clone();
254 if prev != state {
255 info!(ca = %id, from = ?prev, to = ?state, "CA health state transition");
256 }
257 }
258 }
259
260 pub fn status_snapshot(&self) -> HashMap<CaId, CaStatus> {
262 self.statuses.read().clone()
263 }
264
265 pub fn connections(&self) -> &[CaConnection] {
267 &self.connections
268 }
269
270 pub fn config(&self) -> &PoolConfig {
272 &self.config
273 }
274}