x-algorithm/home-mixer/candidate_hydrators/subscription_hydrator.rs
2026-01-20 02:31:49 +00:00

51 lines
1.7 KiB
Rust

use crate::candidate_pipeline::candidate::PostCandidate;
use crate::candidate_pipeline::query::ScoredPostsQuery;
use crate::clients::tweet_entity_service_client::TESClient;
use std::sync::Arc;
use tonic::async_trait;
use xai_candidate_pipeline::hydrator::Hydrator;
/// Hydrator that fills in `subscription_author_id` on post candidates.
///
/// The value for each candidate is looked up by tweet ID through the
/// tweet-entity-service (TES) client held in `tes_client`.
pub struct SubscriptionHydrator {
/// Shared handle to the TES client; `hydrate` calls its
/// `get_subscription_author_ids` method with the candidates' tweet IDs.
pub tes_client: Arc<dyn TESClient + Send + Sync>,
}
impl SubscriptionHydrator {
pub async fn new(tes_client: Arc<dyn TESClient + Send + Sync>) -> Self {
Self { tes_client }
}
}
#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for SubscriptionHydrator {
    /// Resolves the subscription author ID for every candidate with a single
    /// batched `get_subscription_author_ids` call on the TES client.
    ///
    /// The returned vector has one entry per input candidate, in input order.
    /// Each entry is a `PostCandidate` built from `Default::default()` with only
    /// `subscription_author_id` set. A candidate whose tweet ID is missing from
    /// the response, or present with a `None` value, gets `None`.
    ///
    /// # Errors
    ///
    /// If the client call fails, its error is converted with `to_string()` and
    /// returned as the `Err` value; no candidates are produced in that case.
    #[xai_stats_macro::receive_stats]
    async fn hydrate(
        &self,
        _query: &ScoredPostsQuery,
        candidates: &[PostCandidate],
    ) -> Result<Vec<PostCandidate>, String> {
        let requested_ids = candidates
            .iter()
            .map(|candidate| candidate.tweet_id)
            .collect::<Vec<_>>();

        // The ID list is moved into the request; the second pass below reads
        // the IDs straight from `candidates`, which preserves the same order.
        let author_by_tweet = self
            .tes_client
            .get_subscription_author_ids(requested_ids)
            .await
            .map_err(|err| err.to_string())?;

        let hydrated = candidates
            .iter()
            .map(|candidate| PostCandidate {
                // A missing key and a present-but-`None` value both collapse to `None`.
                subscription_author_id: author_by_tweet
                    .get(&candidate.tweet_id)
                    .and_then(|author| *author),
                ..Default::default()
            })
            .collect::<Vec<_>>();

        Ok(hydrated)
    }

    /// Copies `subscription_author_id` from the hydrated result onto
    /// `candidate`; every other field of `candidate` is left untouched.
    fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
        let PostCandidate {
            subscription_author_id,
            ..
        } = hydrated;
        candidate.subscription_author_id = subscription_author_id;
    }
}