wicked-waifus-rs/wicked-waifus-game-server/src/logic/thread_mgr.rs
2025-05-20 04:16:51 +03:00

269 lines
8.6 KiB
Rust

use super::{ecs::world::World, player::Player, utils::world_util};
use crate::logic::ecs::world::WorldEntity;
use crate::{
logic,
player_save_task::{self, PlayerSaveReason},
session::Session,
};
use std::collections::hash_map::Entry::Vacant;
use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc, OnceLock,
},
thread,
time::Duration,
};
use wicked_waifus_commons::time_util;
use wicked_waifus_protocol::{
message::Message, AfterJoinSceneNotify, EnterGameResponse, JoinSceneNotify, SilenceNpcNotify,
TransitionOptionPb,
};
use wicked_waifus_protocol::{FormationAttr, FormationAttrNotify};
use wicked_waifus_protocol_internal::PlayerSaveData;
pub enum LogicInput {
AddPlayer {
player_id: i32,
enter_rpc_id: u16,
session: Arc<Session>,
player_save_data: Box<PlayerSaveData>,
},
RemovePlayer {
player_id: i32,
},
ProcessMessage {
player_id: i32,
message: Message,
},
}
#[derive(Clone)]
pub struct LogicThreadHandle {
sender: mpsc::Sender<LogicInput>,
load: Arc<AtomicUsize>,
}
static THREAD_HANDLES: OnceLock<Box<[LogicThreadHandle]>> = OnceLock::new();
pub fn start_logic_threads(num_threads: usize) {
if THREAD_HANDLES.get().is_some() {
tracing::error!("start_logic_threads: logic threads are already running!");
return;
}
let _ = THREAD_HANDLES.set(
(0..num_threads)
.map(|_| {
let (tx, rx) = mpsc::channel();
let load = Arc::new(AtomicUsize::new(0));
let handle = LogicThreadHandle {
sender: tx,
load: load.clone(),
};
thread::spawn(move || logic_thread_func(rx, load));
handle
})
.collect(),
);
}
// Thread-local logic state
struct LogicState {
thread_load: Arc<AtomicUsize>, // shared parameter for load-balancing
worlds: HashMap<i32, Rc<RefCell<World>>>, // owner_id - world
players: HashMap<i32, RefCell<Player>>, // id - player
}
fn logic_thread_func(receiver: mpsc::Receiver<LogicInput>, load: Arc<AtomicUsize>) {
const RECV_TIMEOUT: Duration = Duration::from_millis(50);
const PLAYER_SAVE_PERIOD: u64 = 30;
let mut state = LogicState {
thread_load: load,
worlds: HashMap::new(),
players: HashMap::new(),
};
let mut input_queue = VecDeque::with_capacity(32);
loop {
if let Ok(input) = receiver.recv_timeout(RECV_TIMEOUT) {
input_queue.push_back(input);
while let Ok(input) = receiver.try_recv() {
input_queue.push_back(input);
}
}
while let Some(input) = input_queue.pop_front() {
handle_logic_input(&mut state, input);
}
state.worlds.values().for_each(|world| {
let mut world = world.borrow_mut();
let mut players = world
.player_ids()
.flat_map(|id| state.players.get(id).map(|pl| pl.borrow_mut()))
.collect::<Box<_>>();
super::systems::tick_systems(&mut world, &mut players);
});
state.players.values().for_each(|player| {
let mut player = player.borrow_mut();
if time_util::unix_timestamp() - player.last_save_time > PLAYER_SAVE_PERIOD {
player_save_task::push(
player.basic_info.id,
player.build_save_data(),
PlayerSaveReason::PeriodicalSave,
);
player.last_save_time = time_util::unix_timestamp();
}
})
}
}
fn handle_logic_input(state: &mut LogicState, input: LogicInput) {
match input {
LogicInput::AddPlayer {
player_id,
enter_rpc_id,
session,
player_save_data,
} => {
let (player, is_player) = if let Vacant(e) = state.players.entry(player_id) {
(
e.insert(RefCell::new(Player::load_from_save(*player_save_data))),
true,
)
} else if let Some(player) = state.players.get_mut(&player_id) {
(player, false)
} else {
tracing::warn!("logic_thread: get player requested, but player {player_id} with data doesn't exist");
return;
};
let mut player = player.borrow_mut();
if is_player {
player
.world
.borrow_mut()
.world_entitys
.insert(player.basic_info.cur_map_id, WorldEntity::default());
state.worlds.insert(player_id, player.world.clone());
}
player.init();
player.set_session(session);
player.respond(EnterGameResponse::default(), enter_rpc_id);
player.notify_general_data();
player
.world
.borrow_mut()
.set_in_world_player_data(player.build_in_world_player());
world_util::add_player_entities(&player, player.formation_list.get(&player.cur_formation_id).unwrap(), None);
let scene_info = world_util::build_scene_information(&player);
player.notify(SilenceNpcNotify::default());
player.notify(JoinSceneNotify {
scene_info: Some(scene_info),
max_entity_id: i64::MAX,
transition_option: Some(TransitionOptionPb::default()),
});
player.notify(FormationAttrNotify {
duration: 1534854458,
formation_attrs: vec![
FormationAttr {
attr_id: 1,
ratio: 2400,
base_max_value: 24000,
max_value: 24000,
current_value: 24000,
},
FormationAttr {
attr_id: 10,
ratio: 2400,
base_max_value: 15000,
max_value: 15000,
current_value: 15000,
},
],
});
player.notify(AfterJoinSceneNotify::default());
player.notify(player.build_update_formation_notify());
let map = logic::utils::quadrant_util::get_map(player.basic_info.cur_map_id);
let quadrant_id = map.get_quadrant_id(
player.location.position.position.x * 100.0,
player.location.position.position.y * 100.0,
);
player.quadrant_id = quadrant_id;
player.notify_month_card();
let entities = map.get_initial_entities(quadrant_id);
world_util::add_entities(&player, &entities, false);
drop(player);
state
.thread_load
.store(state.players.len(), Ordering::Relaxed);
}
LogicInput::ProcessMessage { player_id, message } => {
let Some(player) = state.players.get_mut(&player_id) else {
tracing::warn!("logic_thread: process message requested, but player with id {player_id} doesn't exist");
return;
};
super::handler::handle_logic_message(&mut player.borrow_mut(), message);
}
LogicInput::RemovePlayer { player_id } => {
let Some(player) = state.players.remove(&player_id) else {
tracing::warn!(
"logic_thread: player remove requested, but it doesn't exist (id: {player_id})"
);
return;
};
let removed_world = state.worlds.remove(&player_id).unwrap();
let mut removed_world_ref = removed_world.borrow_mut();
let world = removed_world_ref.get_mut_world_entity();
for entity_id in world.get_all_entity_ids() {
world.remove_entity(entity_id);
}
// TODO: kick co-op players from removed world
player_save_task::push(
player_id,
player.borrow().build_save_data(),
PlayerSaveReason::PlayerLogicStopped,
);
}
}
}
impl LogicThreadHandle {
pub fn input(&self, input: LogicInput) {
let _ = self.sender.send(input);
}
}
pub fn get_least_loaded_thread() -> LogicThreadHandle {
let handles = THREAD_HANDLES.get().unwrap();
handles
.iter()
.min_by_key(|h| h.load.load(Ordering::Relaxed))
.unwrap()
.clone()
}