From 1609636e90565aa5ec948b46bed37ab384719187 Mon Sep 17 00:00:00 2001 From: xeon Date: Sun, 2 Jun 2024 23:32:00 +0300 Subject: [PATCH] Queue up Arg packets instead of sending instantly Change send_rpc_arg to push_rpc_arg. Queue up and Send all Arg packets after handler finish --- gameserver/src/game/context.rs | 10 ++--- gameserver/src/game/manager/dungeon.rs | 2 +- gameserver/src/game/manager/item.rs | 2 +- gameserver/src/net/handlers/hollow.rs | 60 +++++++++++++------------- gameserver/src/net/handlers/mod.rs | 1 + gameserver/src/net/handlers/role.rs | 2 +- gameserver/src/net/handlers/world.rs | 30 ++++++------- gameserver/src/net/session.rs | 30 +++++++++++-- 8 files changed, 81 insertions(+), 56 deletions(-) diff --git a/gameserver/src/game/context.rs b/gameserver/src/game/context.rs index f644081..20cef03 100644 --- a/gameserver/src/game/context.rs +++ b/gameserver/src/game/context.rs @@ -52,21 +52,21 @@ impl PlayerOperationResult where T: Send + Sync, { - pub const fn unwrap(&self) -> &T { - &self.result + pub fn take(self) -> T { + self.result } - pub async fn send_changes(&mut self, session: &NetworkSession) -> Result<&T> { + pub async fn send_changes(mut self, session: &NetworkSession) -> Result { if self.player_info_changes.is_some() { let ptc_player_info_changed = PtcPlayerInfoChangedArg { player_uid: session.player_uid().raw(), player_info: self.player_info_changes.take().unwrap(), }; - session.send_rpc_arg(101, &ptc_player_info_changed).await?; + session.push_rpc_arg(101, ptc_player_info_changed).await?; } - Ok(self.unwrap()) + Ok(self.take()) } pub const fn ret(result: T) -> Self { diff --git a/gameserver/src/game/manager/dungeon.rs b/gameserver/src/game/manager/dungeon.rs index 6519160..2da6481 100644 --- a/gameserver/src/game/manager/dungeon.rs +++ b/gameserver/src/game/manager/dungeon.rs @@ -421,7 +421,7 @@ impl DungeonManager { pub fn enter_battle(&self, scene_uid: u64) -> PlayerOperationResult { let hollow_scene_uid = *self.player.read().scene_uid.as_ref().unwrap(); let hollow_scene = self.set_cur_hollow_battle(scene_uid, hollow_scene_uid); - let ptc_enter_scene = self.enter_scene(scene_uid).unwrap().unwrap().clone(); + let ptc_enter_scene = self.enter_scene(scene_uid).unwrap().take().clone(); let player = self.player.read(); let dungeon_collection = player.dungeon_collection.as_ref().unwrap(); diff --git a/gameserver/src/game/manager/item.rs b/gameserver/src/game/manager/item.rs index a70a18c..822c71a 100644 --- a/gameserver/src/game/manager/item.rs +++ b/gameserver/src/game/manager/item.rs @@ -86,7 +86,7 @@ impl ItemManager { }; // Unlock & equip default weapon - let weapon_uid = *self.unlock_weapon(10012).unwrap(); + let weapon_uid = self.unlock_weapon(10012).take(); self.equip_weapon(weapon_uid, uid); let mut player_info = self.player_info.write(); diff --git a/gameserver/src/net/handlers/hollow.rs b/gameserver/src/net/handlers/hollow.rs index 701b66c..4d65dc1 100644 --- a/gameserver/src/net/handlers/hollow.rs +++ b/gameserver/src/net/handlers/hollow.rs @@ -21,12 +21,12 @@ pub async fn on_rpc_hollow_move( .move_to(destination_pos, scene_uid); session - .send_rpc_arg(PTC_HOLLOW_GRID_ID, &ptc_hollow_grid) + .push_rpc_arg(PTC_HOLLOW_GRID_ID, ptc_hollow_grid) .await?; if let Some(ptc_sync_hollow_event) = ptc_sync_hollow_event { session - .send_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, &ptc_sync_hollow_event) + .push_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, ptc_sync_hollow_event) .await?; } @@ -37,7 +37,7 @@ pub async fn on_rpc_hollow_move( }; session - .send_rpc_arg(PTC_POSITION_IN_HOLLOW_CHANGED_ID, &pos) + .push_rpc_arg(PTC_POSITION_IN_HOLLOW_CHANGED_ID, pos) .await?; Ok(RpcHollowMoveRet::new( @@ -57,10 +57,10 @@ pub async fn on_rpc_end_battle( if !hollow_finished { session - .send_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, &sync_event) + .push_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, sync_event) .await?; } else { - let _ = *session + let _ = session .context .dungeon_manager .hollow_finished() @@ -76,7 +76,7 @@ pub async fn on_rpc_end_battle( }; session - .send_rpc_arg(PTC_DUNGEON_QUEST_FINISHED_ID, &ptc_dungeon_quest_finished) + .push_rpc_arg(PTC_DUNGEON_QUEST_FINISHED_ID, ptc_dungeon_quest_finished) .await?; } @@ -90,9 +90,9 @@ pub async fn on_rpc_end_battle( .clone(); session - .send_rpc_arg( + .push_rpc_arg( PTC_SYNC_HOLLOW_GRID_MAPS_ID, - &session.context.hollow_grid_manager.sync_hollow_maps( + session.context.hollow_grid_manager.sync_hollow_maps( player_uid, session.context.dungeon_manager.get_cur_scene_uid(), ), @@ -109,14 +109,14 @@ pub async fn on_rpc_end_battle( }; session - .send_rpc_arg( + .push_rpc_arg( PTC_POSITION_IN_HOLLOW_CHANGED_ID, - &ptc_position_in_hollow_changed, + ptc_position_in_hollow_changed, ) .await?; session - .send_rpc_arg(PTC_ENTER_SCENE_ID, &ptc_enter_scene) + .push_rpc_arg(PTC_ENTER_SCENE_ID, ptc_enter_scene) .await?; Ok(RpcEndBattleRet::new( @@ -156,18 +156,18 @@ pub async fn on_rpc_run_hollow_event_graph( specials: phashmap![], }; session - .send_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, &finish_perform) + .push_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, finish_perform) .await?; let (ptc_hollow_grid, ptc_sync_hollow_event) = session.context.hollow_grid_manager.move_to(22, scene_uid); session - .send_rpc_arg(PTC_HOLLOW_GRID_ID, &ptc_hollow_grid) + .push_rpc_arg(PTC_HOLLOW_GRID_ID, ptc_hollow_grid) .await?; if let Some(ptc_sync_hollow_event) = ptc_sync_hollow_event { session - .send_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, &ptc_sync_hollow_event) + .push_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, ptc_sync_hollow_event) .await?; } } else { @@ -178,15 +178,15 @@ pub async fn on_rpc_run_hollow_event_graph( if !hollow_finished { session - .send_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, &sync_hollow_event) + .push_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, sync_hollow_event) .await?; } session - .send_rpc_arg(PTC_HOLLOW_GRID_ID, &hollow_grid) + .push_rpc_arg(PTC_HOLLOW_GRID_ID, hollow_grid) .await?; if hollow_finished { - let _ = *session + let _ = session .context .dungeon_manager .hollow_finished() @@ -202,7 +202,7 @@ pub async fn on_rpc_run_hollow_event_graph( }; session - .send_rpc_arg(PTC_DUNGEON_QUEST_FINISHED_ID, &ptc_dungeon_quest_finished) + .push_rpc_arg(PTC_DUNGEON_QUEST_FINISHED_ID, ptc_dungeon_quest_finished) .await?; } @@ -214,7 +214,7 @@ pub async fn on_rpc_run_hollow_event_graph( .scene_uid .as_ref() .unwrap(); - let battle_scene_uid = *session + let battle_scene_uid = session .context .dungeon_manager .create_fight(trigger_battle_id, hollow_uid) @@ -231,14 +231,14 @@ pub async fn on_rpc_run_hollow_event_graph( }; session - .send_rpc_arg( + .push_rpc_arg( PTC_POSITION_IN_HOLLOW_CHANGED_ID, - &ptc_position_in_hollow_changed, + ptc_position_in_hollow_changed, ) .await?; session - .send_rpc_arg( + .push_rpc_arg( PTC_ENTER_SCENE_ID, session .context @@ -288,7 +288,7 @@ pub async fn on_rpc_start_hollow_quest( }; session - .send_rpc_arg(PTC_PROPERTY_CHANGED_ID, &update_properties) + .push_rpc_arg(PTC_PROPERTY_CHANGED_ID, update_properties) .await?; } @@ -298,7 +298,7 @@ pub async fn on_rpc_start_hollow_quest( .sorted_by_key(|kv| kv.0) .map(|(_idx, uid)| *uid) .collect::>(); - let (dungeon_uid, scene_uid) = *session + let (dungeon_uid, scene_uid) = session .context .dungeon_manager .create_hollow(10001, 10010001, &avatars) @@ -336,9 +336,9 @@ pub async fn on_rpc_start_hollow_quest( session.context.hollow_grid_manager.init_default_map(); session - .send_rpc_arg( + .push_rpc_arg( PTC_SYNC_HOLLOW_GRID_MAPS_ID, - &session + session .context .hollow_grid_manager .sync_hollow_maps(session.player_uid().raw(), scene_uid), @@ -355,9 +355,9 @@ pub async fn on_rpc_start_hollow_quest( }; session - .send_rpc_arg( + .push_rpc_arg( PTC_POSITION_IN_HOLLOW_CHANGED_ID, - &ptc_position_in_hollow_changed, + ptc_position_in_hollow_changed, ) .await?; @@ -380,11 +380,11 @@ pub async fn on_rpc_start_hollow_quest( }; session - .send_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, &ptc_sync_hollow_event_info) + .push_rpc_arg(PTC_SYNC_HOLLOW_EVENT_INFO_ID, ptc_sync_hollow_event_info) .await?; session - .send_rpc_arg(PTC_ENTER_SCENE_ID, &ptc_enter_scene) + .push_rpc_arg(PTC_ENTER_SCENE_ID, ptc_enter_scene) .await?; Ok(RpcStartHollowQuestRet::new()) } diff --git a/gameserver/src/net/handlers/mod.rs b/gameserver/src/net/handlers/mod.rs index 4a88068..62fcc6e 100644 --- a/gameserver/src/net/handlers/mod.rs +++ b/gameserver/src/net/handlers/mod.rs @@ -39,6 +39,7 @@ macro_rules! protocol_handlers { .instrument(tracing::info_span!(stringify!([]), protocol_id = protocol_id)) .await?; + session.flush_rpc_queue().await?; session.send_rpc_ret(ret).await } )* diff --git a/gameserver/src/net/handlers/role.rs b/gameserver/src/net/handlers/role.rs index 4225f53..7de883a 100644 --- a/gameserver/src/net/handlers/role.rs +++ b/gameserver/src/net/handlers/role.rs @@ -22,7 +22,7 @@ pub async fn on_rpc_mod_nick_name( }; session - .send_rpc_arg(PTC_PLAYER_INFO_CHANGED_ID, &player_info_changed) + .push_rpc_arg(PTC_PLAYER_INFO_CHANGED_ID, player_info_changed) .await?; Ok(RpcModNickNameRet::new()) } diff --git a/gameserver/src/net/handlers/world.rs b/gameserver/src/net/handlers/world.rs index f307052..e36c815 100644 --- a/gameserver/src/net/handlers/world.rs +++ b/gameserver/src/net/handlers/world.rs @@ -56,7 +56,7 @@ pub async fn on_rpc_run_event_graph( ); session - .send_rpc_arg(PTC_SYNC_EVENT_INFO_ID, &ptc_sync_event_info) + .push_rpc_arg(PTC_SYNC_EVENT_INFO_ID, ptc_sync_event_info) .await?; Ok(RpcRunEventGraphRet::new()) @@ -89,7 +89,7 @@ pub async fn on_rpc_finish_event_graph_perform_show( ); session - .send_rpc_arg(PTC_SYNC_EVENT_INFO_ID, &ptc_sync_event_info) + .push_rpc_arg(PTC_SYNC_EVENT_INFO_ID, ptc_sync_event_info) .await?; Ok(RpcFinishEventGraphPerformShowRet::new()) @@ -152,7 +152,7 @@ pub async fn on_rpc_interact_with_unit( ); session - .send_rpc_arg(PTC_SYNC_EVENT_INFO_ID, &ptc_sync_event_info) + .push_rpc_arg(PTC_SYNC_EVENT_INFO_ID, ptc_sync_event_info) .await?; } @@ -206,25 +206,25 @@ pub async fn enter_main_city(session: &NetworkSession) -> Result<()> { let hall_scene_uid = session.context.dungeon_manager.get_default_scene_uid(); session - .send_rpc_arg( + .push_rpc_arg( PTC_ENTER_SECTION_ID, session .context .dungeon_manager .enter_scene_section(hall_scene_uid, 2) - .unwrap(), + .take(), ) .await?; session - .send_rpc_arg( + .push_rpc_arg( PTC_SYNC_SCENE_UNIT_ID, - &session.context.scene_unit_manager.sync(hall_scene_uid, 2), + session.context.scene_unit_manager.sync(hall_scene_uid, 2), ) .await?; session - .send_rpc_arg( + .push_rpc_arg( PTC_ENTER_SCENE_ID, session .context @@ -305,24 +305,24 @@ pub async fn on_rpc_enter_world( if CONFIGURATION.skip_tutorial { Box::pin(enter_main_city(session)).await?; } else { - let fresh_scene_uid = *session.context.dungeon_manager.create_fresh().unwrap(); + let fresh_scene_uid = session.context.dungeon_manager.create_fresh().take(); session - .send_rpc_arg( + .push_rpc_arg( PTC_ENTER_SCENE_ID, session .context .dungeon_manager .enter_scene(fresh_scene_uid) .unwrap() - .unwrap(), + .take(), ) .await?; } session - .send_rpc_arg( + .push_rpc_arg( PTC_SYNC_SCENE_TIME_ID, - &PtcSyncSceneTimeArg { + PtcSyncSceneTimeArg { timestamp: 3600 * 8 * 1000, last_timestamp: 0, }, @@ -341,9 +341,9 @@ pub async fn on_rpc_reenter_world( tracing::warn!("OnRpcReenterWorld: world re-entrance is not implemented yet, kicking player!"); session - .send_rpc_arg( + .push_rpc_arg( PTC_KICK_PLAYER_ID, - &PtcKickPlayerArg { + PtcKickPlayerArg { reason_id: 2, reason_str: String::new(), }, diff --git a/gameserver/src/net/session.rs b/gameserver/src/net/session.rs index b5e7b48..9489a5b 100644 --- a/gameserver/src/net/session.rs +++ b/gameserver/src/net/session.rs @@ -1,6 +1,7 @@ use anyhow::Result; use protocol::{AccountInfo, PlayerInfo}; use qwer::{OctData, ProtocolHeader}; +use std::collections::VecDeque; use std::io::Cursor; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -33,9 +34,12 @@ impl PlayerUID { } } +struct QueueItem(pub u16, pub Vec); + pub struct NetworkSession { client_socket: Arc>, cur_rpc_uid: u64, + outgoing_rpc_queue: Mutex>, pub ns_prop_mgr: net_stream::PropertyManager, pub context: GameContext, account_uid: OnceCell, @@ -49,6 +53,7 @@ impl NetworkSession { Self { client_socket: Arc::new(Mutex::new(client_socket)), cur_rpc_uid: 0, + outgoing_rpc_queue: Mutex::new(VecDeque::new()), context: GameContext::new(ns_prop_mgr.player_info.clone()), ns_prop_mgr, account_uid: OnceCell::new(), @@ -108,13 +113,32 @@ impl NetworkSession { } } - pub async fn send_rpc_arg(&self, protocol_id: u16, data: &impl OctData) -> Result<()> { - let header: Vec = ProtocolHeader::default().into(); - + pub async fn push_rpc_arg(&self, protocol_id: u16, data: impl OctData) -> Result<()> { let mut payload = Vec::new(); let mut cursor = Cursor::new(&mut payload); data.marshal_to(&mut cursor, 0)?; + self.outgoing_rpc_queue + .lock() + .await + .push_back(QueueItem(protocol_id, payload)); + + Ok(()) + } + + pub async fn flush_rpc_queue(&self) -> Result<()> { + let mut queue = self.outgoing_rpc_queue.lock().await; + + while let Some(QueueItem(protocol_id, payload)) = queue.pop_front() { + self.send_rpc_arg(protocol_id, payload).await?; + } + + Ok(()) + } + + async fn send_rpc_arg(&self, protocol_id: u16, payload: Vec) -> Result<()> { + let header: Vec = ProtocolHeader::default().into(); + let body: Vec = RequestBody { protocol_id, payload,