Make GM commands able to be executed at runtime properly, networking and eventgraph code adjustment

This commit is contained in:
xeon 2025-05-31 17:22:44 +03:00
parent 8b03b7c099
commit 6ff7dd3ff0
23 changed files with 561 additions and 277 deletions

View file

@ -1,3 +1,4 @@
pub mod config_util;
pub mod logging;
pub mod ref_util;
pub mod time_util;

View file

@ -0,0 +1,29 @@
use std::ops::Deref;
pub enum Ref<T: 'static> {
Static(&'static T),
Owned(T),
}
impl<T> Deref for Ref<T> {
type Target = T;
fn deref(&self) -> &T {
match self {
Self::Static(s) => s,
Self::Owned(s) => s,
}
}
}
impl<T> From<&'static T> for Ref<T> {
fn from(value: &'static T) -> Self {
Self::Static(value)
}
}
impl<T> From<T> for Ref<T> {
fn from(value: T) -> Self {
Self::Owned(value)
}
}

View file

@ -17,6 +17,7 @@ pub enum SectionEvent {
OnEnter,
OnInteract,
OnStart,
GM,
#[serde(untagged)]
Custom(String),
}

View file

@ -42,6 +42,9 @@ pub enum GMCmd {
SetControlGuiseAvatar {
avatar_id: u32,
},
UnlockHollowQuest {
quest_id: u32,
},
Jump {
section_id: u32,
transform_id: String,

View file

@ -5,12 +5,13 @@ mod uid;
use std::collections::HashMap;
pub use action::ActionBase;
use common::ref_util::Ref;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use tracing::error;
pub use uid::EventUID;
use config::{ConfigEvent, ConfigEventAction, GraphReference, SectionEvent};
use vivian_proto::ActionInfo;
use vivian_proto::{ActionInfo, server_only::GraphReferenceType};
use crate::listener::LogicEventListener;
@ -28,26 +29,39 @@ pub enum EventState {
pub struct Event {
pub ty: SectionEvent,
pub graph: GraphReference,
pub tag: u32,
pub config: &'static ConfigEvent,
pub graph_id: GraphID,
pub config: Ref<ConfigEvent>,
pub cur_action_index: isize,
pub state: EventState,
pub specials: HashMap<String, i32>,
}
pub struct GraphID(pub u32, pub GraphReferenceType);
impl From<GraphReference> for GraphID {
fn from(value: GraphReference) -> Self {
match value {
GraphReference::MainCitySection(id) => Self(id, GraphReferenceType::MainCitySection),
GraphReference::Interact(id) => Self(id, GraphReferenceType::Interact),
GraphReference::Quest(id) => Self(id, GraphReferenceType::Quest),
GraphReference::HollowEvent(id) => Self(id, GraphReferenceType::HollowEvent),
}
}
}
impl Event {
pub fn new(
ty: SectionEvent,
tag: u32,
graph: GraphReference,
config: &'static ConfigEvent,
graph_id: impl Into<GraphID>,
config: impl Into<Ref<ConfigEvent>>,
) -> Self {
Self {
ty,
graph,
graph_id: graph_id.into(),
tag,
config,
config: config.into(),
cur_action_index: -1,
state: EventState::Initing,
specials: HashMap::new(), // TODO: take initial ones from graph config
@ -87,7 +101,7 @@ impl Event {
{
self.cur_action_index = index as isize;
let uid = ((self.graph.id() as u64) << 32) | own_uid.event_id() as u64;
let uid = ((self.graph_id.0 as u64) << 32) | own_uid.event_id() as u64;
if action_listener.should_execute_action(uid, action, logic_listener, &self.specials) {
action_listener.execute_action(uid, action, logic_listener, &self.specials);
if let Some(client_action_info) = action::action_to_proto(action) {
@ -106,7 +120,7 @@ impl Event {
}
pub fn is_persistent(&self) -> bool {
self.ty != SectionEvent::OnInteract
!matches!(self.ty, SectionEvent::OnInteract | SectionEvent::GM)
}
pub fn is_finished(&self) -> bool {

View file

@ -7,12 +7,12 @@ use npc::{Interact, InteractTarget, SceneUnit};
use tracing::{error, warn};
use vivian_proto::{
EnterSceneScNotify, EventGraphOwnerType, FinishEventGraphScNotify, SectionEventScNotify,
common::TimePeriodType,
common::TimePeriodType, server_only::GraphReferenceType,
};
use crate::{
LogicResources,
event::{ActionListener, Event, EventState, EventUID, event_util},
event::{ActionListener, Event, EventState, EventUID, GraphID, event_util},
listener::{LogicEventListener, NotifyListener},
math::{Scale, Transform},
scene::SceneType,
@ -365,6 +365,21 @@ impl GameHallState {
self.refresh_required = false;
}
pub fn execute_gm_event(&mut self, config: ConfigEvent, listener: &mut dyn LogicEventListener) {
let event_uid = EventUID::new(EventGraphOwnerType::Scene, config.id);
let mut event = Event::new(
SectionEvent::GM,
0,
GraphID(0, GraphReferenceType::None),
config,
);
event.wakeup(event_uid, self, listener);
if !event.is_finished() {
self.running_events.insert(event_uid, event);
}
}
fn initiate_event(
&mut self,
graph: GraphReference,

View file

@ -6,10 +6,9 @@ use std::{
use super::*;
use config::{GraphReference, SectionEvent};
use property::{PrimitiveProperty, Property, PropertyHashMap};
use tracing::warn;
use vivian_logic::{
dungeon::{Dungeon, DungeonEquipment},
event::{EventState, EventUID},
event::{EventState, EventUID, GraphID},
hall::npc::InteractTarget,
math::Scale,
scene::ELocalPlayType,
@ -54,7 +53,8 @@ pub struct HallSectionSnapshot {
}
pub struct EventSnapshot {
pub graph: GraphReference,
pub graph_id: u32,
pub graph_type: GraphReferenceType,
pub ty: SectionEvent,
pub uid: EventUID,
pub tag: u32,
@ -371,18 +371,29 @@ impl HallSceneSnapshot {
.attached_graph_list
.into_iter()
.filter_map(|info| {
load_graph_reference(info.reference_type(), info.reference_id)
Some(match info.reference_type() {
GraphReferenceType::Interact => {
GraphReference::Interact(info.reference_id)
}
GraphReferenceType::HollowEvent => {
GraphReference::HollowEvent(info.reference_id)
}
GraphReferenceType::Quest => {
GraphReference::Quest(info.reference_id)
}
GraphReferenceType::MainCitySection => {
GraphReference::MainCitySection(info.reference_id)
}
GraphReferenceType::None => return None,
})
})
.collect(),
event_snapshots: section
.event_state_list
.into_iter()
.map(|info| EventSnapshot {
graph: load_graph_reference(
info.graph_reference_type(),
info.graph_reference_id,
)
.unwrap(),
graph_id: info.graph_reference_id,
graph_type: info.graph_reference_type(),
ty: SectionEvent::from_str(&info.name).unwrap(),
uid: info.event_uid.into(),
state: EventState::try_from(info.event_state).unwrap(),
@ -446,7 +457,7 @@ impl HallSceneSnapshot {
.attached_graphs
.iter()
.map(|graph_ref| {
let (ty, id) = save_graph_reference(graph_ref);
let GraphID(id, ty) = GraphID::from(*graph_ref);
AttachedGraphInfo {
reference_id: id,
reference_type: ty.into(),
@ -456,18 +467,14 @@ impl HallSceneSnapshot {
event_state_list: section
.event_snapshots
.iter()
.map(|event| {
let (ty, id) = save_graph_reference(&event.graph);
EventStateInfo {
graph_reference_id: id,
graph_reference_type: ty.into(),
name: event.ty.to_string(),
event_uid: *event.uid,
event_state: event.state.into(),
cur_action_idx: event.cur_action_idx as i32,
tag: event.tag,
}
.map(|event| EventStateInfo {
graph_reference_id: event.graph_id,
graph_reference_type: event.graph_type.into(),
name: event.ty.to_string(),
event_uid: *event.uid,
event_state: event.state.into(),
cur_action_idx: event.cur_action_idx as i32,
tag: event.tag,
})
.collect(),
already_executed_event_uid_list: section
@ -524,24 +531,14 @@ impl LongFightSceneSnapshot {
}
}
fn load_graph_reference(ty: GraphReferenceType, id: u32) -> Option<GraphReference> {
match ty {
GraphReferenceType::MainCitySection => Some(GraphReference::MainCitySection(id)),
GraphReferenceType::Interact => Some(GraphReference::Interact(id)),
GraphReferenceType::Quest => Some(GraphReference::Quest(id)),
GraphReferenceType::HollowEvent => Some(GraphReference::HollowEvent(id)),
invalid => {
warn!("invalid graph reference type in snapshot: {invalid:?}");
None
}
}
}
fn save_graph_reference(graph_ref: &GraphReference) -> (GraphReferenceType, u32) {
match graph_ref {
GraphReference::MainCitySection(id) => (GraphReferenceType::MainCitySection, *id),
GraphReference::Interact(id) => (GraphReferenceType::Interact, *id),
GraphReference::Quest(id) => (GraphReferenceType::Quest, *id),
GraphReference::HollowEvent(id) => (GraphReferenceType::HollowEvent, *id),
impl EventSnapshot {
pub fn graph_reference(&self) -> Option<GraphReference> {
Some(match self.graph_type {
GraphReferenceType::Interact => GraphReference::Interact(self.graph_id),
GraphReferenceType::HollowEvent => GraphReference::HollowEvent(self.graph_id),
GraphReferenceType::Quest => GraphReference::Quest(self.graph_id),
GraphReferenceType::MainCitySection => GraphReference::MainCitySection(self.graph_id),
GraphReferenceType::None => return None,
})
}
}

View file

@ -7,6 +7,8 @@ pub struct PacketHead {
pub player_uid: u32,
#[prost(uint64, tag = "3")]
pub session_id: u64,
#[prost(uint64, tag = "4")]
pub gate_session_id: u64,
#[prost(uint32, tag = "11")]
pub ack_packet_id: u32,
}

View file

@ -770,4 +770,11 @@ pub struct GmTalkByMuipRsp {
pub retcode: i32,
#[prost(string, tag = "2")]
pub retmsg: ::prost::alloc::string::String,
}
#[derive(::proto_derive::NetCmd)]
#[cmd_id(10010)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ClientPerformNotify {
#[prost(message, repeated, tag = "1")]
pub notify_list: ::prost::alloc::vec::Vec<NetCommand>,
}

View file

@ -155,6 +155,7 @@ impl NetworkClient {
stream,
Arc::clone(self.listener.get().unwrap()),
None,
None,
));
let _ = self

View file

@ -1,4 +1,7 @@
use std::sync::{Arc, OnceLock};
use std::{
net::SocketAddr,
sync::{Arc, OnceLock},
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
@ -17,6 +20,7 @@ use super::{
};
pub struct NetworkEntity {
pub local_addr: Option<SocketAddr>,
sender: mpsc::UnboundedSender<NetPacket>,
encryption_state: Arc<EncryptionState>,
cancellation: CancellationToken,
@ -47,6 +51,7 @@ impl NetworkEntity {
id: u64,
stream: TcpStream,
listener: Arc<dyn NetworkEventListener>,
local_addr: Option<SocketAddr>,
xorpad: Option<&'static [u8; 4096]>,
) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
@ -70,6 +75,7 @@ impl NetworkEntity {
tokio::spawn(Self::send_loop(w, rx));
Self {
local_addr,
sender: tx,
encryption_state,
cancellation,

View file

@ -26,33 +26,38 @@ pub trait NetworkEventListener: Send + Sync + 'static {
}
pub struct NetworkServer {
bind_addr: SocketAddr,
bind_addresses: Vec<SocketAddr>,
}
pub struct NetworkEntityManager {
entity_id_counter: AtomicU64,
entity_map: HashMap<u64, Arc<NetworkEntity>>,
event_listener: Arc<dyn NetworkEventListener>,
xorpad: Option<&'static [u8; 4096]>,
xorpad_map: std::collections::HashMap<SocketAddr, &'static [u8; 4096]>,
}
impl NetworkEntityManager {
pub fn new(
event_listener: impl NetworkEventListener,
xorpad: Option<&'static [u8; 4096]>,
xorpad_map: std::collections::HashMap<SocketAddr, &'static [u8; 4096]>,
) -> Self {
Self {
entity_id_counter: AtomicU64::new(1),
entity_map: HashMap::new(),
event_listener: Arc::new(event_listener),
xorpad,
xorpad_map,
}
}
pub(crate) fn on_connect(&self, stream: TcpStream) {
pub(crate) fn on_connect(&self, stream: TcpStream, local_addr: SocketAddr) {
let id = self.entity_id_counter.fetch_add(1, Ordering::SeqCst);
let entity =
NetworkEntity::start(id, stream, Arc::clone(&self.event_listener), self.xorpad);
let entity = NetworkEntity::start(
id,
stream,
Arc::clone(&self.event_listener),
Some(local_addr),
self.xorpad_map.get(&local_addr).copied(),
);
let _ = self.entity_map.insert(id, Arc::new(entity));
}
@ -67,20 +72,28 @@ impl NetworkEntityManager {
}
impl NetworkServer {
async fn accept_loop(service: Arc<ServiceContext>, listener: TcpListener) {
async fn accept_loop(
service: Arc<ServiceContext>,
local_addr: SocketAddr,
listener: TcpListener,
) {
loop {
if let Ok((stream, _addr)) = listener.accept().await {
service.resolve::<NetworkEntityManager>().on_connect(stream);
service
.resolve::<NetworkEntityManager>()
.on_connect(stream, local_addr);
}
}
}
}
impl ConfigurableServiceModule for NetworkServer {
type Config = SocketAddr;
type Config = Vec<SocketAddr>;
fn new(_context: &crate::ServiceContext, config: Self::Config) -> Self {
Self { bind_addr: config }
Self {
bind_addresses: config,
}
}
}
@ -92,12 +105,15 @@ impl Startable for NetworkEntityManager {
impl ServiceModule for NetworkServer {
fn run(self: Arc<Self>, service: Arc<crate::ServiceContext>) -> Result<(), Box<dyn Error>> {
tokio::spawn(Self::accept_loop(
service,
net_util::tcp_bind_sync(self.bind_addr)?,
));
for &bind_addr in self.bind_addresses.iter() {
tokio::spawn(Self::accept_loop(
Arc::clone(&service),
bind_addr,
net_util::tcp_bind_sync(bind_addr)?,
));
debug!("server is listening at {}", self.bind_addr);
debug!("server is listening at {bind_addr}");
}
Ok(())
}

View file

@ -120,10 +120,10 @@ async fn handle_player_get_token(
let entity = scope.fetch::<Arc<NetworkEntity>>().unwrap();
entity.send(NetPacket::new(
PacketHead {
packet_id: 0,
session_id: head.session_id,
player_uid: head.player_uid,
ack_packet_id: head.packet_id,
..Default::default()
},
response,
));
@ -145,10 +145,10 @@ async fn handle_player_get_data(scope: &ServiceScope, head: PacketHead, request:
entity.send(NetPacket::new(
PacketHead {
packet_id: 0,
player_uid: head.player_uid,
session_id: head.session_id,
ack_packet_id: head.packet_id,
..Default::default()
},
PlayerGetDataRsp {
retcode: 0,

View file

@ -1,3 +1,5 @@
use std::collections::HashMap;
use config::ServerConfig;
use const_format::concatcp;
use database::DbConnection;
@ -40,9 +42,9 @@ async fn main() -> Result<(), StartupError> {
let service = ServiceContext::new()
.insert_module(db_connection)
.insert_module(NetworkEntityManager::new(listener, None))
.insert_module(NetworkEntityManager::new(listener, HashMap::new()))
.configure_module::<TokenVerificationModule>(config.auth)
.configure_module::<NetworkServer>(env.services.get(&SERVICE_TYPE).unwrap().addr)
.configure_module::<NetworkServer>(vec![env.services.get(&SERVICE_TYPE).unwrap().addr])
.start()?;
let _ = service_tx.send(service);

View file

@ -0,0 +1,158 @@
use std::collections::HashMap;
use tokio::sync::mpsc;
use vivian_logic::GameState;
use crate::{
handlers::{NetContext, NotifyQueue},
player::{ModelManager, Player},
resources::NapResources,
util,
};
use super::{ClusterCommand, ClusterPerformanceReport, PlayerCommandResult, PlayerUpdate};
#[derive(Default)]
struct ClusterState {
cluster_id: u32,
slots: HashMap<u32, PlayerSlot>,
report_tx: Option<mpsc::Sender<ClusterPerformanceReport>>,
player_update_tx: Option<tokio::sync::mpsc::Sender<PlayerUpdate>>,
}
struct PlayerSlot {
player: Player,
game_state: Option<GameState>,
}
pub fn logic_loop(
cluster_id: u32,
resources: &'static NapResources,
command_rx: std::sync::mpsc::Receiver<ClusterCommand>,
) {
let mut state = ClusterState {
cluster_id,
..Default::default()
};
while let Ok(command) = command_rx.recv() {
match command {
ClusterCommand::SetReportListener(tx) => {
state.report_tx = Some(tx);
}
ClusterCommand::SetPlayerUpdateListener(tx) => {
state.player_update_tx = Some(tx);
}
ClusterCommand::CreateSlot {
player_uid,
player_data,
} => {
let mut player = Player::load_from_pb(player_uid, *player_data, resources);
player.on_login();
state.slots.insert(
player_uid,
PlayerSlot {
player,
game_state: None,
},
);
send_performance_report(&state);
}
ClusterCommand::RemovePlayer { player_uid } => {
if let Some(mut slot) = state.slots.remove(&player_uid) {
if let Some(player_update_tx) = state.player_update_tx.as_ref() {
if let Some(mut state) = slot.game_state {
slot.player.save_scene_snapshot(&mut state);
let data = slot.player.build_full_update();
let _ = player_update_tx.blocking_send(PlayerUpdate {
uid: slot.player.uid,
data,
});
}
}
send_performance_report(&state);
}
}
ClusterCommand::PushPlayerCommand {
player_uid,
cmd_id,
body,
result_awaiter_tx,
} => {
if let Some(slot) = state.slots.get_mut(&player_uid) {
let mut context =
NetContext::new(&mut slot.player, &mut slot.game_state, resources);
crate::handlers::handle_command(&mut context, cmd_id, body);
enqueue_player_notifies(
context.player,
context.game_state.as_mut(),
&mut context.notifies,
);
let _ = result_awaiter_tx.send(PlayerCommandResult {
notifies: context.notifies.drain(),
response: context.response,
});
if let Some(player_update_tx) = state.player_update_tx.as_ref() {
if slot.player.is_any_model_modified() {
let data = slot.player.build_partial_update();
let _ = player_update_tx.blocking_send(PlayerUpdate {
uid: slot.player.uid,
data,
});
slot.player.changes_acknowledged();
}
}
}
}
ClusterCommand::PushGmCommand {
player_uid,
cmd,
result_awaiter_tx,
} => {
if let Some(slot) = state.slots.get_mut(&player_uid) {
util::gm_util::execute_gm_cmd(&mut slot.player, slot.game_state.as_mut(), cmd);
let mut queue = NotifyQueue::default();
enqueue_player_notifies(&slot.player, slot.game_state.as_mut(), &mut queue);
let _ = result_awaiter_tx.send(queue);
}
}
}
}
}
fn enqueue_player_notifies(
player: &Player,
state: Option<&mut GameState>,
queue: &mut NotifyQueue,
) {
if player.loading_finished() && player.has_models_to_synchronize() {
queue.prepend_notify(player.build_player_sync_notify());
// TODO: more generic way to send notifies from models?
player.avatar_model.send_add_avatar_notify(queue);
}
if let Some(state) = state {
state.flush_notifies(queue);
}
}
fn send_performance_report(state: &ClusterState) {
state.report_tx.as_ref().inspect(|tx| {
let _ = tx.blocking_send(ClusterPerformanceReport {
cluster_id: state.cluster_id,
player_slots: state.slots.len(),
});
});
}

View file

@ -8,9 +8,9 @@ use std::{
thread,
};
use tokio::sync::oneshot;
use tokio::{sync::oneshot, task::JoinSet};
use tracing::info;
use vivian_logic::{GameState, debug::GMCmd};
use vivian_logic::debug::GMCmd;
use vivian_proto::{
head::PacketHead,
server_only::{PlayerData, PlayerDataChangedNotify},
@ -20,19 +20,32 @@ use vivian_service::{
network::client::NetworkClient,
};
use crate::{
config::ClusterConfig,
handlers::NetContext,
player::{ModelManager, Player},
resources::NapResources,
util::gm_util,
};
use crate::{config::ClusterConfig, handlers::NotifyQueue, resources::NapResources};
mod logic_loop;
pub struct PlayerCommandResult {
pub notifies: Vec<(u16, Vec<u8>)>,
pub response: Option<(u16, Vec<u8>)>,
}
#[derive(Clone)]
pub struct PlayerLogicCluster {
#[expect(dead_code)]
pub id: u32,
command_tx: mpsc::Sender<ClusterCommand>,
}
pub struct PlayerLogicClusterManager {
cluster_map: HashMap<u32, PlayerLogicCluster>,
cluster_load_map: Arc<HashMap<u32, AtomicUsize>>,
}
pub struct LogicClusterConfig {
pub cluster: ClusterConfig,
pub resources: &'static NapResources,
}
enum ClusterCommand {
SetReportListener(tokio::sync::mpsc::Sender<ClusterPerformanceReport>),
SetPlayerUpdateListener(tokio::sync::mpsc::Sender<PlayerUpdate>),
@ -49,6 +62,7 @@ enum ClusterCommand {
PushGmCommand {
player_uid: u32,
cmd: GMCmd,
result_awaiter_tx: oneshot::Sender<NotifyQueue>,
},
RemovePlayer {
player_uid: u32,
@ -66,28 +80,6 @@ struct PlayerUpdate {
data: PlayerData,
}
#[derive(Clone)]
pub struct PlayerLogicCluster {
#[expect(dead_code)]
pub id: u32,
command_tx: mpsc::Sender<ClusterCommand>,
}
pub struct PlayerSlot {
pub player: Player,
pub game_state: Option<GameState>,
}
pub struct PlayerLogicClusterManager {
cluster_map: HashMap<u32, PlayerLogicCluster>,
cluster_load_map: Arc<HashMap<u32, AtomicUsize>>,
}
pub struct LogicClusterConfig {
pub cluster: ClusterConfig,
pub resources: &'static NapResources,
}
impl Startable for PlayerLogicClusterManager {
fn start(
&self,
@ -102,9 +94,16 @@ impl Startable for PlayerLogicClusterManager {
cluster.set_player_update_listener(player_update_tx.clone());
});
tokio::spawn(Self::player_update_receiver_loop(player_update_rx, service));
let mut join_set = JoinSet::new();
join_set.spawn(Self::player_update_receiver_loop(player_update_rx, service));
join_set.spawn(Self::report_receiver_loop(
report_rx,
Arc::clone(&self.cluster_load_map),
));
Self::report_receiver_loop(report_rx, Arc::clone(&self.cluster_load_map))
async {
let _ = join_set.join_all().await;
}
}
}
@ -180,7 +179,7 @@ impl PlayerLogicClusterManager {
impl PlayerLogicCluster {
pub fn spawn(id: u32, resources: &'static NapResources) -> Self {
let (tx, rx) = mpsc::channel();
thread::spawn(move || Self::logic_loop(id, resources, rx));
thread::spawn(move || logic_loop::logic_loop(id, resources, rx));
Self { id, command_tx: tx }
}
@ -220,10 +219,14 @@ impl PlayerLogicCluster {
rx.await.unwrap()
}
pub fn push_gm_command(&self, player_uid: u32, cmd: GMCmd) {
pub async fn push_gm_command(&self, player_uid: u32, cmd: GMCmd) -> NotifyQueue {
let (tx, rx) = oneshot::channel();
self.command_tx
.send(ClusterCommand::PushGmCommand { player_uid, cmd })
.send(ClusterCommand::PushGmCommand { player_uid, cmd, result_awaiter_tx: tx })
.unwrap();
rx.await.unwrap()
}
fn set_report_listener(&self, tx: tokio::sync::mpsc::Sender<ClusterPerformanceReport>) {
@ -237,123 +240,4 @@ impl PlayerLogicCluster {
.send(ClusterCommand::SetPlayerUpdateListener(tx))
.unwrap();
}
fn logic_loop(
cluster_id: u32,
resources: &'static NapResources,
command_rx: mpsc::Receiver<ClusterCommand>,
) {
let mut slots = HashMap::new();
let mut report_tx = None;
let mut player_update_tx = None;
while let Ok(command) = command_rx.recv() {
match command {
ClusterCommand::SetReportListener(tx) => {
report_tx = Some(tx);
}
ClusterCommand::SetPlayerUpdateListener(tx) => {
player_update_tx = Some(tx);
}
ClusterCommand::CreateSlot {
player_uid,
player_data,
} => {
let mut player = Player::load_from_pb(player_uid, *player_data, resources);
player.on_login();
slots.insert(
player_uid,
PlayerSlot {
player,
game_state: None,
},
);
report_tx.as_ref().inspect(|tx| {
let _ = tx.blocking_send(ClusterPerformanceReport {
cluster_id,
player_slots: slots.len(),
});
});
}
ClusterCommand::RemovePlayer { player_uid } => {
if let Some(mut slot) = slots.remove(&player_uid) {
if let Some(player_update_tx) = player_update_tx.as_ref() {
if let Some(mut state) = slot.game_state {
slot.player.save_scene_snapshot(&mut state);
let data = slot.player.build_full_update();
let _ = player_update_tx.blocking_send(PlayerUpdate {
uid: slot.player.uid,
data,
});
}
}
report_tx.as_ref().inspect(|tx| {
let _ = tx.blocking_send(ClusterPerformanceReport {
cluster_id,
player_slots: slots.len(),
});
});
}
}
ClusterCommand::PushPlayerCommand {
player_uid,
cmd_id,
body,
result_awaiter_tx,
} => {
if let Some(slot) = slots.get_mut(&player_uid) {
let mut context =
NetContext::new(&mut slot.player, &mut slot.game_state, resources);
super::handlers::handle_command(&mut context, cmd_id, body);
if context.player.loading_finished()
&& context.player.has_models_to_synchronize()
{
context
.notifies
.prepend_notify(context.player.build_player_sync_notify());
// TODO: more generic way to send notifies from models?
context
.player
.avatar_model
.send_add_avatar_notify(&mut context.notifies);
}
if let Some(state) = context.game_state.as_mut() {
state.flush_notifies(&mut context.notifies);
}
let _ = result_awaiter_tx.send(PlayerCommandResult {
notifies: context.notifies.drain(),
response: context.response,
});
if let Some(player_update_tx) = player_update_tx.as_ref() {
if slot.player.is_any_model_modified() {
let data = slot.player.build_partial_update();
let _ = player_update_tx.blocking_send(PlayerUpdate {
uid: slot.player.uid,
data,
});
slot.player.changes_acknowledged();
}
}
}
}
ClusterCommand::PushGmCommand { player_uid, cmd } => {
if let Some(slot) = slots.get_mut(&player_uid) {
gm_util::execute_gm_cmd(&mut slot.player, cmd);
}
}
}
}
}
}

View file

@ -1,4 +1,4 @@
use std::sync::OnceLock;
use std::{collections::HashMap, sync::OnceLock};
use cluster::{LogicClusterConfig, PlayerLogicClusterManager};
use common::logging::init_tracing;
@ -57,8 +57,8 @@ async fn main() -> Result<(), StartupError> {
let (service_tx, listener) = session::start_handler_task();
let service = ServiceContext::new()
.insert_module(NetworkEntityManager::new(listener, None))
.configure_module::<NetworkServer>(env_cfg.services.get(&SERVICE_TYPE).unwrap().addr)
.insert_module(NetworkEntityManager::new(listener, HashMap::new()))
.configure_module::<NetworkServer>(vec![env_cfg.services.get(&SERVICE_TYPE).unwrap().addr])
.configure_module::<NetworkClient>(env_cfg.services)
.configure_module::<PlayerLogicClusterManager>(LogicClusterConfig {
cluster: config.cluster,

View file

@ -161,7 +161,7 @@ impl Player {
gm_group
.commands
.iter()
.for_each(|cmd| gm_util::execute_gm_cmd(self, cmd.clone()));
.for_each(|cmd| gm_util::execute_gm_cmd(self, None, cmd.clone()));
});
}
@ -402,23 +402,25 @@ impl Player {
section_snapshot.already_executed_events.clone();
for snapshot in section_snapshot.event_snapshots.iter() {
if let Some(config) = self
.resources
.event_graphs
.get(snapshot.graph, section_snapshot.section_id)
{
if let Some(event) = config.events.get(&snapshot.ty) {
let mut event = Event::new(
snapshot.ty.clone(),
snapshot.tag,
snapshot.graph,
event,
);
if let Some(reference) = snapshot.graph_reference() {
if let Some(config) = self
.resources
.event_graphs
.get(reference, section_snapshot.section_id)
{
if let Some(event) = config.events.get(&snapshot.ty) {
let mut event = Event::new(
snapshot.ty.clone(),
snapshot.tag,
reference,
event,
);
event.state = snapshot.state;
event.cur_action_index = snapshot.cur_action_idx;
event.state = snapshot.state;
event.cur_action_index = snapshot.cur_action_idx;
state.running_events.insert(snapshot.uid, event);
state.running_events.insert(snapshot.uid, event);
}
}
}
}
@ -594,7 +596,8 @@ impl Player {
.iter()
.filter(|(_, event)| event.is_persistent())
.map(|(&uid, event)| EventSnapshot {
graph: event.graph,
graph_id: event.graph_id.0,
graph_type: event.graph_id.1,
ty: event.ty.clone(),
uid,
state: event.state,

View file

@ -11,8 +11,9 @@ use vivian_proto::{
PlayerLoginCsReq, PlayerLoginScRsp,
head::PacketHead,
server_only::{
ExecuteClientCommandReq, ExecuteClientCommandRsp, GmTalkByMuipReq, GmTalkByMuipRsp,
NetCommand, PlayerGetDataReq, PlayerGetDataRsp, StopPlayerLogicReq, StopPlayerLogicRsp,
ClientPerformNotify, ExecuteClientCommandReq, ExecuteClientCommandRsp, GmTalkByMuipReq,
GmTalkByMuipRsp, NetCommand, PlayerGetDataReq, PlayerGetDataRsp, StopPlayerLogicReq,
StopPlayerLogicRsp,
},
};
use vivian_service::{
@ -28,6 +29,7 @@ use vivian_service::{
pub struct PlayerSession {
pub player_uid: u32,
pub gate_session_id: u64,
pub cluster: PlayerLogicCluster,
}
@ -86,6 +88,7 @@ async fn handler_loop(
async fn process_cmd(service: Arc<ServiceContext>, entity_id: u64, packet: NetPacket) {
if let Some(entity) = service.resolve::<NetworkEntityManager>().get(entity_id) {
let scope = service.new_scope().with_variable(entity).build();
if let Err(err) = handle_cmd(scope.as_ref(), packet).await {
error!("failed to decode client cmd: {err}");
}
@ -150,6 +153,7 @@ async fn handle_player_login_cs_req(
head.player_uid,
Arc::new(PlayerSession {
player_uid: head.player_uid,
gate_session_id: head.gate_session_id,
cluster,
}),
);
@ -235,7 +239,32 @@ async fn handle_gm_talk_by_muip_req(
}
};
session.cluster.push_gm_command(session.player_uid, cmd);
let mut notifies = session
.cluster
.push_gm_command(session.player_uid, cmd)
.await;
scope
.resolve::<NetworkClient>()
.send_notify(
ServiceType::Gate,
PacketHead {
gate_session_id: session.gate_session_id,
player_uid: session.player_uid,
..Default::default()
},
ClientPerformNotify {
notify_list: notifies
.drain()
.into_iter()
.map(|(cmd_id, body)| NetCommand {
cmd_id: cmd_id as u32,
body,
})
.collect(),
},
)
.await;
GmTalkByMuipRsp {
retcode: 0,
@ -263,10 +292,10 @@ macro_rules! handlers {
let entity = scope.fetch::<Arc<NetworkEntity>>().unwrap();
entity.send(NetPacket::new(
PacketHead {
packet_id: 0,
player_uid: packet.head.player_uid,
session_id: packet.head.session_id,
ack_packet_id: packet.head.packet_id,
..Default::default()
},
response,
));

View file

@ -1,8 +1,10 @@
use std::cmp;
use config::{ActionSwitchSection, ConfigEvent, ConfigEventAction};
use itertools::Itertools;
use tracing::{error, instrument};
use vivian_logic::{
GameState,
debug::GMCmd,
item::{EAvatarSkillType, EquipItem},
};
@ -10,11 +12,11 @@ use vivian_models::SceneSnapshotExt;
use crate::{
player::{AddItemSource, Player},
util::{avatar_util, item_util},
util::{avatar_util, item_util, quest_util},
};
#[instrument(skip(player))]
pub fn execute_gm_cmd(player: &mut Player, cmd: GMCmd) {
#[instrument(skip(player, state))]
pub fn execute_gm_cmd(player: &mut Player, state: Option<&mut GameState>, cmd: GMCmd) {
use GMCmd::*;
match cmd {
@ -164,6 +166,21 @@ pub fn execute_gm_cmd(player: &mut Player, cmd: GMCmd) {
}
SetControlGuiseAvatar { avatar_id } => {
player.basic_model.control_guise_avatar_id.set(avatar_id);
if let Some(GameState::Hall(hall)) = state {
hall.control_guise_avatar_id = avatar_id;
hall.force_refresh();
}
}
UnlockHollowQuest { quest_id } => {
if player
.resources
.templates
.hollow_quest_template_tb()
.any(|q| q.id() == quest_id)
{
quest_util::add_hollow_quest(player, quest_id);
}
}
Jump {
section_id,
@ -177,6 +194,25 @@ pub fn execute_gm_cmd(player: &mut Player, cmd: GMCmd) {
if let SceneSnapshotExt::Hall(hall) = &mut default_scene.ext {
hall.cur_section_id = section_id;
player.main_city_model.transform_id.set(&transform_id);
if let Some(GameState::Hall(hall)) = state {
hall.execute_gm_event(
ConfigEvent {
id: 1337,
actions: vec![ConfigEventAction::ActionSwitchSection(
ActionSwitchSection {
id: 100,
section_id,
transform: transform_id,
camera_x: 6000,
camera_y: 0,
predicates: Vec::new(),
},
)],
},
player,
);
}
}
}
}

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, warn};
@ -6,7 +6,7 @@ use vivian_proto::{
KeepAliveNotify, NetCmd, PlayerGetTokenCsReq, PlayerGetTokenScRsp, PlayerLoginCsReq,
PlayerLoginScRsp, PlayerLogoutCsReq,
head::PacketHead,
server_only::{StopPlayerLogicReq, StopPlayerLogicRsp},
server_only::{ClientPerformNotify, StopPlayerLogicReq, StopPlayerLogicRsp},
};
use vivian_service::{
ServiceContext, ServiceScope,
@ -28,6 +28,8 @@ enum NetworkEvent {
Disconnect(u64),
}
struct InternalAddr(SocketAddr);
impl NetworkEventListener for NetworkListener {
fn on_receive(&self, entity_id: u64, packet: NetPacket) {
self.0
@ -41,19 +43,22 @@ impl NetworkEventListener for NetworkListener {
}
}
pub fn start_handler_task() -> (
pub fn start_handler_task(
internal_addr: SocketAddr,
) -> (
oneshot::Sender<Arc<ServiceContext>>,
impl NetworkEventListener,
) {
let (sv_tx, sv_rx) = oneshot::channel();
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(handler_loop(sv_rx, rx));
tokio::spawn(handler_loop(sv_rx, internal_addr, rx));
(sv_tx, NetworkListener(tx))
}
async fn handler_loop(
lazy_service: oneshot::Receiver<Arc<ServiceContext>>,
internal_addr: SocketAddr,
mut rx: mpsc::UnboundedReceiver<NetworkEvent>,
) {
let service = lazy_service.await.unwrap();
@ -61,7 +66,7 @@ async fn handler_loop(
while let Some(event) = rx.recv().await {
match event {
NetworkEvent::Receive(id, packet) => {
tokio::spawn(process_cmd(Arc::clone(&service), id, packet));
tokio::spawn(process_cmd(Arc::clone(&service), id, internal_addr, packet));
}
NetworkEvent::Disconnect(id) => {
tokio::spawn(process_disconnect(Arc::clone(&service), id));
@ -70,10 +75,20 @@ async fn handler_loop(
}
}
async fn process_cmd(service: Arc<ServiceContext>, entity_id: u64, mut packet: NetPacket) {
async fn process_cmd(
service: Arc<ServiceContext>,
entity_id: u64,
internal_addr: SocketAddr,
mut packet: NetPacket,
) {
if let Some(entity) = service.resolve::<NetworkEntityManager>().get(entity_id) {
packet.head.session_id = entity_id;
let scope = service.new_scope().with_variable(entity).build();
let scope = service
.new_scope()
.with_variable(InternalAddr(internal_addr))
.with_variable(entity)
.build();
if let Err(err) = handle_cmd(scope.as_ref(), packet).await {
error!("failed to decode client cmd: {err}");
}
@ -93,10 +108,9 @@ async fn process_disconnect(service: Arc<ServiceContext>, entity_id: u64) {
.send_request::<_, StopPlayerLogicRsp>(
ServiceType::Game,
PacketHead {
packet_id: 0,
player_uid: session.uid(),
session_id: entity_id,
ack_packet_id: 0,
..Default::default()
},
StopPlayerLogicReq {
player_uid: session.uid(),
@ -133,6 +147,9 @@ async fn handle_cmd(scope: &ServiceScope, packet: NetPacket) -> Result<(), GetPr
handle_player_logout(scope, packet.head, packet.get_proto()?).await
}
KeepAliveNotify::CMD_ID => (),
ClientPerformNotify::CMD_ID => {
handle_client_perform(scope, packet.head, packet.get_proto()?).await
}
cmd_id if cmd_id < 10000 => {
if let Some(session) = scope
.resolve::<PlayerSessionManager>()
@ -249,6 +266,7 @@ async fn handle_player_login(scope: &ServiceScope, head: PacketHead, request: Pl
PacketHead {
player_uid: session.uid(),
session_id: head.session_id,
gate_session_id: head.session_id,
..Default::default()
},
request,
@ -275,6 +293,7 @@ async fn handle_player_login(scope: &ServiceScope, head: PacketHead, request: Pl
player_uid: session.uid(),
session_id: head.session_id,
ack_packet_id: head.packet_id,
..Default::default()
},
rsp,
));
@ -290,10 +309,9 @@ async fn handle_player_logout(scope: &ServiceScope, head: PacketHead, _request:
.send_request::<_, StopPlayerLogicRsp>(
ServiceType::Game,
PacketHead {
packet_id: 0,
player_uid: session.uid(),
session_id: head.session_id,
ack_packet_id: 0,
gate_session_id: head.session_id,
..Default::default()
},
StopPlayerLogicReq {
player_uid: session.uid(),
@ -305,3 +323,33 @@ async fn handle_player_logout(scope: &ServiceScope, head: PacketHead, _request:
}
}
}
async fn handle_client_perform(
scope: &ServiceScope,
head: PacketHead,
notify: ClientPerformNotify,
) {
let InternalAddr(internal_addr) = scope.fetch().unwrap();
let entity = scope.fetch::<Arc<NetworkEntity>>().unwrap();
if Some(internal_addr) != entity.local_addr.as_ref() {
warn!("received server-only packet from client!");
return;
}
if let Some(session) = scope.resolve::<PlayerSessionManager>().get(head.gate_session_id) {
debug!("pushing notifies: {:?}", notify.notify_list);
session
.push_notifies(
notify
.notify_list
.into_iter()
.map(|notify| (notify.cmd_id as u16, notify.body))
.collect(),
)
.await;
} else {
debug!("no session with id {}", head.session_id);
}
}

View file

@ -1,4 +1,4 @@
use std::sync::LazyLock;
use std::{collections::HashMap, sync::LazyLock};
use config::ServerConfig;
use const_format::concatcp;
@ -6,7 +6,7 @@ use encryption::SecurityModule;
use session::PlayerSessionManager;
use vivian_service::{
ServiceContext, ServiceError,
config::load_environment_config,
config::{ServiceType, load_environment_config},
network::{NetworkEntityManager, NetworkServer, client::NetworkClient},
};
@ -15,6 +15,7 @@ mod encryption;
mod handlers;
mod session;
const SERVICE_TYPE: ServiceType = ServiceType::Gate;
const CONFIG_DIR: &str = "config/20-gate-server/";
#[tokio::main]
@ -27,18 +28,19 @@ async fn main() -> Result<(), ServiceError> {
});
let env_cfg = load_environment_config();
let internal_addr = env_cfg.services.get(&SERVICE_TYPE).unwrap().addr;
common::logging::init_tracing(tracing::Level::DEBUG);
let (service_tx, listener) = handlers::start_handler_task();
let (service_tx, listener) = handlers::start_handler_task(internal_addr);
let service = ServiceContext::new()
.insert_module(NetworkEntityManager::new(
listener,
Some(&CONFIG.client_secret_key.xorpad),
HashMap::from([(CONFIG.bind_addr, &CONFIG.client_secret_key.xorpad)]),
))
.with_module::<PlayerSessionManager>()
.configure_module::<SecurityModule>(&CONFIG.rsa_versions)
.configure_module::<NetworkServer>(CONFIG.bind_addr)
.configure_module::<NetworkServer>(vec![CONFIG.bind_addr, internal_addr])
.configure_module::<NetworkClient>(env_cfg.services)
.start()?;

View file

@ -24,7 +24,12 @@ pub struct PlayerSession {
login_failed_status: OnceLock<i32>,
is_logged_in: OnceLock<()>,
is_logged_out: OnceLock<()>,
packet_tx: OnceLock<mpsc::Sender<NetPacket>>,
packet_tx: OnceLock<mpsc::Sender<QueueItem>>,
}
enum QueueItem {
FromClient(NetPacket),
FromServer(Vec<(u16, Vec<u8>)>),
}
impl PlayerSessionManager {
@ -59,12 +64,18 @@ impl PlayerSession {
pub async fn enqueue(&self, packet: NetPacket) {
if let Some(tx) = self.packet_tx.get() {
let _ = tx.send(packet).await;
let _ = tx.send(QueueItem::FromClient(packet)).await;
}
}
pub async fn push_notifies(&self, notifies: Vec<(u16, Vec<u8>)>) {
if let Some(tx) = self.packet_tx.get() {
let _ = tx.send(QueueItem::FromServer(notifies)).await;
}
}
async fn packet_handler_loop(
mut rx: mpsc::Receiver<NetPacket>,
mut rx: mpsc::Receiver<QueueItem>,
uid: u32,
ctx: Arc<ServiceContext>,
network_entity: Arc<NetworkEntity>,
@ -77,6 +88,26 @@ impl PlayerSession {
let mut buffered_requests = HashMap::with_capacity(BUFFERED_REQUESTS_LIMIT);
while let Some(packet) = rx.recv().await {
let packet = match packet {
QueueItem::FromClient(packet) => packet,
QueueItem::FromServer(packets) => {
packets.into_iter().for_each(|(cmd_id, body)| {
last_server_packet_id += 1;
network_entity.send(NetPacket {
cmd_id,
body,
head: PacketHead {
packet_id: last_server_packet_id,
player_uid: uid,
..Default::default()
},
});
});
continue;
}
};
let packet_id = packet.head.packet_id;
if packet_id != 0 && packet_id != (last_client_packet_id + 1) {
@ -130,10 +161,9 @@ impl PlayerSession {
.send_request::<_, ExecuteClientCommandRsp>(
ServiceType::Game,
PacketHead {
packet_id: 0,
player_uid: uid,
session_id: packet.head.session_id,
ack_packet_id: 0,
..Default::default()
},
ExecuteClientCommandReq {
command: Some(NetCommand {