Skip to main content

longbridge/market/
context.rs

1use std::sync::Arc;
2
3use longbridge_httpcli::{HttpClient, Json, Method};
4use serde::{Serialize, de::DeserializeOwned};
5use tracing::{Subscriber, dispatcher, instrument::WithSubscriber};
6
7use crate::{
8    Config, Result,
9    market::types::*,
10    utils::counter::{counter_id_to_symbol, index_symbol_to_counter_id, symbol_to_counter_id},
11};
12
13/// Convert a Unix-seconds value (integer or string) to RFC 3339.
14fn unix_secs_to_rfc3339(ts: i64) -> String {
15    time::OffsetDateTime::from_unix_timestamp(ts)
16        .map(|dt| {
17            use time::format_description::well_known::Rfc3339;
18            dt.format(&Rfc3339).unwrap_or_default()
19        })
20        .unwrap_or_else(|_| ts.to_string())
21}
22
23/// Convert a Unix-seconds string to RFC 3339.
24fn unix_secs_str_to_rfc3339(s: &str) -> String {
25    s.parse::<i64>()
26        .map(unix_secs_to_rfc3339)
27        .unwrap_or_else(|_| s.to_string())
28}
29
30struct InnerMarketContext {
31    http_cli: HttpClient,
32    log_subscriber: Arc<dyn Subscriber + Send + Sync>,
33}
34
35impl Drop for InnerMarketContext {
36    fn drop(&mut self) {
37        dispatcher::with_default(&self.log_subscriber.clone().into(), || {
38            tracing::info!("market context dropped");
39        });
40    }
41}
42
43/// Market data context — broker holdings, A/H premium, trade statistics,
44/// market anomalies, index constituents and more.
45#[derive(Clone)]
46pub struct MarketContext(Arc<InnerMarketContext>);
47
48impl MarketContext {
49    /// Create a [`MarketContext`]
50    pub fn new(config: Arc<Config>) -> Self {
51        let log_subscriber = config.create_log_subscriber("market");
52        dispatcher::with_default(&log_subscriber.clone().into(), || {
53            tracing::info!(language = ?config.language, "creating market context");
54        });
55        let ctx = Self(Arc::new(InnerMarketContext {
56            http_cli: config.create_http_client(),
57            log_subscriber,
58        }));
59        dispatcher::with_default(&ctx.0.log_subscriber.clone().into(), || {
60            tracing::info!("market context created");
61        });
62        ctx
63    }
64
65    /// Returns the log subscriber
66    #[inline]
67    pub fn log_subscriber(&self) -> Arc<dyn Subscriber + Send + Sync> {
68        self.0.log_subscriber.clone()
69    }
70
71    async fn get<R, Q>(&self, path: &'static str, query: Q) -> Result<R>
72    where
73        R: DeserializeOwned + Send + Sync + 'static,
74        Q: Serialize + Send + Sync,
75    {
76        Ok(self
77            .0
78            .http_cli
79            .request(Method::GET, path)
80            .query_params(query)
81            .response::<Json<R>>()
82            .send()
83            .with_subscriber(self.0.log_subscriber.clone())
84            .await?
85            .0)
86    }
87
88    async fn post<R, B>(&self, path: &'static str, body: B) -> Result<R>
89    where
90        R: DeserializeOwned + Send + Sync + 'static,
91        B: std::fmt::Debug + Serialize + Send + Sync + 'static,
92    {
93        Ok(self
94            .0
95            .http_cli
96            .request(Method::POST, path)
97            .body(Json(body))
98            .response::<Json<R>>()
99            .send()
100            .with_subscriber(self.0.log_subscriber.clone())
101            .await?
102            .0)
103    }
104
105    // ── market_status ─────────────────────────────────────────────
106
107    /// Get current trading status for all markets.
108    ///
109    /// Path: `GET /v1/quote/market-status`
110    pub async fn market_status(&self) -> Result<MarketStatusResponse> {
111        #[derive(Serialize)]
112        struct Empty {}
113        self.get("/v1/quote/market-status", Empty {}).await
114    }
115
116    // ── broker_holding ────────────────────────────────────────────
117
118    /// Get top broker holdings (buy/sell leaders) for a security.
119    ///
120    /// Path: `GET /v1/quote/broker-holding`
121    pub async fn broker_holding(
122        &self,
123        symbol: impl Into<String>,
124        period: BrokerHoldingPeriod,
125    ) -> Result<BrokerHoldingTop> {
126        let period_str = match period {
127            BrokerHoldingPeriod::Rct1 => "rct_1",
128            BrokerHoldingPeriod::Rct5 => "rct_5",
129            BrokerHoldingPeriod::Rct20 => "rct_20",
130            BrokerHoldingPeriod::Rct60 => "rct_60",
131        };
132        #[derive(Serialize)]
133        struct Query {
134            counter_id: String,
135            #[serde(rename = "type")]
136            period: &'static str,
137        }
138        self.get(
139            "/v1/quote/broker-holding",
140            Query {
141                counter_id: symbol_to_counter_id(&symbol.into()),
142                period: period_str,
143            },
144        )
145        .await
146    }
147
148    /// Get full broker holding details for a security.
149    ///
150    /// Path: `GET /v1/quote/broker-holding/detail`
151    pub async fn broker_holding_detail(
152        &self,
153        symbol: impl Into<String>,
154    ) -> Result<BrokerHoldingDetail> {
155        #[derive(Serialize)]
156        struct Query {
157            counter_id: String,
158        }
159        self.get(
160            "/v1/quote/broker-holding/detail",
161            Query {
162                counter_id: symbol_to_counter_id(&symbol.into()),
163            },
164        )
165        .await
166    }
167
168    /// Get daily holding history for a specific broker.
169    ///
170    /// Path: `GET /v1/quote/broker-holding/daily`
171    pub async fn broker_holding_daily(
172        &self,
173        symbol: impl Into<String>,
174        broker_id: impl Into<String>,
175    ) -> Result<BrokerHoldingDailyHistory> {
176        #[derive(Serialize)]
177        struct Query {
178            counter_id: String,
179            parti_number: String,
180        }
181        self.get(
182            "/v1/quote/broker-holding/daily",
183            Query {
184                counter_id: symbol_to_counter_id(&symbol.into()),
185                parti_number: broker_id.into(),
186            },
187        )
188        .await
189    }
190
191    // ── ah_premium ────────────────────────────────────────────────
192
193    /// Get A/H premium K-line data for a dual-listed security.
194    ///
195    /// Path: `GET /v1/quote/ahpremium/klines`
196    pub async fn ah_premium(
197        &self,
198        symbol: impl Into<String>,
199        period: AhPremiumPeriod,
200        count: u32,
201    ) -> Result<AhPremiumKlines> {
202        #[derive(Serialize)]
203        struct Query {
204            counter_id: String,
205            line_type: &'static str,
206            line_num: u32,
207        }
208        self.get(
209            "/v1/quote/ahpremium/klines",
210            Query {
211                counter_id: symbol_to_counter_id(&symbol.into()),
212                line_type: period.to_line_type(),
213                line_num: count,
214            },
215        )
216        .await
217    }
218
219    /// Get A/H premium intraday data for a dual-listed security.
220    ///
221    /// Path: `GET /v1/quote/ahpremium/timeshares`
222    pub async fn ah_premium_intraday(
223        &self,
224        symbol: impl Into<String>,
225    ) -> Result<AhPremiumIntraday> {
226        #[derive(Serialize)]
227        struct Query {
228            counter_id: String,
229            days: &'static str,
230        }
231        self.get(
232            "/v1/quote/ahpremium/timeshares",
233            Query {
234                counter_id: symbol_to_counter_id(&symbol.into()),
235                days: "1",
236            },
237        )
238        .await
239    }
240
241    // ── trade_stats ───────────────────────────────────────────────
242
243    /// Get buy/sell/neutral trade statistics for a security.
244    ///
245    /// Path: `GET /v1/quote/trades-statistics`
246    pub async fn trade_stats(&self, symbol: impl Into<String>) -> Result<TradeStatsResponse> {
247        #[derive(Serialize)]
248        struct Query {
249            counter_id: String,
250        }
251        self.get(
252            "/v1/quote/trades-statistics",
253            Query {
254                counter_id: symbol_to_counter_id(&symbol.into()),
255            },
256        )
257        .await
258    }
259
260    // ── anomaly ───────────────────────────────────────────────────
261
262    /// Get market anomaly alerts (unusual price/volume events).
263    ///
264    /// Path: `GET /v1/quote/changes`
265    pub async fn anomaly(&self, market: impl Into<String>) -> Result<AnomalyResponse> {
266        #[derive(Serialize)]
267        struct Query {
268            market: String,
269            category: &'static str,
270        }
271        self.get(
272            "/v1/quote/changes",
273            Query {
274                market: market.into().to_uppercase(),
275                category: "0",
276            },
277        )
278        .await
279    }
280
281    // ── constituent ───────────────────────────────────────────────
282
283    /// Get constituent stocks for an index.
284    ///
285    /// `symbol` should be an index symbol such as `"HSI.HK"`.
286    ///
287    /// Path: `GET /v1/quote/index-constituents`
288    pub async fn constituent(&self, symbol: impl Into<String>) -> Result<IndexConstituents> {
289        #[derive(Serialize)]
290        struct Query {
291            counter_id: String,
292        }
293        self.get(
294            "/v1/quote/index-constituents",
295            Query {
296                counter_id: index_symbol_to_counter_id(&symbol.into()),
297            },
298        )
299        .await
300    }
301
302    // ── top_movers ────────────────────────────────────────────────
303
304    /// Get top movers (stocks with unusual price movements) across one or more
305    /// markets.
306    ///
307    /// Path: `POST /v1/quote/market/stock-events`
308    ///
309    /// `sort` is the sort order code (0 = ascending, 1 = descending).
310    /// `date` is an optional date filter in `"YYYY-MM-DD"` format.
311    pub async fn top_movers(
312        &self,
313        markets: Vec<String>,
314        sort: u32,
315        date: Option<String>,
316        limit: u32,
317    ) -> Result<TopMoversResponse> {
318        #[derive(Debug, Serialize)]
319        struct Body {
320            limit: u32,
321            sort: u32,
322            markets: Vec<String>,
323            #[serde(skip_serializing_if = "Option::is_none")]
324            date: Option<String>,
325        }
326        let raw: serde_json::Value = self
327            .post(
328                "/v1/quote/market/stock-events",
329                Body {
330                    limit,
331                    sort,
332                    markets,
333                    date,
334                },
335            )
336            .await?;
337
338        let events = raw["events"]
339            .as_array()
340            .cloned()
341            .unwrap_or_default()
342            .into_iter()
343            .map(|ev| {
344                let ts = if let Some(n) = ev["timestamp"].as_i64() {
345                    unix_secs_to_rfc3339(n)
346                } else if let Some(s) = ev["timestamp"].as_str() {
347                    unix_secs_str_to_rfc3339(s)
348                } else {
349                    String::new()
350                };
351                let stock_val = &ev["stock"];
352                let stock = TopMoversStock {
353                    symbol: counter_id_to_symbol(stock_val["counter_id"].as_str().unwrap_or("")),
354                    code: stock_val["code"].as_str().unwrap_or("").to_string(),
355                    name: stock_val["name"].as_str().unwrap_or("").to_string(),
356                    full_name: stock_val["full_name"].as_str().unwrap_or("").to_string(),
357                    change: stock_val["change"].as_str().unwrap_or("").to_string(),
358                    last_done: stock_val["last_done"].as_str().unwrap_or("").to_string(),
359                    market: stock_val["market"].as_str().unwrap_or("").to_string(),
360                    labels: stock_val["labels"]
361                        .as_array()
362                        .map(|arr| {
363                            arr.iter()
364                                .filter_map(|l| l.as_str().map(|s| s.to_string()))
365                                .collect()
366                        })
367                        .unwrap_or_default(),
368                    logo: stock_val["logo"].as_str().unwrap_or("").to_string(),
369                };
370                TopMoversEvent {
371                    timestamp: ts,
372                    alert_reason: ev["alert_reason"].as_str().unwrap_or("").to_string(),
373                    alert_type: ev["alert_type"].as_i64().unwrap_or(0),
374                    stock,
375                    post: ev["post"].clone(),
376                }
377            })
378            .collect();
379        let next_params = raw["next_params"].clone();
380        Ok(TopMoversResponse {
381            events,
382            next_params,
383        })
384    }
385
386    // ── rank_categories ───────────────────────────────────────────
387
388    /// Get all available rank category keys and labels.
389    ///
390    /// Path: `GET /v1/quote/market/rank/categories`
391    pub async fn rank_categories(&self) -> Result<RankCategoriesResponse> {
392        #[derive(Serialize)]
393        struct Empty {}
394        let mut raw: serde_json::Value = self
395            .get("/v1/quote/market/rank/categories", Empty {})
396            .await?;
397        // Strip the "ib_" prefix from all key fields so callers get clean keys
398        // that can be passed back to rank_list without the prefix.
399        if let Some(tags) = raw["first_tags"].as_array_mut() {
400            for tag in tags.iter_mut() {
401                if let Some(k) = tag["key"].as_str() {
402                    let stripped = k.strip_prefix("ib_").unwrap_or(k).to_string();
403                    tag["key"] = serde_json::Value::String(stripped);
404                }
405                if let Some(subs) = tag["second_tags"].as_array_mut() {
406                    for sub in subs.iter_mut() {
407                        if let Some(sk) = sub["key"].as_str() {
408                            let stripped = sk.strip_prefix("ib_").unwrap_or(sk).to_string();
409                            sub["key"] = serde_json::Value::String(stripped);
410                        }
411                    }
412                }
413            }
414        }
415        Ok(RankCategoriesResponse { data: raw })
416    }
417
418    // ── rank_list ─────────────────────────────────────────────────
419
420    /// Get a ranked list of securities for the given category key.
421    ///
422    /// Path: `GET /v1/quote/market/rank/list`
423    pub async fn rank_list(
424        &self,
425        key: impl Into<String>,
426        need_article: bool,
427    ) -> Result<RankListResponse> {
428        #[derive(Serialize)]
429        struct Query {
430            key: String,
431            delay_bmp: &'static str,
432            need_article: &'static str,
433        }
434        let key_str = key.into();
435        // Add "ib_" prefix if the caller passed a clean key (without it).
436        let api_key = if key_str.starts_with("ib_") {
437            key_str
438        } else {
439            format!("ib_{key_str}")
440        };
441        let raw: serde_json::Value = self
442            .get(
443                "/v1/quote/market/rank/list",
444                Query {
445                    key: api_key,
446                    delay_bmp: "false",
447                    need_article: if need_article { "true" } else { "false" },
448                },
449            )
450            .await?;
451        let bmp = raw["bmp"].as_bool().unwrap_or(false);
452        let lists = raw["lists"]
453            .as_array()
454            .cloned()
455            .unwrap_or_default()
456            .into_iter()
457            .map(|item| RankListItem {
458                symbol: counter_id_to_symbol(item["counter_id"].as_str().unwrap_or("")),
459                code: item["code"].as_str().unwrap_or("").to_string(),
460                name: item["name"].as_str().unwrap_or("").to_string(),
461                last_done: item["last_done"].as_str().unwrap_or("").to_string(),
462                chg: item["chg"].as_str().unwrap_or("").to_string(),
463                change: item["change"].as_str().unwrap_or("").to_string(),
464                inflow: item["inflow"].as_str().unwrap_or("").to_string(),
465                market_cap: item["market_cap"].as_str().unwrap_or("").to_string(),
466                industry: item["industry"].as_str().unwrap_or("").to_string(),
467                pre_post_price: item["pre_post_price"].as_str().unwrap_or("").to_string(),
468                pre_post_chg: item["pre_post_chg"].as_str().unwrap_or("").to_string(),
469                amplitude: item["amplitude"].as_str().unwrap_or("").to_string(),
470                five_day_chg: item["five_day_chg"].as_str().unwrap_or("").to_string(),
471                turnover_rate: item["turnover_rate"].as_str().unwrap_or("").to_string(),
472                volume_rate: item["volume_rate"].as_str().unwrap_or("").to_string(),
473                pb_ttm: item["pb_ttm"].as_str().unwrap_or("").to_string(),
474            })
475            .collect();
476        Ok(RankListResponse { bmp, lists })
477    }
478}