1use std::sync::Arc;
2
3use longbridge_httpcli::{HttpClient, Json, Method};
4use serde::{Serialize, de::DeserializeOwned};
5use tracing::{Subscriber, dispatcher, instrument::WithSubscriber};
6
7use crate::{
8 Config, Result,
9 market::types::*,
10 utils::counter::{counter_id_to_symbol, index_symbol_to_counter_id, symbol_to_counter_id},
11};
12
13fn unix_secs_to_rfc3339(ts: i64) -> String {
15 time::OffsetDateTime::from_unix_timestamp(ts)
16 .map(|dt| {
17 use time::format_description::well_known::Rfc3339;
18 dt.format(&Rfc3339).unwrap_or_default()
19 })
20 .unwrap_or_else(|_| ts.to_string())
21}
22
23fn unix_secs_str_to_rfc3339(s: &str) -> String {
25 s.parse::<i64>()
26 .map(unix_secs_to_rfc3339)
27 .unwrap_or_else(|_| s.to_string())
28}
29
30struct InnerMarketContext {
31 http_cli: HttpClient,
32 log_subscriber: Arc<dyn Subscriber + Send + Sync>,
33}
34
35impl Drop for InnerMarketContext {
36 fn drop(&mut self) {
37 dispatcher::with_default(&self.log_subscriber.clone().into(), || {
38 tracing::info!("market context dropped");
39 });
40 }
41}
42
43#[derive(Clone)]
46pub struct MarketContext(Arc<InnerMarketContext>);
47
48impl MarketContext {
49 pub fn new(config: Arc<Config>) -> Self {
51 let log_subscriber = config.create_log_subscriber("market");
52 dispatcher::with_default(&log_subscriber.clone().into(), || {
53 tracing::info!(language = ?config.language, "creating market context");
54 });
55 let ctx = Self(Arc::new(InnerMarketContext {
56 http_cli: config.create_http_client(),
57 log_subscriber,
58 }));
59 dispatcher::with_default(&ctx.0.log_subscriber.clone().into(), || {
60 tracing::info!("market context created");
61 });
62 ctx
63 }
64
65 #[inline]
67 pub fn log_subscriber(&self) -> Arc<dyn Subscriber + Send + Sync> {
68 self.0.log_subscriber.clone()
69 }
70
71 async fn get<R, Q>(&self, path: &'static str, query: Q) -> Result<R>
72 where
73 R: DeserializeOwned + Send + Sync + 'static,
74 Q: Serialize + Send + Sync,
75 {
76 Ok(self
77 .0
78 .http_cli
79 .request(Method::GET, path)
80 .query_params(query)
81 .response::<Json<R>>()
82 .send()
83 .with_subscriber(self.0.log_subscriber.clone())
84 .await?
85 .0)
86 }
87
88 async fn post<R, B>(&self, path: &'static str, body: B) -> Result<R>
89 where
90 R: DeserializeOwned + Send + Sync + 'static,
91 B: std::fmt::Debug + Serialize + Send + Sync + 'static,
92 {
93 Ok(self
94 .0
95 .http_cli
96 .request(Method::POST, path)
97 .body(Json(body))
98 .response::<Json<R>>()
99 .send()
100 .with_subscriber(self.0.log_subscriber.clone())
101 .await?
102 .0)
103 }
104
105 pub async fn market_status(&self) -> Result<MarketStatusResponse> {
111 #[derive(Serialize)]
112 struct Empty {}
113 self.get("/v1/quote/market-status", Empty {}).await
114 }
115
116 pub async fn broker_holding(
122 &self,
123 symbol: impl Into<String>,
124 period: BrokerHoldingPeriod,
125 ) -> Result<BrokerHoldingTop> {
126 let period_str = match period {
127 BrokerHoldingPeriod::Rct1 => "rct_1",
128 BrokerHoldingPeriod::Rct5 => "rct_5",
129 BrokerHoldingPeriod::Rct20 => "rct_20",
130 BrokerHoldingPeriod::Rct60 => "rct_60",
131 };
132 #[derive(Serialize)]
133 struct Query {
134 counter_id: String,
135 #[serde(rename = "type")]
136 period: &'static str,
137 }
138 self.get(
139 "/v1/quote/broker-holding",
140 Query {
141 counter_id: symbol_to_counter_id(&symbol.into()),
142 period: period_str,
143 },
144 )
145 .await
146 }
147
148 pub async fn broker_holding_detail(
152 &self,
153 symbol: impl Into<String>,
154 ) -> Result<BrokerHoldingDetail> {
155 #[derive(Serialize)]
156 struct Query {
157 counter_id: String,
158 }
159 self.get(
160 "/v1/quote/broker-holding/detail",
161 Query {
162 counter_id: symbol_to_counter_id(&symbol.into()),
163 },
164 )
165 .await
166 }
167
168 pub async fn broker_holding_daily(
172 &self,
173 symbol: impl Into<String>,
174 broker_id: impl Into<String>,
175 ) -> Result<BrokerHoldingDailyHistory> {
176 #[derive(Serialize)]
177 struct Query {
178 counter_id: String,
179 parti_number: String,
180 }
181 self.get(
182 "/v1/quote/broker-holding/daily",
183 Query {
184 counter_id: symbol_to_counter_id(&symbol.into()),
185 parti_number: broker_id.into(),
186 },
187 )
188 .await
189 }
190
191 pub async fn ah_premium(
197 &self,
198 symbol: impl Into<String>,
199 period: AhPremiumPeriod,
200 count: u32,
201 ) -> Result<AhPremiumKlines> {
202 #[derive(Serialize)]
203 struct Query {
204 counter_id: String,
205 line_type: &'static str,
206 line_num: u32,
207 }
208 self.get(
209 "/v1/quote/ahpremium/klines",
210 Query {
211 counter_id: symbol_to_counter_id(&symbol.into()),
212 line_type: period.to_line_type(),
213 line_num: count,
214 },
215 )
216 .await
217 }
218
219 pub async fn ah_premium_intraday(
223 &self,
224 symbol: impl Into<String>,
225 ) -> Result<AhPremiumIntraday> {
226 #[derive(Serialize)]
227 struct Query {
228 counter_id: String,
229 days: &'static str,
230 }
231 self.get(
232 "/v1/quote/ahpremium/timeshares",
233 Query {
234 counter_id: symbol_to_counter_id(&symbol.into()),
235 days: "1",
236 },
237 )
238 .await
239 }
240
241 pub async fn trade_stats(&self, symbol: impl Into<String>) -> Result<TradeStatsResponse> {
247 #[derive(Serialize)]
248 struct Query {
249 counter_id: String,
250 }
251 self.get(
252 "/v1/quote/trades-statistics",
253 Query {
254 counter_id: symbol_to_counter_id(&symbol.into()),
255 },
256 )
257 .await
258 }
259
260 pub async fn anomaly(&self, market: impl Into<String>) -> Result<AnomalyResponse> {
266 #[derive(Serialize)]
267 struct Query {
268 market: String,
269 category: &'static str,
270 }
271 self.get(
272 "/v1/quote/changes",
273 Query {
274 market: market.into().to_uppercase(),
275 category: "0",
276 },
277 )
278 .await
279 }
280
281 pub async fn constituent(&self, symbol: impl Into<String>) -> Result<IndexConstituents> {
289 #[derive(Serialize)]
290 struct Query {
291 counter_id: String,
292 }
293 self.get(
294 "/v1/quote/index-constituents",
295 Query {
296 counter_id: index_symbol_to_counter_id(&symbol.into()),
297 },
298 )
299 .await
300 }
301
302 pub async fn top_movers(
312 &self,
313 markets: Vec<String>,
314 sort: u32,
315 date: Option<String>,
316 limit: u32,
317 ) -> Result<TopMoversResponse> {
318 #[derive(Debug, Serialize)]
319 struct Body {
320 limit: u32,
321 sort: u32,
322 markets: Vec<String>,
323 #[serde(skip_serializing_if = "Option::is_none")]
324 date: Option<String>,
325 }
326 let raw: serde_json::Value = self
327 .post(
328 "/v1/quote/market/stock-events",
329 Body {
330 limit,
331 sort,
332 markets,
333 date,
334 },
335 )
336 .await?;
337
338 let events = raw["events"]
339 .as_array()
340 .cloned()
341 .unwrap_or_default()
342 .into_iter()
343 .map(|ev| {
344 let ts = if let Some(n) = ev["timestamp"].as_i64() {
345 unix_secs_to_rfc3339(n)
346 } else if let Some(s) = ev["timestamp"].as_str() {
347 unix_secs_str_to_rfc3339(s)
348 } else {
349 String::new()
350 };
351 let stock_val = &ev["stock"];
352 let stock = TopMoversStock {
353 symbol: counter_id_to_symbol(stock_val["counter_id"].as_str().unwrap_or("")),
354 code: stock_val["code"].as_str().unwrap_or("").to_string(),
355 name: stock_val["name"].as_str().unwrap_or("").to_string(),
356 full_name: stock_val["full_name"].as_str().unwrap_or("").to_string(),
357 change: stock_val["change"].as_str().unwrap_or("").to_string(),
358 last_done: stock_val["last_done"].as_str().unwrap_or("").to_string(),
359 market: stock_val["market"].as_str().unwrap_or("").to_string(),
360 labels: stock_val["labels"]
361 .as_array()
362 .map(|arr| {
363 arr.iter()
364 .filter_map(|l| l.as_str().map(|s| s.to_string()))
365 .collect()
366 })
367 .unwrap_or_default(),
368 logo: stock_val["logo"].as_str().unwrap_or("").to_string(),
369 };
370 TopMoversEvent {
371 timestamp: ts,
372 alert_reason: ev["alert_reason"].as_str().unwrap_or("").to_string(),
373 alert_type: ev["alert_type"].as_i64().unwrap_or(0),
374 stock,
375 post: ev["post"].clone(),
376 }
377 })
378 .collect();
379 let next_params = raw["next_params"].clone();
380 Ok(TopMoversResponse {
381 events,
382 next_params,
383 })
384 }
385
386 pub async fn rank_categories(&self) -> Result<RankCategoriesResponse> {
392 #[derive(Serialize)]
393 struct Empty {}
394 let mut raw: serde_json::Value = self
395 .get("/v1/quote/market/rank/categories", Empty {})
396 .await?;
397 if let Some(tags) = raw["first_tags"].as_array_mut() {
400 for tag in tags.iter_mut() {
401 if let Some(k) = tag["key"].as_str() {
402 let stripped = k.strip_prefix("ib_").unwrap_or(k).to_string();
403 tag["key"] = serde_json::Value::String(stripped);
404 }
405 if let Some(subs) = tag["second_tags"].as_array_mut() {
406 for sub in subs.iter_mut() {
407 if let Some(sk) = sub["key"].as_str() {
408 let stripped = sk.strip_prefix("ib_").unwrap_or(sk).to_string();
409 sub["key"] = serde_json::Value::String(stripped);
410 }
411 }
412 }
413 }
414 }
415 Ok(RankCategoriesResponse { data: raw })
416 }
417
418 pub async fn rank_list(
424 &self,
425 key: impl Into<String>,
426 need_article: bool,
427 ) -> Result<RankListResponse> {
428 #[derive(Serialize)]
429 struct Query {
430 key: String,
431 delay_bmp: &'static str,
432 need_article: &'static str,
433 }
434 let key_str = key.into();
435 let api_key = if key_str.starts_with("ib_") {
437 key_str
438 } else {
439 format!("ib_{key_str}")
440 };
441 let raw: serde_json::Value = self
442 .get(
443 "/v1/quote/market/rank/list",
444 Query {
445 key: api_key,
446 delay_bmp: "false",
447 need_article: if need_article { "true" } else { "false" },
448 },
449 )
450 .await?;
451 let bmp = raw["bmp"].as_bool().unwrap_or(false);
452 let lists = raw["lists"]
453 .as_array()
454 .cloned()
455 .unwrap_or_default()
456 .into_iter()
457 .map(|item| RankListItem {
458 symbol: counter_id_to_symbol(item["counter_id"].as_str().unwrap_or("")),
459 code: item["code"].as_str().unwrap_or("").to_string(),
460 name: item["name"].as_str().unwrap_or("").to_string(),
461 last_done: item["last_done"].as_str().unwrap_or("").to_string(),
462 chg: item["chg"].as_str().unwrap_or("").to_string(),
463 change: item["change"].as_str().unwrap_or("").to_string(),
464 inflow: item["inflow"].as_str().unwrap_or("").to_string(),
465 market_cap: item["market_cap"].as_str().unwrap_or("").to_string(),
466 industry: item["industry"].as_str().unwrap_or("").to_string(),
467 pre_post_price: item["pre_post_price"].as_str().unwrap_or("").to_string(),
468 pre_post_chg: item["pre_post_chg"].as_str().unwrap_or("").to_string(),
469 amplitude: item["amplitude"].as_str().unwrap_or("").to_string(),
470 five_day_chg: item["five_day_chg"].as_str().unwrap_or("").to_string(),
471 turnover_rate: item["turnover_rate"].as_str().unwrap_or("").to_string(),
472 volume_rate: item["volume_rate"].as_str().unwrap_or("").to_string(),
473 pb_ttm: item["pb_ttm"].as_str().unwrap_or("").to_string(),
474 })
475 .collect();
476 Ok(RankListResponse { bmp, lists })
477 }
478}