Compare commits

...

1 commit

Author SHA1 Message Date
dca8dfa93c Rewrite server networking
Remove qwer-rpc and implement new efficient protocol on top of ZeroMQ. This may fix random disconnects. NOTE: you'll have to reset database and remove existing config files, format was changed
2025-01-02 14:36:01 +03:00
62 changed files with 2074 additions and 2674 deletions

127
Cargo.lock generated
View file

@ -379,6 +379,19 @@ dependencies = [
"rustc_version",
]
[[package]]
name = "asynchronous-codec"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233"
dependencies = [
"bytes",
"futures-sink",
"futures-util",
"memchr",
"pin-project-lite",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -932,6 +945,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -1241,6 +1263,14 @@ dependencies = [
"tracing",
]
[[package]]
name = "evelyn-encoding"
version = "0.1.0"
dependencies = [
"byteorder",
"paste",
]
[[package]]
name = "evelyn-encryption"
version = "0.0.1"
@ -1273,10 +1303,10 @@ dependencies = [
"evelyn-data",
"evelyn-eventgraph",
"evelyn-http-client",
"evelyn-network",
"paste",
"protocol",
"qwer",
"qwer-rpc",
"rbase64",
"serde",
"serde_json",
@ -1295,13 +1325,13 @@ dependencies = [
"common",
"evelyn-encryption",
"evelyn-http-client",
"evelyn-network",
"evelyn-proto",
"hex",
"kcp",
"paste",
"protocol",
"qwer",
"qwer-rpc",
"rand",
"rbase64",
"serde",
@ -1323,6 +1353,22 @@ dependencies = [
"ureq",
]
[[package]]
name = "evelyn-network"
version = "0.0.1"
dependencies = [
"byteorder",
"evelyn-codegen",
"evelyn-encoding",
"futures",
"iter-read",
"num_enum",
"serde",
"tokio",
"tracing",
"zeromq",
]
[[package]]
name = "evelyn-proto"
version = "0.0.1"
@ -1382,9 +1428,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flatbuffers"
version = "24.3.25"
version = "24.12.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f"
checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096"
dependencies = [
"bitflags 1.3.2",
"rustc_version",
@ -1951,6 +1997,12 @@ version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "iter-read"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071ed4cc1afd86650602c7b11aa2e1ce30762a1c27193201cb5cee9c6ebb1294"
[[package]]
name = "itertools"
version = "0.10.5"
@ -3007,19 +3059,6 @@ dependencies = [
"qwer-derive",
]
[[package]]
name = "qwer-client-example"
version = "0.1.0"
dependencies = [
"anyhow",
"common",
"protocol",
"qwer",
"qwer-rpc",
"tokio",
"tracing",
]
[[package]]
name = "qwer-derive"
version = "0.0.1"
@ -3030,33 +3069,6 @@ dependencies = [
"syn 2.0.87",
]
[[package]]
name = "qwer-rpc"
version = "0.0.1"
dependencies = [
"byteorder",
"common",
"dashmap 6.1.0",
"futures",
"qwer",
"thiserror 2.0.0",
"tokio",
"tracing",
]
[[package]]
name = "qwer-server-example"
version = "0.1.0"
dependencies = [
"anyhow",
"common",
"protocol",
"qwer",
"qwer-rpc",
"tokio",
"tracing",
]
[[package]]
name = "radium"
version = "0.7.0"
@ -5146,3 +5158,30 @@ name = "zeroize"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
[[package]]
name = "zeromq"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a4528179201f6eecf211961a7d3276faa61554c82651ecc66387f68fc3004bd"
dependencies = [
"async-trait",
"asynchronous-codec",
"bytes",
"crossbeam-queue",
"dashmap 5.5.3",
"futures-channel",
"futures-io",
"futures-task",
"futures-util",
"log",
"num-traits",
"once_cell",
"parking_lot",
"rand",
"regex",
"thiserror 1.0.64",
"tokio",
"tokio-util",
"uuid",
]

View file

@ -1,5 +1,5 @@
[workspace]
members = ["crates/*", "crates/gate-server/kcp", "crates/evelyn-proto/evelyn-proto-derive", "crates/qwer/qwer-derive", "crates/protocol/protocol-macros", "crates/qwer-rpc/qwer-server-example", "crates/qwer-rpc/qwer-client-example", "crates/evelyn-data/blockfile"]
members = ["crates/*", "crates/gate-server/kcp", "crates/evelyn-proto/evelyn-proto-derive", "crates/evelyn-network/evelyn-encoding", "crates/qwer/qwer-derive", "crates/protocol/protocol-macros", "crates/evelyn-data/blockfile"]
resolver = "2"
[workspace.package]
@ -11,6 +11,7 @@ tokio = { version = "1.40.0", features = ["full"] }
futures = "0.3.31"
axum = { version = "0.7.6" }
axum-server = "0.7.1"
zeromq = { version = "0.4.1", features = ["tokio-runtime", "tcp-transport"] }
# Http
ureq = "2.10.1"
@ -25,7 +26,7 @@ rbase64 = "2.0.3"
hex = "0.4.3"
# Flatbuffers
flatbuffers = "24.3.25"
flatbuffers = "24.12.23"
flatc-rust = "0.2.0"
# Protobuf
@ -54,6 +55,7 @@ pbkdf2 = { version = "0.12.2", features = ["simple"] }
paste = "1.0.15"
const_format = "0.2.33"
num_enum = "0.7.3"
iter-read = "1.1.0"
# Tracing
tracing = "0.1.40"
@ -62,10 +64,11 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
# Internal
common = { path = "crates/common" }
qwer = { path = "crates/qwer" }
qwer-rpc = { path = "crates/qwer-rpc" }
qwer-derive = { path = "crates/qwer/qwer-derive" }
protocol = { path = "crates/protocol" }
evelyn-data = { path = "crates/evelyn-data" }
evelyn-encoding = { path = "crates/evelyn-network/evelyn-encoding" }
evelyn-network = { path = "crates/evelyn-network" }
evelyn-eventgraph = { path = "crates/evelyn-eventgraph" }
evelyn-proto = { path = "crates/evelyn-proto" }
evelyn-encryption = { path = "crates/evelyn-encryption" }

View file

@ -50,5 +50,5 @@ Your support for this project is greatly appreciated! If you'd like to contribut
## Friendly reminder
The server is in a very early state. Right now, it's NOT recommended to run this on a production environment. Please don't open issues about missing features, I'm well aware of this.
## Sanity
If you want to lose your sanity, consider checking [this](crates/qwer-rpc/src/)
## CompSoy
[Functional programming in Rust](crates/evelyn-network/evelyn-encoding/src/lib.rs#L140)

View file

@ -1,8 +1,13 @@
{
"services": [
"servers": [
{
"type": "game-server",
"id": 0,
"server_type": "gate-server",
"server_id": 0,
"addr": "127.0.0.1:10100"
},
{
"server_type": "game-server",
"server_id": 0,
"addr": "127.0.0.1:10101"
}
]

View file

@ -1,35 +0,0 @@
use std::net::SocketAddr;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy)]
pub enum ServiceType {
#[serde(rename = "dispatch")]
Dispatch,
#[serde(rename = "gate-server")]
GateServer,
#[serde(rename = "game-server")]
GameServer,
}
#[derive(Serialize, Deserialize)]
pub struct ServiceConfiguration {
#[serde(rename = "type")]
pub ty: ServiceType,
pub id: u32,
pub addr: SocketAddr,
}
#[derive(Serialize, Deserialize)]
pub struct EnvironmentConfiguration {
pub services: Vec<ServiceConfiguration>,
}
impl EnvironmentConfiguration {
pub fn get_server_end_point(&self, ty: ServiceType, id: u32) -> Option<SocketAddr> {
self.services
.iter()
.find(|svc| svc.ty == ty && svc.id == id)
.map(|svc| svc.addr)
}
}

View file

@ -3,12 +3,10 @@ pub use local_toml::{DatabaseSettings, TomlConfig};
mod app;
mod encryption;
mod environment;
mod server_list;
pub use app::*;
pub use encryption::*;
pub use environment::*;
use serde::{Deserialize, Deserializer};
pub use server_list::*;

View file

@ -0,0 +1,35 @@
use proc_macro2::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Data, DeriveInput, Fields};
pub fn impl_decodeable(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
let input = parse_macro_input!(item as DeriveInput);
let Data::Struct(st) = &input.data else {
panic!("#[derive(Decodeable)] only supports structures.");
};
let Fields::Named(fields) = &st.fields else {
panic!("#[derive(Decodeable)] only supports structures with named fields");
};
let mut decode_calls = TokenStream::new();
for field in fields.named.iter() {
let name = field.ident.as_ref().unwrap();
decode_calls.extend(quote! {
#name: ::evelyn_encoding::Decodeable::decode(r)?,
});
}
let struct_name = &input.ident;
quote! {
impl ::evelyn_encoding::Decodeable for #struct_name {
fn decode<R: ::std::io::Read>(r: &mut R) -> ::std::io::Result<Self> {
Ok(Self {
#decode_calls
})
}
}
}
.into()
}

View file

@ -0,0 +1,45 @@
use proc_macro2::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Data, DeriveInput, Fields};
pub fn impl_encodeable(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
let input = parse_macro_input!(item as DeriveInput);
let Data::Struct(st) = &input.data else {
panic!("#[derive(Encodeable)] only supports structures.");
};
let Fields::Named(fields) = &st.fields else {
panic!("#[derive(Encodeable)] only supports structures with named fields");
};
let mut encode_calls = TokenStream::new();
let mut encoding_length_calls = TokenStream::new();
for field in fields.named.iter() {
let name = field.ident.as_ref().unwrap();
encode_calls.extend(quote! {
::evelyn_encoding::Encodeable::encode(&self.#name, w)?;
});
encoding_length_calls.extend(quote! {
length += self.#name.encoding_length();
});
}
let struct_name = &input.ident;
quote! {
impl ::evelyn_encoding::Encodeable for #struct_name {
fn encode<W: ::std::io::Write>(&self, w: &mut W) -> ::std::io::Result<()> {
#encode_calls
Ok(())
}
fn encoding_length(&self) -> usize {
let mut length = 0;
#encoding_length_calls
length
}
}
}
.into()
}

View file

@ -1,8 +1,20 @@
use proc_macro::TokenStream;
mod decodeable;
mod encodeable;
mod rpc;
#[proc_macro_attribute]
pub fn handlers(_attr: TokenStream, input: TokenStream) -> TokenStream {
rpc::impl_handlers_module(input.into()).into()
}
#[proc_macro_derive(Encodeable)]
pub fn derive_encodeable(item: TokenStream) -> TokenStream {
encodeable::impl_encodeable(item)
}
#[proc_macro_derive(Decodeable)]
pub fn derive_decodeable(item: TokenStream) -> TokenStream {
decodeable::impl_decodeable(item)
}

View file

@ -12,7 +12,7 @@ pub fn impl_handlers_module(input: proc_macro::TokenStream) -> proc_macro::Token
let mut output_module = TokenStream::new();
let mut output_match_arms = TokenStream::new();
let mut rpc_registers = TokenStream::new();
let mut supports_message_cmp = TokenStream::new();
for item in items.iter() {
let Item::Fn(func) = item else {
@ -58,8 +58,10 @@ pub fn impl_handlers_module(input: proc_macro::TokenStream) -> proc_macro::Token
.to_string()
};
rpc_registers.extend(quote! {
point.register_rpc_recv(#rpc_arg_type::PROTOCOL_ID, process_rpc);
supports_message_cmp.extend(quote! {
if cmd_id == #rpc_arg_type::PROTOCOL_ID {
return true;
}
});
let rpc_base_ident = Ident::new(&rpc_base_name, Span::call_site());
@ -68,39 +70,46 @@ pub fn impl_handlers_module(input: proc_macro::TokenStream) -> proc_macro::Token
output_match_arms.extend(quote! {
#rpc_arg_type::PROTOCOL_ID => {
let Ok(arg) = rpc_ptc.get_arg::<#rpc_arg_type>() else {
return;
let Ok(arg) = #rpc_arg_type::unmarshal_from(&mut ::std::io::Cursor::new(&blob), 0) else {
return (None, Vec::with_capacity(0));
};
let mut ctx = NetworkContext {
arg,
rpc_ptc,
notifies: Vec::new(),
session: &mut session,
globals: ::common::util::Ptr::Static(crate::GLOBALS.get().unwrap()),
};
match #mod_name::#name(&mut ctx).await {
Ok(ret) => ctx.rpc_ptc.send_ret(ret).await,
Err(retcode) => ctx.rpc_ptc.send_ret(#rpc_ret_type {
let ret = match #mod_name::#name(&mut ctx).await {
Ok(ret) => ret,
Err(retcode) => #rpc_ret_type {
retcode,
..Default::default()
}).await
}
}
};
::tracing::info!("successfully handled {}", stringify!(#rpc_base_ident));
super::post_rpc_handle(ctx.session).await;
let mut blob = Vec::new();
ret.marshal_to(&mut ::std::io::Cursor::new(&mut blob), 0).unwrap();
(Some(::evelyn_network::message::ProtocolUnit {
cmd_id: ret.get_protocol_id(),
blob,
}), ctx.notifies)
}
});
} else if rpc_base_name.starts_with("Ptc") {
output_match_arms.extend(quote! {
#rpc_arg_type::PROTOCOL_ID => {
let Ok(arg) = rpc_ptc.get_arg::<#rpc_arg_type>() else {
return;
let Ok(arg) = #rpc_arg_type::unmarshal_from(&mut ::std::io::Cursor::new(&blob), 0) else {
return (None, Vec::with_capacity(0));
};
let mut ctx = NetworkContext {
arg,
rpc_ptc,
notifies: Vec::new(),
session: &mut session,
globals: ::common::util::Ptr::Static(crate::GLOBALS.get().unwrap()),
};
@ -108,48 +117,43 @@ pub fn impl_handlers_module(input: proc_macro::TokenStream) -> proc_macro::Token
#mod_name::#name(&mut ctx).await;
::tracing::info!("successfully handled {}", stringify!(#rpc_base_ident));
super::post_rpc_handle(ctx.session).await;
(None, ctx.notifies)
}
});
}
}
quote! {
pub fn register_handlers(point: &::qwer_rpc::RpcPtcPoint) {
pub fn supports_message(cmd_id: u16) -> bool {
use ::protocol::*;
use ::qwer::ProtocolID;
#rpc_registers
#supports_message_cmp
false
}
pub async fn process_rpc(rpc_ptc: qwer_rpc::RpcPtcContext) {
pub async fn handle_message(cmd_id: u16, blob: &[u8], session_id: u32) -> (Option<::evelyn_network::message::ProtocolUnit>, Vec<::evelyn_network::message::ProtocolUnit>) {
use ::protocol::*;
use ::qwer::ProtocolID;
use crate::rpc_ptc::*;
use ::qwer::OctData;
use super::NetworkContext;
let Some(::qwer_rpc::middleware::MiddlewareModel::Account(account_mw)) = rpc_ptc
.middleware_list
.iter()
.find(|&mw| matches!(mw, ::qwer_rpc::middleware::MiddlewareModel::Account(_)))
else {
::tracing::warn!("failed to handle rpc: account middleware is missing");
return;
let Some(mut session) = crate::SESSION_MAP.get_mut(&session_id) else {
::tracing::warn!("failed to handle rpc: session id {} is not active", session_id);
return (None, Vec::with_capacity(0));
};
let Some(mut session) = crate::PLAYER_MAP.get_mut(&account_mw.player_uid) else {
::tracing::warn!("failed to handle rpc: player session with uid {} is not active", account_mw.player_uid);
return;
};
match rpc_ptc.protocol_id {
match cmd_id {
#output_match_arms
_ => (),
_ => (None, Vec::with_capacity(0)),
}
}
mod #mod_name {
#output_module
}
}.into()
}
.into()
}
fn get_underlying_type(ty: &Type) -> &Type {

View file

@ -46,9 +46,13 @@ impl<'b> flatbuffers::Push for Property {
type Output = Property;
#[inline]
unsafe fn push(&self, dst: &mut [u8], _written_len: usize) {
let src = ::core::slice::from_raw_parts(self as *const Property as *const u8, Self::size());
let src = ::core::slice::from_raw_parts(self as *const Property as *const u8, <Self as flatbuffers::Push>::size());
dst.copy_from_slice(src);
}
#[inline]
fn alignment() -> flatbuffers::PushAlignment {
flatbuffers::PushAlignment::new(4)
}
}
impl<'a> flatbuffers::Verifiable for Property {
@ -170,9 +174,13 @@ impl<'b> flatbuffers::Push for RefineCost {
type Output = RefineCost;
#[inline]
unsafe fn push(&self, dst: &mut [u8], _written_len: usize) {
let src = ::core::slice::from_raw_parts(self as *const RefineCost as *const u8, Self::size());
let src = ::core::slice::from_raw_parts(self as *const RefineCost as *const u8, <Self as flatbuffers::Push>::size());
dst.copy_from_slice(src);
}
#[inline]
fn alignment() -> flatbuffers::PushAlignment {
flatbuffers::PushAlignment::new(4)
}
}
impl<'a> flatbuffers::Verifiable for RefineCost {

View file

@ -0,0 +1,19 @@
[package]
name = "evelyn-network"
edition = "2021"
version.workspace = true
[dependencies]
tokio.workspace = true
futures.workspace = true
zeromq.workspace = true
serde.workspace = true
tracing.workspace = true
byteorder.workspace = true
iter-read.workspace = true
num_enum.workspace = true
evelyn-encoding.workspace = true
evelyn-codegen.workspace = true

View file

@ -0,0 +1,8 @@
[package]
name = "evelyn-encoding"
version = "0.1.0"
edition = "2021"
[dependencies]
paste.workspace = true
byteorder.workspace = true

View file

@ -0,0 +1,163 @@
use byteorder::{ReadBytesExt, WriteBytesExt, BE};
use std::io::{self, Read, Write};
pub trait Encodeable {
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()>;
fn encoding_length(&self) -> usize;
}
pub trait Decodeable: Sized {
fn decode<R: Read>(r: &mut R) -> io::Result<Self>;
}
macro_rules! impl_primitives {
($($ty:ident),*) => {
$(impl Encodeable for $ty {
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()> {
paste::paste!(w.[<write_ $ty>]::<BE>(*self))
}
fn encoding_length(&self) -> usize {
$ty::BITS as usize / 8
}
}
impl Decodeable for $ty {
fn decode<R: Read>(r: &mut R) -> io::Result<Self> {
paste::paste!(r.[<read_ $ty>]::<BE>())
}
})*
};
}
impl_primitives! {
u16, u32, u64,
i16, i32, i64
}
// u8
impl Encodeable for u8 {
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()> {
w.write_u8(*self)
}
fn encoding_length(&self) -> usize {
1
}
}
impl Decodeable for u8 {
fn decode<R: Read>(r: &mut R) -> io::Result<Self> {
r.read_u8()
}
}
// i8
impl Encodeable for i8 {
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()> {
w.write_i8(*self)
}
fn encoding_length(&self) -> usize {
1
}
}
impl Decodeable for i8 {
fn decode<R: Read>(r: &mut R) -> io::Result<Self> {
r.read_i8()
}
}
// bool
impl Encodeable for bool {
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()> {
(*self as u8).encode(w)
}
fn encoding_length(&self) -> usize {
1
}
}
impl Decodeable for bool {
fn decode<R: Read>(r: &mut R) -> io::Result<Self> {
Ok(u8::decode(r)? != 0)
}
}
// String
impl Encodeable for String {
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()> {
(self.len() as u32).encode(w)?;
w.write_all(self.as_bytes())
}
fn encoding_length(&self) -> usize {
4 + self.len()
}
}
impl Decodeable for String {
fn decode<R: Read>(r: &mut R) -> io::Result<Self> {
let length = u32::decode(r)?;
let mut data = vec![0u8; length as usize];
r.read_exact(&mut data)?;
// TODO: maybe we want custom error type to combine io error and this one?
Ok(String::from_utf8(data).expect("failed to decode string as UTF-8"))
}
}
// Vec<T>
impl<T> Encodeable for Vec<T>
where
T: Encodeable,
{
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()> {
(self.len() as u32).encode(w)?;
self.iter().try_for_each(|item| item.encode(w))
}
fn encoding_length(&self) -> usize {
self.iter()
.fold(4, |length, item| length + item.encoding_length())
}
}
impl<T> Decodeable for Vec<T>
where
T: Decodeable,
{
fn decode<R: Read>(r: &mut R) -> io::Result<Self> {
(0..u32::decode(r)?)
.map(|_| T::decode(r))
.collect::<Result<Vec<_>, _>>()
}
}
// Option<T>
impl<T> Encodeable for Option<T>
where
T: Encodeable,
{
fn encode<W: Write>(&self, w: &mut W) -> io::Result<()> {
self.is_some().encode(w)?;
self.as_ref().map(|item| item.encode(w)).unwrap_or(Ok(()))
}
fn encoding_length(&self) -> usize {
self.as_ref()
.iter()
.fold(1, |length, item| length + item.encoding_length())
}
}
impl<T> Decodeable for Option<T>
where
T: Decodeable,
{
fn decode<R: Read>(r: &mut R) -> io::Result<Self> {
(bool::decode(r)?).then(|| T::decode(r)).transpose()
}
}

View file

@ -0,0 +1,22 @@
use std::net::SocketAddr;
use serde::Deserialize;
use crate::{ServerID, ServerType};
#[derive(Debug, Deserialize)]
pub struct ServerEnvironmentConfiguration {
pub servers: Vec<ServerConfigurationEntry>,
}
#[derive(Debug, Deserialize)]
pub struct ServerConfigurationEntry {
pub addr: SocketAddr,
pub server_type: ServerType,
pub server_id: ServerID,
}
#[derive(Debug, Deserialize)]
pub struct ServerNodeConfiguration {
pub server_id: ServerID,
}

View file

@ -0,0 +1,97 @@
use std::{collections::HashMap, io::Cursor, net::SocketAddr};
use config::ServerConfigurationEntry;
use evelyn_encoding::Encodeable;
use listener::RecvCallback;
use message::{Header, NetworkPacket, WithOpcode};
use serde::Deserialize;
use socket::ServerSocket;
use tokio::task::JoinHandle;
use tracing::error;
use zeromq::ZmqError;
pub mod config;
mod listener;
pub mod message;
pub mod socket;
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ServerType {
GateServer = 1,
GameServer = 2,
}
pub type ServerID = u32;
pub struct ServerNetworkManager {
pub own_server_type: ServerType,
pub own_server_id: ServerID,
pub own_socket_addr: SocketAddr,
pub server_sockets: HashMap<(ServerType, u32), ServerSocket>,
}
impl ServerNetworkManager {
pub fn new(
server_type: ServerType,
server_id: ServerID,
server_list: &[ServerConfigurationEntry],
) -> Self {
let Some(own_entry) = server_list
.iter()
.find(|entry| entry.server_type == server_type && entry.server_id == server_id)
else {
panic!("can't find this instance ({server_type:?}-{server_id}) in server list")
};
Self {
own_server_type: server_type,
own_server_id: server_id,
own_socket_addr: own_entry.addr,
server_sockets: server_list
.iter()
.filter(|entry| entry.server_type != server_type || entry.server_id != server_id)
.map(|entry| {
(
(entry.server_type, entry.server_id),
ServerSocket::new(entry.addr),
)
})
.collect(),
}
}
pub async fn send_to<M: Encodeable + WithOpcode>(
&self,
server_type: ServerType,
server_id: ServerID,
message: M,
) {
if let Some(socket) = self.server_sockets.get(&(server_type, server_id)) {
let mut payload = Vec::with_capacity(message.encoding_length());
message.encode(&mut Cursor::new(&mut payload)).unwrap();
socket
.send(NetworkPacket {
opcode: message.opcode(),
header: Header {
sender_type: self.own_server_type as u8,
sender_id: self.own_server_id,
},
payload,
})
.await;
} else {
error!("ServerSocket for {server_type:?}-{server_id} does not exist");
}
}
pub async fn start_listener<S: Send + Sync + Clone + 'static>(
&self,
state: S,
callback: impl RecvCallback<S> + 'static,
) -> Result<JoinHandle<()>, ZmqError> {
listener::listen(self.own_socket_addr, state, callback).await
}
}

View file

@ -0,0 +1,60 @@
use std::{future::Future, net::SocketAddr};
use evelyn_encoding::Decodeable;
use futures::future::BoxFuture;
use tokio::task::JoinHandle;
use tracing::{error, warn};
use zeromq::{prelude::*, PullSocket, ZmqError};
use crate::message::NetworkPacket;
pub trait RecvCallback<S>: Send + Sync {
fn call(&self, state: S, data: NetworkPacket) -> BoxFuture<'static, ()>;
}
impl<T, F, S> RecvCallback<S> for T
where
T: Fn(S, NetworkPacket) -> F + Send + Sync,
F: Future<Output = ()> + 'static + Send + Sync,
S: Send + Sync,
{
fn call(&self, state: S, data: NetworkPacket) -> BoxFuture<'static, ()> {
Box::pin(self(state, data))
}
}
pub async fn listen<S: Send + Sync + Clone + 'static>(
addr: SocketAddr,
state: S,
callback: impl RecvCallback<S> + 'static,
) -> Result<JoinHandle<()>, ZmqError> {
let mut socket = PullSocket::new();
socket.bind(&format!("tcp://{addr}")).await?;
Ok(tokio::spawn(socket_recv_loop(socket, state, callback)))
}
async fn socket_recv_loop<S: Send + Sync + Clone>(
mut socket: PullSocket,
state: S,
callback: impl RecvCallback<S>,
) {
loop {
let Ok(mut message) = socket
.recv()
.await
.map(|m| m.into_vecdeque())
.inspect_err(|err| error!("socket_recv_loop: recv failed: {err}"))
else {
continue;
};
while let Some(data) = message.pop_front() {
match NetworkPacket::decode(&mut iter_read::IterRead::new(data.iter())) {
Ok(packet) => {
let _ = tokio::spawn(callback.call(state.clone(), packet));
}
Err(err) => warn!("failed to decode incoming packet: {err}"),
}
}
}
}

View file

@ -0,0 +1,52 @@
use std::io::Cursor;
use evelyn_codegen::{Decodeable, Encodeable};
mod session;
use evelyn_encoding::Decodeable;
pub use session::*;
#[derive(Debug, Encodeable, Decodeable)]
pub struct Header {
pub sender_type: u8,
pub sender_id: u32,
}
#[derive(Debug, Encodeable, Decodeable)]
pub struct NetworkPacket {
pub header: Header,
pub opcode: u16,
pub payload: Vec<u8>,
}
impl NetworkPacket {
pub fn get_message<M: Decodeable + WithOpcode>(&self) -> Option<M> {
(M::OPCODE == self.opcode)
.then(|| M::decode(&mut Cursor::new(&self.payload)).ok())
.flatten()
}
}
#[repr(u8)]
#[derive(num_enum::IntoPrimitive, num_enum::TryFromPrimitive)]
pub enum MessageCategory {
Session = 0,
}
pub trait WithOpcode {
const OPCODE: u16;
fn opcode(&self) -> u16 {
Self::OPCODE
}
}
macro_rules! opcode {
($category:ident, $($ty:ident = $num:expr),*) => {
$(impl super::WithOpcode for $ty {
const OPCODE: u16 = ((super::MessageCategory::$category as u16) * 100) + $num;
})*
};
}
use opcode;

View file

@ -0,0 +1,48 @@
use crate::message::opcode;
use evelyn_codegen::{Decodeable, Encodeable};
#[derive(Debug, Encodeable, Decodeable)]
pub struct BindClientSessionMessage {
pub session_id: u32,
pub player_uid: u32,
}
#[derive(Debug, Encodeable, Decodeable)]
pub struct BindClientSessionOkMessage {
pub session_id: u32,
}
#[derive(Debug, Encodeable, Decodeable)]
pub struct BindClientSessionFailedMessage {
pub session_id: u32,
}
#[derive(Debug, Encodeable, Decodeable)]
pub struct ProtocolUnit {
pub cmd_id: u16,
pub blob: Vec<u8>,
}
#[derive(Debug, Encodeable, Decodeable)]
pub struct ForwardClientProtocolMessage {
pub session_id: u32,
pub request_id: u32,
pub message: ProtocolUnit,
}
#[derive(Debug, Encodeable, Decodeable)]
pub struct AvailableServerProtocolMessage {
pub session_id: u32,
pub ack_request_id: u32,
pub notifies: Vec<ProtocolUnit>,
pub response: Option<ProtocolUnit>,
}
opcode! {
Session,
BindClientSessionMessage = 1,
BindClientSessionOkMessage = 2,
BindClientSessionFailedMessage = 3,
ForwardClientProtocolMessage = 4,
AvailableServerProtocolMessage = 5
}

View file

@ -0,0 +1,51 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::time::Duration;
use evelyn_encoding::Encodeable;
use tokio::sync::mpsc;
use zeromq::prelude::*;
use zeromq::PushSocket;
use crate::message::NetworkPacket;
#[derive(Clone)]
pub struct ServerSocket(mpsc::Sender<Box<[u8]>>);
impl ServerSocket {
const CONNECT_REPEAT_TIMEOUT: Duration = Duration::from_secs(2);
pub fn new(addr: SocketAddr) -> Self {
let (tx, rx) = mpsc::channel(64);
tokio::spawn(Self::worker_loop(format!("tcp://{addr}"), rx));
Self(tx)
}
pub async fn send(&self, data: NetworkPacket) {
let mut buf = Vec::with_capacity(data.encoding_length());
data.encode(&mut Cursor::new(&mut buf)).unwrap();
let _ = self.0.send(buf.into_boxed_slice()).await;
}
async fn worker_loop(endpoint: String, mut rx: mpsc::Receiver<Box<[u8]>>) {
let mut socket = Self::connect_to(&endpoint).await;
while let Some(buf) = rx.recv().await {
while socket.send(buf.to_vec().into()).await.is_err() {
socket = Self::connect_to(&endpoint).await;
}
}
}
async fn connect_to(endpoint: &str) -> PushSocket {
let mut socket = PushSocket::new();
while socket.connect(endpoint).await.is_err() {
tokio::time::sleep(Self::CONNECT_REPEAT_TIMEOUT).await;
}
socket
}
}

View file

@ -77,7 +77,7 @@ fn impl_struct_conversions(
let mut from_impls = quote! {};
let mut proto_to_qwer_matches = quote! {};
let mut qwer_to_proto_matches = quote! {};
let mut ptc_registers = quote! {};
let mut rsp_matches = quote! {};
let mut ptc_matches = quote! {};
for item in proto_file.items.iter() {
@ -114,74 +114,42 @@ fn impl_struct_conversions(
continue;
};
if let Some(req_base_name) = proto_ident_str.strip_suffix("CsReq") {
if proto_ident_str.ends_with("CsReq") {
if proto.attrs.iter().any(|attr| {
attr.path()
.get_ident()
.map(|i| i == "cmdid")
.unwrap_or(false)
}) {
let rpc_ret_ident =
Ident::new(&format!("Rpc{req_base_name}Ret"), proto_ident.span());
let proto_rsp_name = format!("{req_base_name}ScRsp");
// Check if rsp with this name defined in proto
// if it doesn't, send null message.
let proto_rsp_ident = proto_file
.items
.iter()
.any(|i| {
if let Item::Struct(s) = i {
s.ident == proto_rsp_name
} else {
false
}
})
.then_some(Ident::new(&proto_rsp_name, proto_ident.span()));
match proto_rsp_ident {
Some(proto_rsp_ident) => {
proto_to_qwer_matches = quote! {
#proto_to_qwer_matches
#proto_ident::CMD_ID => {
let packet = NetPacket::<::evelyn_proto::#proto_ident>::decode($buf)?;
let rpc_arg = ::protocol::#qwer_ident::from(packet.body);
let rpc_ret: ::protocol::#rpc_ret_ident = $point.call_rpc($addr, rpc_arg, $middlewares, $timeout).await?;
let proto_rsp = ::evelyn_proto::#proto_rsp_ident::from(rpc_ret);
$session.send_rsp(packet.head.packet_id, proto_rsp);
},
}
}
None => {
proto_to_qwer_matches = quote! {
#proto_to_qwer_matches
#proto_ident::CMD_ID => {
let packet = NetPacket::<::evelyn_proto::#proto_ident>::decode($buf)?;
let rpc_arg = ::protocol::#qwer_ident::from(packet.body);
let rpc_ret: ::protocol::#rpc_ret_ident = $point.call_rpc($addr, rpc_arg, $middlewares, $timeout).await?;
$session.send_null_rsp(packet.head.packet_id);
},
}
}
proto_to_qwer_matches = quote! {
#proto_to_qwer_matches
#proto_ident::CMD_ID => {
let packet = NetPacket::<::evelyn_proto::#proto_ident>::decode($buf)?;
let rpc_arg = ::protocol::#qwer_ident::from(packet.body);
let mut buf = Vec::new();
rpc_arg.marshal_to(&mut ::std::io::Cursor::new(&mut buf), 0).unwrap();
Some((packet.head.packet_id, ::evelyn_network::message::ProtocolUnit {
cmd_id: rpc_arg.get_protocol_id(),
blob: buf,
}))
},
}
}
} else if let Some(_ntf_base_name) = proto_ident_str.strip_suffix("ScNotify") {
ptc_registers = quote! {
#ptc_registers
$point.register_rpc_recv(
::protocol::#qwer_ident::PROTOCOL_ID,
move |ctx| async move {
let _ = $tx.get().unwrap().send(Input::Notify($conv, ctx)).await;
}
);
} else if let Some(_rsp_base_name) = proto_ident_str.strip_suffix("ScRsp") {
rsp_matches = quote! {
#rsp_matches
::protocol::#qwer_ident::PROTOCOL_ID => {
$session.send_rsp($request_id, ::evelyn_proto::#proto_ident::from(
::protocol::#qwer_ident::unmarshal_from(&mut ::std::io::Cursor::new($blob), 0).unwrap(),
));
},
};
} else if let Some(_ntf_base_name) = proto_ident_str.strip_suffix("ScNotify") {
ptc_matches = quote! {
#ptc_matches
::protocol::#qwer_ident::PROTOCOL_ID => {
$session.notify(::evelyn_proto::#proto_ident::from(
$ctx.get_arg::<::protocol::#qwer_ident>().unwrap(),
::protocol::#qwer_ident::unmarshal_from(&mut ::std::io::Cursor::new($blob), 0).unwrap(),
));
},
};
@ -191,7 +159,12 @@ fn impl_struct_conversions(
#proto_ident::CMD_ID => {
let packet = NetPacket::<::evelyn_proto::#proto_ident>::decode($buf)?;
let rpc_arg = ::protocol::#qwer_ident::from(packet.body);
$point.send_rpc($addr, 0, rpc_arg, 0, $middlewares).await;
let mut buf = Vec::new();
rpc_arg.marshal_to(&mut ::std::io::Cursor::new(&mut buf), 0).unwrap();
Some((packet.head.packet_id, ::evelyn_network::message::ProtocolUnit {
cmd_id: rpc_arg.get_protocol_id(),
blob: buf,
}))
}
}
}
@ -304,36 +277,29 @@ fn impl_struct_conversions(
#from_impls
#[macro_export]
macro_rules! decode_and_forward_proto {
($cmd_id:expr, $buf:expr, $session:expr, $point:expr, $addr:expr, $middlewares:expr, $timeout:expr) => {
macro_rules! to_protocol_unit {
($cmd_id:expr, $buf:expr) => {
match $cmd_id {
#proto_to_qwer_matches
_ => ::tracing::warn!("unknown cmd_id: {}", $cmd_id),
_ => None,
}
}
}
#[macro_export]
macro_rules! impl_qwer_to_proto_match {
($process_proto_message:ident) => {
match qwer.get_protocol_id() {
#qwer_to_proto_matches
_ => (),
macro_rules! send_rsp {
($session:expr, $request_id:expr, $cmd_id:expr, $blob:expr) => {
match $cmd_id {
#rsp_matches
_ => $session.send_null_rsp($request_id),
}
}
}
#[macro_export]
macro_rules! register_ptc_handlers {
($point:ident, $conv:ident, $tx:ident) => {
#ptc_registers
}
}
#[macro_export]
macro_rules! forward_as_notify {
($session:ident, $ctx:ident) => {
match $ctx.protocol_id {
macro_rules! send_notify {
($session:ident, $cmd_id:expr, $blob:expr) => {
match $cmd_id {
#ptc_matches
_ => (),
}

File diff suppressed because it is too large Load diff

View file

@ -9,7 +9,6 @@ tokio.workspace = true
# Service
qwer.workspace = true
qwer-rpc.workspace = true
protocol.workspace = true
# Database
@ -34,6 +33,7 @@ dashmap.workspace = true
# Internal
common.workspace = true
evelyn-data.workspace = true
evelyn-network.workspace = true
evelyn-eventgraph.workspace = true
evelyn-http-client.workspace = true
evelyn-codegen.workspace = true

View file

@ -1,9 +1,11 @@
service_id = 0
server_name = "nap_dev_01"
bind_client_version = "CNBetaWin1.5.0"
config_autopatch_url = "http://127.0.0.1:10000/server/"
design_data_url = "http://127.0.0.1:10000/design_data/NAP_Publish_AppStore_1.5"
[node]
server_id = 0
[database]
url = "localhost:8000"
username = "root"

View file

@ -24,7 +24,7 @@ type Result<T> = std::result::Result<T, surrealdb::Error>;
#[derive(Deserialize, Serialize)]
struct PlayerData {
pub player_uid: u64,
pub player_uid: u32,
pub game_uid_counter: u32,
pub player_info_blob: String,
}
@ -48,7 +48,7 @@ impl DbContext {
pub async fn get_or_create_player_data(
&self,
globals: &Globals,
player_uid: u64,
player_uid: u32,
) -> Result<(UidCounter, PlayerInfo)> {
let player_uid_str = player_uid.to_string();
let data: Option<PlayerData> = self.0.select((PLAYER_DATA_TABLE, &player_uid_str)).await?;
@ -79,7 +79,7 @@ impl DbContext {
}
pub async fn save_player_data(&self, last_uid: u32, player_info: &PlayerInfo) -> Result<()> {
let player_uid = *player_info.uid();
let player_uid = *player_info.uid() as u32;
let _: PlayerData = self
.0

View file

@ -4,7 +4,6 @@ use common::util::Ptr;
use evelyn_eventgraph::MainCityConfig;
use event_graph_runner::EventGraphGroup;
use protocol::PtcSyncEventInfoArg;
use qwer_rpc::RpcPtcContext;
use tracing::instrument;
use crate::{Globals, PlayerSession};
@ -82,10 +81,8 @@ impl LevelEventGraphManager {
});
}
pub async fn sync_event_info(&mut self, ctx: &RpcPtcContext) {
while let Some(ptc) = self.pending_events_info_sync.pop_front() {
ctx.send_ptc(ptc).await;
}
pub fn drain_pending_events_info_queue(&mut self) -> VecDeque<PtcSyncEventInfoArg> {
std::mem::take(&mut self.pending_events_info_sync)
}
}

View file

@ -1,22 +1,26 @@
use std::{
process::ExitCode,
sync::{Arc, LazyLock, OnceLock},
sync::{LazyLock, OnceLock},
time::Duration,
};
use anyhow::{bail, Result};
use common::config::{DatabaseSettings, ServiceType, TomlConfig};
use anyhow::Result;
use common::config::{DatabaseSettings, TomlConfig};
use dashmap::DashMap;
use database::DbContext;
use evelyn_network::{
config::ServerNodeConfiguration,
message::{BindClientSessionMessage, ForwardClientProtocolMessage, NetworkPacket, WithOpcode},
ServerNetworkManager, ServerType,
};
use level::{EventConfigManager, LevelEventGraphManager};
use player_info::PlayerInfo;
use player_util::UidCounter;
use qwer_rpc::{ProtocolServiceFrontend, RpcPtcPoint, RpcPtcServiceFrontend};
use evelyn_data::{ArchiveFile, NapFileCfg};
use protocol::*;
use serde::Deserialize;
use tracing::error;
use tracing::{error, warn};
mod database;
mod level;
@ -27,7 +31,7 @@ mod scene_section_util;
#[derive(Deserialize)]
pub struct GameServerConfig {
pub service_id: u32,
pub node: ServerNodeConfiguration,
pub server_name: String,
pub bind_client_version: String,
pub config_autopatch_url: String,
@ -40,7 +44,7 @@ impl TomlConfig for GameServerConfig {
}
struct PlayerSession {
pub player_uid: u64,
pub player_uid: u32,
pub uid_counter: UidCounter,
pub player_info: PlayerInfo,
pub last_save_time: u64,
@ -54,10 +58,12 @@ pub struct Globals {
static GLOBALS: OnceLock<Globals> = OnceLock::new();
static PLAYER_MAP: LazyLock<DashMap<u64, PlayerSession>> = LazyLock::new(|| DashMap::new());
static SESSION_MAP: LazyLock<DashMap<u32, PlayerSession>> = LazyLock::new(|| DashMap::new());
static DB_CONTEXT: OnceLock<DbContext> = OnceLock::new();
const SERVICE_TYPE: ServiceType = ServiceType::GameServer;
struct AppState {
pub network_manager: ServerNetworkManager,
}
#[tokio::main]
async fn main() -> ExitCode {
@ -117,28 +123,47 @@ async fn init_database(config: &'static GameServerConfig) -> Result<()> {
}
async fn init_network(config: &'static GameServerConfig) -> Result<()> {
static SERVICE: OnceLock<RpcPtcServiceFrontend> = OnceLock::new();
static POINT: OnceLock<Arc<RpcPtcPoint>> = OnceLock::new();
static STATE: OnceLock<AppState> = OnceLock::new();
let environment = remote_config::download_env_config(&config.config_autopatch_url);
let Some(listen_end_point) = environment.get_server_end_point(SERVICE_TYPE, config.service_id)
else {
bail!(
"the instance [{:?}-{}] is missing from environment.json",
SERVICE_TYPE,
config.service_id
);
};
let network_manager = ServerNetworkManager::new(
ServerType::GameServer,
config.node.server_id,
&environment.servers,
);
let service = RpcPtcServiceFrontend::new(ProtocolServiceFrontend::new());
let Ok(listen_point) = service.create_point(Some(listen_end_point)).await else {
bail!("failed to create_point at tcp://{listen_end_point}. Is another instance of this service already running?");
};
let state = STATE.get_or_init(|| AppState { network_manager });
rpc_ptc::register_handlers(&listen_point);
let _ = SERVICE.set(service);
let _ = POINT.set(listen_point);
state
.network_manager
.start_listener(state, on_message)
.await?;
Ok(())
}
async fn on_message(state: &'static AppState, packet: NetworkPacket) {
match packet.opcode {
BindClientSessionMessage::OPCODE => {
rpc_ptc::auth::on_bind_client_session_message(
state,
packet
.get_message()
.expect("failed to decode BindClientSessionMessage"),
packet.header,
)
.await
}
ForwardClientProtocolMessage::OPCODE => {
rpc_ptc::on_forward_client_protocol_message(
state,
packet
.get_message()
.expect("failed to decode ForwardClientProtocolMessage"),
packet.header,
)
.await
}
unhandled => warn!("unhandled incoming message from another server, opcode: {unhandled}"),
}
}

View file

@ -43,12 +43,12 @@ impl UidCounter {
pub fn create_starting_player_info(
globals: &Globals,
uid: u64,
uid: u32,
nick_name: &str,
) -> (UidCounter, PlayerInfo) {
let mut counter = UidCounter::new((uid & 0xFFFFFFFF) as u32, 0);
let mut player_info = PlayerInfo {
uid: Some(uid),
uid: Some(uid as u64),
account_name: Some(uid.to_string()),
last_enter_world_timestamp: Some(0),
items: Some(phashmap!()),

View file

@ -2,6 +2,7 @@ use std::time::Duration;
use common::config::*;
use evelyn_http_client::AutopatchClient;
use evelyn_network::config::ServerEnvironmentConfiguration;
use serde::Deserialize;
use crate::GameServerConfig;
@ -34,7 +35,7 @@ struct DataVersion {
const RETRY_TIME: Duration = Duration::from_secs(5);
pub fn download_env_config(autopatch_url: &'static str) -> EnvironmentConfiguration {
pub fn download_env_config(autopatch_url: &'static str) -> ServerEnvironmentConfiguration {
let client = AutopatchClient::new(&autopatch_url).retry_after(RETRY_TIME);
client.fetch_until_success("environment.json")
}

View file

@ -1,42 +1,48 @@
use common::time_util;
use tracing::{error, info, warn};
use evelyn_network::{
message::{
BindClientSessionFailedMessage, BindClientSessionMessage, BindClientSessionOkMessage,
Header,
},
ServerType,
};
use tracing::{error, info};
use crate::{level::LevelEventGraphManager, DB_CONTEXT, GLOBALS, PLAYER_MAP};
use crate::{level::LevelEventGraphManager, AppState, DB_CONTEXT, GLOBALS, SESSION_MAP};
use super::*;
pub async fn on_rpc_player_login_arg(ctx: RpcPtcContext) {
let _arg: RpcPlayerLoginArg = ctx.get_arg().unwrap();
let Some(MiddlewareModel::Account(account_mw)) = ctx
.middleware_list
.iter()
.find(|&mw| matches!(mw, MiddlewareModel::Account(_)))
else {
warn!("login failed: account middleware is missing");
return;
};
pub async fn on_bind_client_session_message(
state: &'static AppState,
message: BindClientSessionMessage,
header: Header,
) {
let Ok((uid_counter, mut player_info)) = DB_CONTEXT
.get()
.unwrap()
.get_or_create_player_data(GLOBALS.get().unwrap(), account_mw.player_uid)
.get_or_create_player_data(GLOBALS.get().unwrap(), message.player_uid)
.await
.inspect_err(|err| error!("login failed: get_or_create_player_data failed: {err}"))
else {
ctx.send_ret(RpcPlayerLoginRet {
retcode: Retcode::Fail,
})
.await;
state
.network_manager
.send_to(
ServerType::GateServer,
header.sender_id,
BindClientSessionFailedMessage {
session_id: message.session_id,
},
)
.await;
return;
};
*player_info.login_times_mut() += 1;
PLAYER_MAP.insert(
account_mw.player_uid,
SESSION_MAP.insert(
message.session_id,
PlayerSession {
player_uid: account_mw.player_uid,
player_uid: message.player_uid,
uid_counter,
player_info,
level_event_graph_mgr: LevelEventGraphManager::new(common::util::Ptr::Static(
@ -46,39 +52,47 @@ pub async fn on_rpc_player_login_arg(ctx: RpcPtcContext) {
},
);
info!("player with uid {} is logging in!", account_mw.player_uid);
ctx.send_ret(RpcPlayerLoginRet {
retcode: Retcode::Succ,
})
.await;
}
info!("player with uid {} is logging in!", message.player_uid);
pub async fn on_rpc_player_logout_arg(ctx: RpcPtcContext) {
let Some(MiddlewareModel::Account(account_mw)) = ctx
.middleware_list
.iter()
.find(|&mw| matches!(mw, MiddlewareModel::Account(_)))
else {
warn!("logout failed: account middleware is missing");
return;
};
if let Some((_, session)) = PLAYER_MAP.remove(&account_mw.player_uid) {
crate::DB_CONTEXT
.get()
.unwrap()
.save_player_data(session.uid_counter.last_uid(), &session.player_info)
.await
.expect("failed to save player data");
info!(
"successfully saved player data and logged out (uid: {})",
session.player_uid
state
.network_manager
.send_to(
ServerType::GateServer,
header.sender_id,
BindClientSessionOkMessage {
session_id: message.session_id,
},
)
}
ctx.send_ret(RpcPlayerLogoutRet {
retcode: Retcode::Succ,
})
.await;
.await;
}
// TODO: UnbindClientSessionMessage
// pub async fn on_rpc_player_logout_arg(ctx: RpcPtcContext) {
// let Some(MiddlewareModel::Account(account_mw)) = ctx
// .middleware_list
// .iter()
// .find(|&mw| matches!(mw, MiddlewareModel::Account(_)))
// else {
// warn!("logout failed: account middleware is missing");
// return;
// };
//
// if let Some((_, session)) = PLAYER_MAP.remove(&account_mw.player_uid) {
// crate::DB_CONTEXT
// .get()
// .unwrap()
// .save_player_data(session.uid_counter.last_uid(), &session.player_info)
// .await
// .expect("failed to save player data");
//
// info!(
// "successfully saved player data and logged out (uid: {})",
// session.player_uid
// )
// }
//
// ctx.send_ret(RpcPlayerLogoutRet {
// retcode: Retcode::Succ,
// })
// .await;
// }

View file

@ -25,10 +25,15 @@ mod handlers {
.begin_interact(ctx.arg.interaction, ctx.arg.npc_tag_id);
level::fire_event(ctx.session, ctx.arg.interaction, "OnInteract");
ctx.session
let mut queue = ctx
.session
.level_event_graph_mgr
.sync_event_info(&ctx.rpc_ptc)
.await;
.drain_pending_events_info_queue();
while let Some(ptc) = queue.pop_front() {
ctx.add_notify(ptc);
}
Ok(RpcInteractWithUnitRet::default())
}
@ -36,16 +41,14 @@ mod handlers {
pub async fn on_rpc_run_event_graph_arg(
ctx: &mut NetworkContext<'_, RpcRunEventGraphArg>,
) -> Result<RpcRunEventGraphRet, Retcode> {
ctx.rpc_ptc
.send_ptc(PtcUpdateEventGraphArg {
owner_type: ctx.arg.owner_type,
tag: ctx.arg.tag,
event_graph_uid: ctx.arg.event_graph_uid,
npc_interaction: String::from("OnInteract"),
is_event_success: true,
event_graph_owner_uid: ctx.arg.owner_id,
})
.await;
ctx.add_notify(PtcUpdateEventGraphArg {
owner_type: ctx.arg.owner_type,
tag: ctx.arg.tag,
event_graph_uid: ctx.arg.event_graph_uid,
npc_interaction: String::from("OnInteract"),
is_event_success: true,
event_graph_owner_uid: ctx.arg.owner_id,
});
Ok(RpcRunEventGraphRet::default())
}

View file

@ -102,13 +102,11 @@ mod handlers {
*avatar_uid = target_avatar_uid;
ctx.rpc_ptc
.send_ptc(PtcPlayerSyncArg {
avatar: Some(protocol::util::build_avatar_sync(&ctx.session.player_info)),
item: Some(protocol::util::build_item_sync(&ctx.session.player_info)),
..Default::default()
})
.await;
ctx.add_notify(PtcPlayerSyncArg {
avatar: Some(protocol::util::build_avatar_sync(&ctx.session.player_info)),
item: Some(protocol::util::build_item_sync(&ctx.session.player_info)),
..Default::default()
});
Ok(RpcWeaponDressRet::default())
}
@ -146,13 +144,11 @@ mod handlers {
}
});
ctx.rpc_ptc
.send_ptc(PtcPlayerSyncArg {
avatar: Some(protocol::util::build_avatar_sync(&ctx.session.player_info)),
item: Some(protocol::util::build_item_sync(&ctx.session.player_info)),
..Default::default()
})
.await;
ctx.add_notify(PtcPlayerSyncArg {
avatar: Some(protocol::util::build_avatar_sync(&ctx.session.player_info)),
item: Some(protocol::util::build_item_sync(&ctx.session.player_info)),
..Default::default()
});
Ok(RpcWeaponUnDressRet::default())
}

View file

@ -131,7 +131,7 @@ mod handlers {
scene_uid,
&mut ptc,
);
ctx.rpc_ptc.send_ptc(ptc).await;
ctx.add_notify(ptc);
} else {
warn!("RpcModTime: currently not in Hall");
}
@ -145,16 +145,14 @@ mod handlers {
ctx: &mut NetworkContext<'_, RpcModMainCityAvatarArg>,
) -> Result<RpcModMainCityAvatarRet, Retcode> {
debug!("{:?}", &ctx.arg);
ctx.session.player_info.main_city_avatar_id = Some(ctx.arg.main_city_avatar_id);
let player_info = &mut ctx.session.player_info;
player_info.main_city_avatar_id = Some(ctx.arg.main_city_avatar_id);
ctx.rpc_ptc
.send_ptc(PtcPlayerSyncArg {
basic_info: Some(protocol::util::build_player_basic_info(player_info)),
..Default::default()
})
.await;
ctx.add_notify(PtcPlayerSyncArg {
basic_info: Some(protocol::util::build_player_basic_info(
&ctx.session.player_info,
)),
..Default::default()
});
Ok(RpcModMainCityAvatarRet::default())
}

View file

@ -1,26 +1,47 @@
use common::{time_util, util::Ptr};
use qwer_rpc::{middleware::MiddlewareModel, RpcPtcContext, RpcPtcPoint};
use evelyn_network::{
message::{AvailableServerProtocolMessage, ForwardClientProtocolMessage, Header, ProtocolUnit},
ServerType,
};
use tracing::info;
use crate::{Globals, PlayerSession};
use crate::{AppState, Globals, PlayerSession};
use protocol::*;
use qwer::*;
mod auth;
pub mod auth;
pub struct NetworkContext<'s, T: OctData> {
pub arg: T,
pub rpc_ptc: RpcPtcContext,
pub notifies: Vec<ProtocolUnit>,
pub session: &'s mut PlayerSession,
pub globals: Ptr<Globals>,
}
impl<T: OctData> NetworkContext<'_, T> {
pub fn add_notify<Notify: OctData + ProtocolID>(&mut self, arg: Notify) {
let mut blob = Vec::new();
arg.marshal_to(&mut blob, 0).unwrap();
self.notifies.push(ProtocolUnit {
cmd_id: arg.get_protocol_id(),
blob,
});
}
}
macro_rules! network_modules {
($($name:ident),*) => {
$(mod $name;)*
fn register_module_handlers(point: &RpcPtcPoint) {
$($name::register_handlers(point);)*
async fn handle_client_protocol_by_module(cmd_id: u16, blob: &[u8], session_id: u32) -> (Option<ProtocolUnit>, Vec<ProtocolUnit>) {
$(
if $name::supports_message(cmd_id) {
return $name::handle_message(cmd_id, blob, session_id).await;
}
)*
(None, Vec::with_capacity(0))
}
};
}
@ -51,18 +72,31 @@ network_modules!(
world
);
pub fn register_handlers(listen_point: &RpcPtcPoint) {
listen_point.register_rpc_recv(
RpcPlayerLoginArg::PROTOCOL_ID,
auth::on_rpc_player_login_arg,
);
pub async fn on_forward_client_protocol_message(
state: &'static AppState,
message: ForwardClientProtocolMessage,
header: Header,
) {
let (response, notifies) = handle_client_protocol_by_module(
message.message.cmd_id,
&message.message.blob,
message.session_id,
)
.await;
listen_point.register_rpc_recv(
RpcPlayerLogoutArg::PROTOCOL_ID,
auth::on_rpc_player_logout_arg,
);
register_module_handlers(listen_point);
state
.network_manager
.send_to(
ServerType::GateServer,
header.sender_id,
AvailableServerProtocolMessage {
session_id: message.session_id,
ack_request_id: message.request_id,
response,
notifies,
},
)
.await;
}
pub async fn post_rpc_handle(session: &mut PlayerSession) {

View file

@ -195,7 +195,7 @@ mod handlers {
initiative_item_used_times: 0,
avatar_map: phashmap![],
battle_report: Vec::new(),
dungeon_group_uid: ctx.session.player_uid,
dungeon_group_uid: ctx.session.player_uid as u64,
entered_times: 0,
is_preset_avatar: false,
hollow_event_version: 0,
@ -227,12 +227,10 @@ mod handlers {
.scenes_mut()
.insert(scene_uid, scene_info);
ctx.rpc_ptc
.send_ptc(PtcEnterSceneArg {
scene_info: build_client_scene_info(&ctx.session.player_info, scene_uid).unwrap(),
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, scene_uid),
})
.await;
ctx.add_notify(PtcEnterSceneArg {
scene_info: build_client_scene_info(&ctx.session.player_info, scene_uid).unwrap(),
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, scene_uid),
});
Ok(RpcBeginArchiveBattleQuestRet {
retcode: Retcode::Succ,

View file

@ -130,15 +130,15 @@ mod handlers {
.collect(),
);
ctx.rpc_ptc
.send_ptc(PtcPlayerSyncArg {
client_systems: Some(ClientSystemsSync {
post_girl_data: Some(protocol::util::build_post_girl_sync(player_info)),
..Default::default()
}),
ctx.add_notify(PtcPlayerSyncArg {
client_systems: Some(ClientSystemsSync {
post_girl_data: Some(protocol::util::build_post_girl_sync(
&ctx.session.player_info,
)),
..Default::default()
})
.await;
}),
..Default::default()
});
Ok(RpcSelectPostGirlRet {
retcode: Retcode::Succ,

View file

@ -40,7 +40,7 @@ mod handlers {
initiative_item_used_times: 0,
avatar_map: phashmap![],
battle_report: Vec::new(),
dungeon_group_uid: ctx.session.player_uid,
dungeon_group_uid: ctx.session.player_uid as u64,
entered_times: 0,
is_preset_avatar: false,
hollow_event_version: 0,
@ -59,7 +59,7 @@ mod handlers {
camera_x: 0xFFFFFFFF,
camera_y: 0xFFFFFFFF,
main_city_time_info: scene_info::MainCityTimeInfo {
initial_time: 60 * 21,
initial_time: 360,
day_of_week: 5,
passed_milliseconds: 0,
executing_event_groups: phashset![],
@ -109,12 +109,10 @@ mod handlers {
let mut scene_info = build_client_scene_info(player_info, scene_uid).unwrap();
scene_section_util::add_scene_units_to_scene_info(ctx.session, scene_uid, &mut scene_info);
ctx.rpc_ptc
.send_ptc(PtcEnterSceneArg {
scene_info,
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, scene_uid),
})
.await;
ctx.add_notify(PtcEnterSceneArg {
scene_info,
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, scene_uid),
});
Ok(RpcEnterWorldRet::default())
}
@ -208,12 +206,10 @@ mod handlers {
cur_scene_uid,
&mut scene_info,
);
ctx.rpc_ptc
.send_ptc(PtcEnterSceneArg {
scene_info,
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, cur_scene_uid),
})
.await;
ctx.add_notify(PtcEnterSceneArg {
scene_info,
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, cur_scene_uid),
});
Ok(RpcEnterSectionRet::default())
}
@ -290,7 +286,7 @@ mod handlers {
initiative_item_used_times: 0,
avatar_map: phashmap![],
battle_report: Vec::new(),
dungeon_group_uid: ctx.session.player_uid,
dungeon_group_uid: ctx.session.player_uid as u64,
entered_times: 0,
is_preset_avatar: false,
hollow_event_version: 0,
@ -325,12 +321,10 @@ mod handlers {
let mut scene_info = build_client_scene_info(&ctx.session.player_info, scene_uid).unwrap();
scene_section_util::add_scene_units_to_scene_info(ctx.session, scene_uid, &mut scene_info);
ctx.rpc_ptc
.send_ptc(PtcEnterSceneArg {
scene_info,
dungeon_info: None,
})
.await;
ctx.add_notify(PtcEnterSceneArg {
scene_info,
dungeon_info: None,
});
Ok(RpcBeginTrainingCourseBattleRet::default())
}
@ -371,12 +365,10 @@ mod handlers {
let mut scene_info = build_client_scene_info(&ctx.session.player_info, scene_uid).unwrap();
scene_section_util::add_scene_units_to_scene_info(ctx.session, scene_uid, &mut scene_info);
ctx.rpc_ptc
.send_ptc(PtcEnterSceneArg {
scene_info,
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, scene_uid),
})
.await;
ctx.add_notify(PtcEnterSceneArg {
scene_info,
dungeon_info: build_client_dungeon_info(&ctx.session.player_info, scene_uid),
});
Ok(RpcLeaveCurSceneRet::default())
}

View file

@ -32,8 +32,8 @@ paste.workspace = true
kcp = { path = "./kcp" }
common.workspace = true
qwer.workspace = true
qwer-rpc.workspace = true
protocol.workspace = true
evelyn-proto.workspace = true
evelyn-network.workspace = true
evelyn-encryption.workspace = true
evelyn-http-client.workspace = true

View file

@ -3,6 +3,9 @@ bind_client_version = "CNBetaWin1.5.0"
config_autopatch_url = "http://127.0.0.1:10000/server/"
design_data_url = "http://127.0.0.1:10000/design_data/NAP_Publish_AppStore_1.5"
[node]
server_id = 0
[database]
url = "localhost:8000"
username = "root"

View file

@ -5,11 +5,18 @@ use std::{
time::Duration,
};
use common::config::{DatabaseSettings, EnvironmentConfiguration, TomlConfig};
use common::config::{DatabaseSettings, TomlConfig};
use database::DbContext;
use evelyn_network::{
config::ServerNodeConfiguration,
message::{
AvailableServerProtocolMessage, BindClientSessionOkMessage, NetworkPacket, WithOpcode,
},
ServerNetworkManager, ServerType,
};
use remote_config::RemoteConfiguration;
use serde::Deserialize;
use tracing::{error, Level};
use tracing::{error, warn, Level};
use udp_server::UdpServer;
mod database;
@ -21,6 +28,7 @@ mod udp_server;
#[derive(Deserialize)]
pub struct GateServerConfig {
pub node: ServerNodeConfiguration,
pub server_name: String,
pub bind_client_version: String,
pub config_autopatch_url: String,
@ -30,8 +38,8 @@ pub struct GateServerConfig {
struct AppState {
remote_config: RemoteConfiguration,
environment: EnvironmentConfiguration,
db_context: DbContext,
network_manager: ServerNetworkManager,
}
impl TomlConfig for GateServerConfig {
@ -62,12 +70,24 @@ async fn main() -> ExitCode {
return ExitCode::FAILURE;
};
let network_manager = ServerNetworkManager::new(
ServerType::GateServer,
CONFIG.node.server_id,
&environment.servers,
);
let state = STATE.get_or_init(|| AppState {
remote_config: config,
environment,
db_context,
network_manager,
});
state
.network_manager
.start_listener(state, on_message)
.await
.unwrap();
let Ok(server) = UdpServer::new(&format!("0.0.0.0:{gateway_port}"), state) else {
error!("failed to bind at udp://0.0.0.0:{gateway_port}, is another instance of this service already running?");
return ExitCode::FAILURE;
@ -78,3 +98,54 @@ async fn main() -> ExitCode {
tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
ExitCode::SUCCESS
}
async fn on_message(_state: &'static AppState, packet: NetworkPacket) {
match packet.opcode {
BindClientSessionOkMessage::OPCODE => {
let bind_client_session_ok = packet
.get_message::<BindClientSessionOkMessage>()
.expect("failed to decode BindClientSessionOkMessage");
net::packet_processor::TX
.get()
.unwrap()
.send(net::packet_processor::Input::LoginFinished(
bind_client_session_ok.session_id,
evelyn_proto::PlayerLoginScRsp::default(),
))
.await
.unwrap();
}
AvailableServerProtocolMessage::OPCODE => {
let available_server_protocol = packet
.get_message::<AvailableServerProtocolMessage>()
.expect("failed to decode AvailableServerProtocolMessage");
for notify in available_server_protocol.notifies {
net::packet_processor::TX
.get()
.unwrap()
.send(net::packet_processor::Input::SendNotify(
available_server_protocol.session_id,
notify,
))
.await
.unwrap();
}
if let Some(rsp) = available_server_protocol.response {
net::packet_processor::TX
.get()
.unwrap()
.send(net::packet_processor::Input::SendRsp(
available_server_protocol.session_id,
available_server_protocol.ack_request_id,
rsp,
))
.await
.unwrap();
}
}
unhandled => warn!("unhandled incoming message from another server, opcode: {unhandled}"),
}
}

View file

@ -3,23 +3,20 @@ use crate::{
session::Session,
AppState,
};
use common::config::ServiceType;
use evelyn_encryption::rsa;
use evelyn_proto::*;
use qwer_rpc::{
middleware::{AccountMiddlewareModel, MiddlewareModel},
RpcCallError,
use evelyn_network::{
message::{BindClientSessionMessage, ForwardClientProtocolMessage},
ServerType,
};
use evelyn_proto::*;
use qwer::{OctData, ProtocolID};
use rand::RngCore;
use std::time::Duration;
use tracing::{debug, error};
use tracing::{debug, error, warn};
#[derive(thiserror::Error, Debug)]
pub enum PacketHandlingError {
#[error("decode error: {0}")]
Decode(#[from] DecodeError),
#[error("rpc call error: {0}")]
RpcCallError(#[from] RpcCallError),
}
pub async fn decode_and_handle(
@ -35,23 +32,27 @@ pub async fn decode_and_handle(
let packet = NetPacket::<PlayerGetTokenCsReq>::decode(buf)?;
on_player_get_token_cs_req(session, state, packet.head, packet.body).await;
}
PlayerLoginCsReq::CMD_ID => {
let _packet = NetPacket::<PlayerLoginCsReq>::decode(buf)?;
state.network_manager.send_to(ServerType::GameServer, 0, BindClientSessionMessage {
player_uid: session.get_player_uid(),
session_id: session.conv,
}).await;
}
cmd_id if session.is_logged_in() => {
let middleware_list = vec![MiddlewareModel::Account(AccountMiddlewareModel {
player_uid: session.get_player_uid() as u64,
client_protocol_uid: 1,
is_resend: false,
})];
let end_point = session.game_server_addr();
decode_and_forward_proto!(
if let Some((request_id, protocol_unit)) = to_protocol_unit!(
cmd_id,
buf,
session,
session.rpc_ptc_point.lock().await,
end_point,
middleware_list,
Duration::from_secs(2)
)
buf
) {
state.network_manager.send_to(ServerType::GameServer, 0, ForwardClientProtocolMessage {
session_id: session.conv,
request_id,
message: protocol_unit,
}).await;
}
else {
warn!("received unknown cmd_id: {cmd_id}");
}
}
cmd_id => debug!("received cmd_id: {cmd_id}, session is not logged in, expected PlayerGetTokenCsReq (cmd_id: {})", PlayerGetTokenCsReq::CMD_ID),
}
@ -108,14 +109,6 @@ async fn on_player_get_token_cs_req(
}
};
// TODO: multiple game servers, choose random one by load balance manager
session.bind_game_server(
state
.environment
.get_server_end_point(ServiceType::GameServer, 0)
.unwrap(),
);
session.send_rsp(
head.packet_id,
PlayerGetTokenScRsp {

View file

@ -1,12 +1,12 @@
use evelyn_proto::{forward_as_notify, register_ptc_handlers};
use qwer::ProtocolID;
use qwer_rpc::{ProtocolServiceFrontend, RpcPtcContext, RpcPtcServiceFrontend};
use evelyn_network::message::ProtocolUnit;
use evelyn_proto::PlayerLoginScRsp;
use qwer::{OctData, ProtocolID};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, OnceLock},
};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::mpsc;
use tracing::{info_span, Instrument};
use crate::{session::Session, AppState};
@ -17,10 +17,12 @@ pub enum Input {
CreateSession(u32, SocketAddr),
RemoveSession(u32),
Packet(u32, Box<[u8]>),
Notify(u32, RpcPtcContext),
LoginFinished(u32, PlayerLoginScRsp),
SendNotify(u32, ProtocolUnit),
SendRsp(u32, u32, ProtocolUnit),
}
static TX: OnceLock<mpsc::Sender<Input>> = OnceLock::new();
pub static TX: OnceLock<mpsc::Sender<Input>> = OnceLock::new();
pub fn start(
kcp_evt_tx: std::sync::mpsc::Sender<(u32, KcpEvent)>,
@ -41,25 +43,14 @@ async fn processing_loop(
tx: std::sync::mpsc::Sender<(u32, KcpEvent)>,
state: &'static AppState,
) {
let rpc_ptc_service = RpcPtcServiceFrontend::new(ProtocolServiceFrontend::new());
let mut session_map: HashMap<u32, Arc<Session>> = HashMap::new();
loop {
match rx.recv().await {
Some(Input::CreateSession(conv, addr)) => {
let rpc_ptc_point = rpc_ptc_service.create_point(None).await.unwrap();
let conv_id = conv;
register_ptc_handlers!(rpc_ptc_point, conv_id, TX);
let session = Session::new(
conv,
addr,
&state.remote_config.xorpad,
tx.clone(),
Mutex::new(rpc_ptc_point),
);
//register_ptc_handlers!(rpc_ptc_point, conv_id, TX);
let session = Session::new(conv, addr, &state.remote_config.xorpad, tx.clone());
session_map.insert(conv, Arc::new(session));
}
Some(Input::RemoveSession(conv)) => {
@ -81,9 +72,19 @@ async fn processing_loop(
);
}
}
Some(Input::Notify(conv, ctx)) => {
Some(Input::LoginFinished(conv, rsp)) => {
if let Some(session) = session_map.get(&conv) {
forward_as_notify!(session, ctx);
session.send_rsp(1, rsp);
}
}
Some(Input::SendNotify(conv, message)) => {
if let Some(session) = session_map.get(&conv) {
evelyn_proto::send_notify!(session, message.cmd_id, &message.blob);
}
}
Some(Input::SendRsp(conv, rsp_id, message)) => {
if let Some(session) = session_map.get(&conv) {
evelyn_proto::send_rsp!(session, rsp_id, message.cmd_id, &message.blob);
}
}
None => break,

View file

@ -3,6 +3,7 @@ use std::time::Duration;
use common::config::*;
use evelyn_encryption::xor::MhyXorpad;
use evelyn_http_client::AutopatchClient;
use evelyn_network::config::ServerEnvironmentConfiguration;
use crate::GateServerConfig;
@ -14,7 +15,7 @@ pub struct RemoteConfiguration {
const RETRY_TIME: Duration = Duration::from_secs(5);
pub fn download_env_config(autopatch_url: &'static str) -> EnvironmentConfiguration {
pub fn download_env_config(autopatch_url: &'static str) -> ServerEnvironmentConfiguration {
let client = AutopatchClient::new(&autopatch_url).retry_after(RETRY_TIME);
client.fetch_until_success("environment.json")
}

View file

@ -2,14 +2,12 @@ use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU32, Ordering},
mpsc, Arc, OnceLock,
mpsc, OnceLock,
},
};
use evelyn_encryption::xor::MhyXorpad;
use evelyn_proto::{NapMessage, NullMessage, PlayerGetTokenScRsp};
use qwer_rpc::RpcPtcPoint;
use tokio::sync::Mutex;
use tracing::{debug, instrument};
use crate::{
@ -29,8 +27,6 @@ pub struct SessionID {
pub struct Session {
pub conv: u32,
pub addr: SocketAddr,
pub rpc_ptc_point: Mutex<Arc<RpcPtcPoint>>,
game_server_addr: OnceLock<SocketAddr>,
player_uid: OnceLock<u32>,
initial_xorpad: &'static MhyXorpad,
secret_key: OnceLock<MhyXorpad>,
@ -44,14 +40,11 @@ impl Session {
addr: SocketAddr,
initial_xorpad: &'static MhyXorpad,
kcp_evt_tx: mpsc::Sender<(u32, KcpEvent)>,
rpc_ptc_point: Mutex<Arc<RpcPtcPoint>>,
) -> Self {
Self {
conv,
addr,
kcp_evt_tx,
rpc_ptc_point,
game_server_addr: OnceLock::new(),
initial_xorpad,
seq_id: AtomicU32::new(0),
secret_key: OnceLock::new(),
@ -126,14 +119,6 @@ impl Session {
let _ = self.kcp_evt_tx.send((self.conv, KcpEvent::Send(buf)));
}
pub fn bind_game_server(&self, addr: SocketAddr) {
let _ = self.game_server_addr.set(addr);
}
pub fn game_server_addr(&self) -> SocketAddr {
*self.game_server_addr.get().unwrap()
}
pub fn set_secret_key(&self, seed: u64) {
let _ = self.secret_key.set(MhyXorpad::new::<byteorder::BE>(seed));
}

View file

@ -22,17 +22,6 @@ pub use enums::*;
#[id(1)]
pub struct PtcKeepAliveArg {}
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(100)]
pub struct RpcPlayerLoginArg {
pub platform: u32,
}
#[derive(OctData, Debug, Default)]
pub struct RpcPlayerLoginRet {
pub retcode: Retcode,
}
#[derive(OctData, Debug, Default)]
pub struct PlayerBasicInfo {
pub avatar_id: u32,
@ -47,7 +36,8 @@ pub struct PlayerBasicInfo {
#[id(101)]
pub struct RpcGetPlayerBasicInfoArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1101)]
pub struct RpcGetPlayerBasicInfoRet {
pub retcode: Retcode,
pub basic_info: PlayerBasicInfo,
@ -77,7 +67,8 @@ pub struct AvatarInfo {
pub cur_weapon_uid: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1102)]
pub struct RpcGetAvatarDataRet {
pub retcode: Retcode,
pub avatar_list: Vec<AvatarInfo>,
@ -98,7 +89,8 @@ pub struct WeaponInfo {
pub lock: bool,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1103)]
pub struct RpcGetWeaponDataRet {
pub retcode: Retcode,
pub weapon_list: Vec<WeaponInfo>,
@ -118,7 +110,8 @@ pub struct EquipInfo {
pub lock: bool,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1104)]
pub struct RpcGetEquipDataRet {
pub retcode: Retcode,
pub equip_list: Vec<EquipInfo>,
@ -140,7 +133,8 @@ pub struct AutoRecoveryInfo {
pub buy_times: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1105)]
pub struct RpcGetResourceDataRet {
pub retcode: Retcode,
pub resource_list: Vec<ResourceInfo>,
@ -164,7 +158,8 @@ pub struct QuestData {
pub quest_collection_list: Vec<QuestCollection>,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1106)]
pub struct RpcGetQuestDataRet {
pub retcode: Retcode,
pub quest_type: u32,
@ -187,7 +182,8 @@ pub struct ArchiveData {
pub videotaps_info: Vec<VideotapeInfo>,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1107)]
pub struct RpcGetArchiveDataRet {
pub retcode: Retcode,
pub archive_data: ArchiveData,
@ -202,7 +198,8 @@ pub struct HollowData {
pub unlock_hollow_id_list: Vec<u32>,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1108)]
pub struct RpcGetHollowDataRet {
pub retcode: Retcode,
pub hollow_data: HollowData,
@ -215,7 +212,8 @@ pub struct RpcAbyssGetDataArg {}
#[derive(OctData, Debug, Default)]
pub struct AbyssInfo {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1109)]
pub struct RpcAbyssGetDataRet {
pub retcode: Retcode,
pub abyss_info: AbyssInfo,
@ -225,7 +223,8 @@ pub struct RpcAbyssGetDataRet {
#[id(110)]
pub struct RpcGetBuddyDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1110)]
pub struct RpcGetBuddyDataRet {
pub retcode: Retcode,
}
@ -234,7 +233,8 @@ pub struct RpcGetBuddyDataRet {
#[id(111)]
pub struct RpcAbyssArpeggioGetDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1111)]
pub struct RpcAbyssArpeggioGetDataRet {
pub retcode: Retcode,
}
@ -243,7 +243,8 @@ pub struct RpcAbyssArpeggioGetDataRet {
#[id(112)]
pub struct RpcGetServerTimestampArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1112)]
pub struct RpcGetServerTimestampRet {
pub retcode: Retcode,
pub utc_offset: i32,
@ -254,7 +255,8 @@ pub struct RpcGetServerTimestampRet {
#[id(113)]
pub struct RpcVideoGetInfoArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1113)]
pub struct RpcVideoGetInfoRet {
pub retcode: Retcode,
}
@ -263,7 +265,8 @@ pub struct RpcVideoGetInfoRet {
#[id(114)]
pub struct RpcGetAuthkeyArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1114)]
pub struct RpcGetAuthkeyRet {
pub retcode: Retcode,
}
@ -277,7 +280,8 @@ pub struct RpcGetGachaDataArg {
#[derive(OctData, Debug, Default)]
pub struct GachaData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1115)]
pub struct RpcGetGachaDataRet {
pub retcode: Retcode,
pub gacha_type: u32,
@ -291,7 +295,8 @@ pub struct RpcGetCampIdleDataArg {}
#[derive(OctData, Debug, Default)]
pub struct CampIdleData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1116)]
pub struct RpcGetCampIdleDataRet {
pub retcode: Retcode,
pub camp_idle_data: CampIdleData,
@ -304,7 +309,8 @@ pub struct RpcSavePlayerSystemSettingArg {
pub r#type: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1117)]
pub struct RpcSavePlayerSystemSettingRet {
pub retcode: Retcode,
}
@ -316,7 +322,8 @@ pub struct RpcGetRamenDataArg {}
#[derive(OctData, Debug, Default)]
pub struct RamenData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1118)]
pub struct RpcGetRamenDataRet {
pub retcode: Retcode,
pub ramen_data: RamenData,
@ -329,7 +336,8 @@ pub struct RpcGetCafeDataArg {}
#[derive(OctData, Debug, Default)]
pub struct CafeData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1119)]
pub struct RpcGetCafeDataRet {
pub retcode: Retcode,
pub cafe_data: CafeData,
@ -342,7 +350,8 @@ pub struct RpcGetRewardBuffDataArg {}
#[derive(OctData, Debug, Default)]
pub struct RewardBuffData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1120)]
pub struct RpcGetRewardBuffDataRet {
pub retcode: Retcode,
pub data: RewardBuffData,
@ -352,7 +361,8 @@ pub struct RpcGetRewardBuffDataRet {
#[id(121)]
pub struct RpcGetPlayerMailsArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1121)]
pub struct RpcGetPlayerMailsRet {
pub retcode: Retcode,
}
@ -364,7 +374,8 @@ pub struct RpcGetFairyDataArg {}
#[derive(OctData, Debug, Default)]
pub struct FairyData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1122)]
pub struct RpcGetFairyDataRet {
pub retcode: Retcode,
pub data: FairyData,
@ -377,7 +388,8 @@ pub struct RpcGetTipsInfoArg {}
#[derive(OctData, Debug, Default)]
pub struct TipsInfo {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1123)]
pub struct RpcGetTipsInfoRet {
pub retcode: Retcode,
pub tips_info: TipsInfo,
@ -411,7 +423,8 @@ pub struct ClientSystemsData {
pub post_girl_data: PostGirlData,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1124)]
pub struct RpcGetClientSystemsDataRet {
pub retcode: Retcode,
pub data: ClientSystemsData,
@ -424,7 +437,8 @@ pub struct RpcGetPrivateMessageDataArg {}
#[derive(OctData, Debug, Default)]
pub struct PrivateMessageData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1125)]
pub struct RpcGetPrivateMessageDataRet {
pub retcode: Retcode,
pub private_message_data: PrivateMessageData,
@ -437,7 +451,8 @@ pub struct RpcGetCollectMapArg {}
#[derive(OctData, Debug, Default)]
pub struct CollectMap {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1126)]
pub struct RpcGetCollectMapRet {
pub retcode: Retcode,
pub collect_map: CollectMap,
@ -450,7 +465,8 @@ pub struct RpcWorkbenchGetDataArg {}
#[derive(OctData, Debug, Default)]
pub struct WorkbenchData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1127)]
pub struct RpcWorkbenchGetDataRet {
pub retcode: Retcode,
pub workbench_data: WorkbenchData,
@ -463,7 +479,8 @@ pub struct RpcGetAbyssRewardDataArg {}
#[derive(OctData, Debug, Default)]
pub struct AbyssRewardData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1128)]
pub struct RpcGetAbyssRewardDataRet {
pub retcode: Retcode,
pub abyss_reward_data: AbyssRewardData,
@ -476,7 +493,8 @@ pub struct RpcGetVhsStoreDataArg {}
#[derive(OctData, Debug, Default)]
pub struct VhsStoreData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1129)]
pub struct RpcGetVhsStoreDataRet {
pub retcode: Retcode,
pub data: VhsStoreData,
@ -486,7 +504,8 @@ pub struct RpcGetVhsStoreDataRet {
#[id(130)]
pub struct RpcGetActivityDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1130)]
pub struct RpcGetActivityDataRet {
pub retcode: Retcode,
}
@ -495,7 +514,8 @@ pub struct RpcGetActivityDataRet {
#[id(131)]
pub struct RpcGetWebActivityDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1131)]
pub struct RpcGetWebActivityDataRet {
pub retcode: Retcode,
}
@ -507,7 +527,8 @@ pub struct RpcGetEmbattlesDataArg {}
#[derive(OctData, Debug, Default)]
pub struct EmbattlesData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1132)]
pub struct RpcGetEmbattlesDataRet {
pub retcode: Retcode,
pub embattles_data: EmbattlesData,
@ -520,7 +541,8 @@ pub struct RpcGetNewsStandDataArg {}
#[derive(OctData, Debug, Default)]
pub struct NewsStandData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1133)]
pub struct RpcGetNewsStandDataRet {
pub retcode: Retcode,
pub news_stand_data: NewsStandData,
@ -533,7 +555,8 @@ pub struct RpcGetTrashbinHermitDataArg {}
#[derive(OctData, Debug, Default)]
pub struct TrashbinHermitData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1134)]
pub struct RpcGetTrashbinHermitDataRet {
pub retcode: Retcode,
pub trashbin_hermit_data: TrashbinHermitData,
@ -546,7 +569,8 @@ pub struct RpcGetMainCityRevivalDataArg {}
#[derive(OctData, Debug, Default)]
pub struct MainCityRevivalData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1135)]
pub struct RpcGetMainCityRevivalDataRet {
pub retcode: Retcode,
pub main_city_revival_data: MainCityRevivalData,
@ -556,7 +580,8 @@ pub struct RpcGetMainCityRevivalDataRet {
#[id(136)]
pub struct RpcGetArcadeDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1136)]
pub struct RpcGetArcadeDataRet {
pub retcode: Retcode,
}
@ -565,7 +590,8 @@ pub struct RpcGetArcadeDataRet {
#[id(137)]
pub struct RpcGetBattlePassDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1137)]
pub struct RpcGetBattlePassDataRet {
pub retcode: Retcode,
}
@ -574,7 +600,8 @@ pub struct RpcGetBattlePassDataRet {
#[id(138)]
pub struct RpcGetHadalZoneDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1138)]
pub struct RpcGetHadalZoneDataRet {
pub retcode: Retcode,
}
@ -583,7 +610,8 @@ pub struct RpcGetHadalZoneDataRet {
#[id(139)]
pub struct RpcGetBabelTowerDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1139)]
pub struct RpcGetBabelTowerDataRet {
pub retcode: Retcode,
}
@ -595,7 +623,8 @@ pub struct RpcGetDailyChallengeDataArg {}
#[derive(OctData, Debug, Default)]
pub struct DailyChallengeData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1140)]
pub struct RpcGetDailyChallengeDataRet {
pub retcode: Retcode,
pub data: DailyChallengeData,
@ -608,7 +637,8 @@ pub struct RpcGetRoleCardDataArg {}
#[derive(OctData, Debug, Default)]
pub struct RoleCardData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1141)]
pub struct RpcGetRoleCardDataRet {
pub retcode: Retcode,
pub role_card_data: RoleCardData,
@ -618,7 +648,8 @@ pub struct RpcGetRoleCardDataRet {
#[id(142)]
pub struct RpcGetChatEmojiListArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1142)]
pub struct RpcGetChatEmojiListRet {
pub retcode: Retcode,
}
@ -627,7 +658,8 @@ pub struct RpcGetChatEmojiListRet {
#[id(143)]
pub struct RpcGetFriendListArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1143)]
pub struct RpcGetFriendListRet {
pub retcode: Retcode,
}
@ -636,7 +668,8 @@ pub struct RpcGetFriendListRet {
#[id(144)]
pub struct RpcGetCharacterQuestListArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1144)]
pub struct RpcGetCharacterQuestListRet {
pub retcode: Retcode,
}
@ -645,7 +678,8 @@ pub struct RpcGetCharacterQuestListRet {
#[id(145)]
pub struct RpcGetExplorationDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1146)]
pub struct RpcGetExplorationDataRet {
pub retcode: Retcode,
}
@ -657,7 +691,8 @@ pub struct RpcGetFashionStoreDataArg {}
#[derive(OctData, Debug, Default)]
pub struct FashionStoreData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1146)]
pub struct RpcGetFashionStoreDataRet {
pub retcode: Retcode,
pub data: FashionStoreData,
@ -670,7 +705,8 @@ pub struct RpcGetShoppingMallInfoArg {}
#[derive(OctData, Debug, Default)]
pub struct ShoppingMallInfo {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1147)]
pub struct RpcGetShoppingMallInfoRet {
pub retcode: Retcode,
pub shopping_mall_info: ShoppingMallInfo,
@ -680,7 +716,8 @@ pub struct RpcGetShoppingMallInfoRet {
#[id(148)]
pub struct RpcGetOnlineFriendsListArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1148)]
pub struct RpcGetOnlineFriendsListRet {
pub retcode: Retcode,
}
@ -689,7 +726,8 @@ pub struct RpcGetOnlineFriendsListRet {
#[id(149)]
pub struct RpcEnterWorldArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1149)]
pub struct RpcEnterWorldRet {
pub retcode: Retcode,
}
@ -790,7 +828,8 @@ pub struct RpcSceneTransitionArg {
pub reason: String,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1151)]
pub struct RpcSceneTransitionRet {
pub retcode: Retcode,
}
@ -799,7 +838,8 @@ pub struct RpcSceneTransitionRet {
#[id(152)]
pub struct RpcEnterSectionCompleteArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1152)]
pub struct RpcEnterSectionCompleteRet {
pub retcode: Retcode,
}
@ -808,7 +848,8 @@ pub struct RpcEnterSectionCompleteRet {
#[id(153)]
pub struct RpcGetMonthCardRewardListArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1153)]
pub struct RpcGetMonthCardRewardListRet {
pub retcode: Retcode,
}
@ -817,7 +858,8 @@ pub struct RpcGetMonthCardRewardListRet {
#[id(154)]
pub struct RpcGetDisplayCaseDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1154)]
pub struct RpcGetDisplayCaseDataRet {
pub retcode: Retcode,
}
@ -835,7 +877,8 @@ pub struct RpcSavePosInMainCityArg {
pub section_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1155)]
pub struct RpcSavePosInMainCityRet {
pub retcode: Retcode,
}
@ -848,7 +891,8 @@ pub struct RpcPlayerOperationArg {
pub param: i32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1156)]
pub struct RpcPlayerOperationRet {
pub retcode: Retcode,
}
@ -857,7 +901,8 @@ pub struct RpcPlayerOperationRet {
#[id(157)]
pub struct RpcReportUiLayoutPlatformArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1157)]
pub struct RpcReportUiLayoutPlatformRet {
pub retcode: Retcode,
}
@ -866,7 +911,8 @@ pub struct RpcReportUiLayoutPlatformRet {
#[id(158)]
pub struct RpcPlayerTransactionArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1158)]
pub struct RpcPlayerTransactionRet {
pub retcode: Retcode,
pub transaction: String,
@ -876,7 +922,8 @@ pub struct RpcPlayerTransactionRet {
#[id(159)]
pub struct RpcRechargeGetItemListArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1159)]
pub struct RpcRechargeGetItemListRet {
pub retcode: Retcode,
}
@ -920,7 +967,8 @@ pub struct WishlistPlanInfo {
pub skill_wishlist_plan: Option<SkillWishlistPlan>,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1161)]
pub struct RpcGetWishlistDataRet {
pub retcode: Retcode,
pub wishlist_plan_list: Vec<WishlistPlanInfo>,
@ -930,7 +978,8 @@ pub struct RpcGetWishlistDataRet {
#[id(162)]
pub struct RpcGetMiniscapeEntrustDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1162)]
pub struct RpcGetMiniscapeEntrustDataRet {
pub retcode: Retcode,
}
@ -942,7 +991,8 @@ pub struct RpcGetJourneyDataArg {}
#[derive(OctData, Debug, Default)]
pub struct JourneyData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1163)]
pub struct RpcGetJourneyDataRet {
pub retcode: Retcode,
pub data: JourneyData,
@ -952,7 +1002,8 @@ pub struct RpcGetJourneyDataRet {
#[id(164)]
pub struct RpcGetPhotoWallDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1164)]
pub struct RpcGetPhotoWallDataRet {
pub retcode: Retcode,
}
@ -965,7 +1016,8 @@ pub struct RpcModMainCityAvatarArg {
pub main_city_avatar_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1165)]
pub struct RpcModMainCityAvatarRet {
pub retcode: Retcode,
}
@ -1055,7 +1107,8 @@ pub struct RpcModTimeArg {
pub time_period: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1166)]
pub struct RpcModTimeRet {
pub retcode: Retcode,
}
@ -1066,7 +1119,8 @@ pub struct RpcInteractWithClientEntityArg {
pub interaction: i32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1167)]
pub struct RpcInteractWithClientEntityRet {
pub retcode: Retcode,
}
@ -1079,7 +1133,8 @@ pub struct RpcInteractWithUnitArg {
pub npc_tag_id: i32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1168)]
pub struct RpcInteractWithUnitRet {
pub retcode: Retcode,
}
@ -1094,7 +1149,8 @@ pub struct RpcRunEventGraphArg {
pub event_graph_uid: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1169)]
pub struct RpcRunEventGraphRet {
pub retcode: Retcode,
}
@ -1106,7 +1162,8 @@ pub struct RpcEnterSectionArg {
pub transform_id: String,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1170)]
pub struct RpcEnterSectionRet {
pub retcode: Retcode,
}
@ -1115,7 +1172,8 @@ pub struct RpcEnterSectionRet {
#[id(171)]
pub struct RpcRefreshSectionArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1171)]
pub struct RpcRefreshSectionRet {
pub retcode: Retcode,
pub refresh_status: u32,
@ -1125,7 +1183,8 @@ pub struct RpcRefreshSectionRet {
#[id(172)]
pub struct RpcCheckYorozuyaInfoRefreshArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1172)]
pub struct RpcCheckYorozuyaInfoRefreshRet {
pub retcode: Retcode,
}
@ -1138,7 +1197,8 @@ pub struct RpcBeginTrainingCourseBattleArg {
pub quest_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1173)]
pub struct RpcBeginTrainingCourseBattleRet {
pub retcode: Retcode,
}
@ -1149,7 +1209,8 @@ pub struct RpcReportEmbattleInfoArg {
pub avatar_list: Vec<i32>,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1174)]
pub struct RpcReportEmbattleInfoRet {
pub retcode: Retcode,
}
@ -1158,7 +1219,8 @@ pub struct RpcReportEmbattleInfoRet {
#[id(175)]
pub struct RpcBattleReportArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1175)]
pub struct RpcBattleReportRet {
pub retcode: Retcode,
}
@ -1172,7 +1234,8 @@ pub struct RpcEndBattleArg {
#[derive(OctData, Debug, Default)]
pub struct FightSettle {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1176)]
pub struct RpcEndBattleRet {
pub retcode: Retcode,
pub fight_settle: FightSettle,
@ -1182,7 +1245,8 @@ pub struct RpcEndBattleRet {
#[id(177)]
pub struct RpcLeaveCurSceneArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1177)]
pub struct RpcLeaveCurSceneRet {
pub retcode: Retcode,
}
@ -1194,7 +1258,8 @@ pub struct RpcGetPlayerNetworkDataArg {}
#[derive(OctData, Debug, Default)]
pub struct PlayerNetworkData {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1178)]
pub struct RpcGetPlayerNetworkDataRet {
pub retcode: Retcode,
pub player_network_data: Option<PlayerNetworkData>,
@ -1207,7 +1272,8 @@ pub struct RpcWeaponDressArg {
pub weapon_uid: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1179)]
pub struct RpcWeaponDressRet {
pub retcode: Retcode,
}
@ -1218,7 +1284,8 @@ pub struct RpcWeaponUnDressArg {
pub avatar_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1180)]
pub struct RpcWeaponUnDressRet {
pub retcode: Retcode,
}
@ -1227,7 +1294,8 @@ pub struct RpcWeaponUnDressRet {
#[id(181)]
pub struct RpcGetRidusGotBooDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1181)]
pub struct RpcGetRidusGotBooDataRet {
pub retcode: Retcode,
}
@ -1236,7 +1304,8 @@ pub struct RpcGetRidusGotBooDataRet {
#[id(182)]
pub struct RpcGetFishingContestDataArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1182)]
pub struct RpcGetFishingContestDataRet {
pub retcode: Retcode,
}
@ -1245,7 +1314,8 @@ pub struct RpcGetFishingContestDataRet {
#[id(183)]
pub struct RpcGetRedDotListArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1183)]
pub struct RpcGetRedDotListRet {
pub retcode: Retcode,
}
@ -1256,7 +1326,8 @@ pub struct RpcGetAvatarRecommendEquipArg {
pub avatar_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1184)]
pub struct RpcGetAvatarRecommendEquipRet {
pub retcode: Retcode,
}
@ -1265,7 +1336,8 @@ pub struct RpcGetAvatarRecommendEquipRet {
#[id(185)]
pub struct RpcPostEnterWorldArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1185)]
pub struct RpcPostEnterWorldRet {
pub retcode: Retcode,
}
@ -1276,7 +1348,8 @@ pub struct RpcSetLanguageArg {
pub language: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1186)]
pub struct RpcSetLanguageRet {
pub retcode: Retcode,
}
@ -1288,7 +1361,8 @@ pub struct RpcSelectPostGirlArg {
pub new_selected_post_girl_id_list: Vec<u32>,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1187)]
pub struct RpcSelectPostGirlRet {
pub retcode: Retcode,
}
@ -1302,7 +1376,8 @@ pub struct RpcBeginArchiveBattleQuestArg {
pub buddy_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1188)]
pub struct RpcBeginArchiveBattleQuestRet {
pub retcode: Retcode,
pub quest_id: u32,
@ -1315,7 +1390,8 @@ pub struct RpcPerformTriggerArg {
pub perform_type: i32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1189)]
pub struct RpcPerformTriggerRet {
pub retcode: Retcode,
pub perform_uid: i64,
@ -1329,7 +1405,8 @@ pub struct RpcPerformJumpArg {
pub perform_uid: i64,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1190)]
pub struct RpcPerformJumpRet {
pub retcode: Retcode,
}
@ -1342,7 +1419,8 @@ pub struct RpcPerformEndArg {
pub perform_uid: i64,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1191)]
pub struct RpcPerformEndRet {
pub retcode: Retcode,
}
@ -1353,7 +1431,8 @@ pub struct RpcEndNewbieArg {
pub group_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1192)]
pub struct RpcEndNewbieRet {
pub retcode: Retcode,
}
@ -1364,7 +1443,8 @@ pub struct RpcFinishArchiveQuestArg {
pub quest_id: u32,
}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1193)]
pub struct RpcFinishArchiveQuestRet {
pub retcode: Retcode,
pub quest_id: u32,
@ -1374,7 +1454,8 @@ pub struct RpcFinishArchiveQuestRet {
#[id(194)]
pub struct RpcPlayerLogoutArg {}
#[derive(OctData, Debug, Default)]
#[derive(OctData, Debug, Default, ProtocolID)]
#[id(1194)]
pub struct RpcPlayerLogoutRet {
pub retcode: Retcode,
}

View file

@ -1,16 +0,0 @@
[package]
name = "qwer-rpc"
edition = "2021"
version.workspace = true
[dependencies]
common.workspace = true
qwer.workspace = true
tokio.workspace = true
futures.workspace = true
tracing.workspace = true
thiserror.workspace = true
dashmap.workspace = true
byteorder.workspace = true

View file

@ -1,14 +0,0 @@
[package]
name = "qwer-client-example"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio.workspace = true
anyhow.workspace = true
common.workspace = true
tracing.workspace = true
protocol.workspace = true
qwer.workspace = true
qwer-rpc.workspace = true

View file

@ -1,39 +0,0 @@
use std::time::Duration;
use anyhow::Result;
use protocol::{RpcGetPlayerBasicInfoArg, RpcGetPlayerBasicInfoRet};
use qwer_rpc::{ProtocolServiceFrontend, RpcPtcServiceFrontend};
#[tokio::main]
async fn main() -> Result<()> {
common::logging::init(tracing::Level::DEBUG);
let service = RpcPtcServiceFrontend::new(ProtocolServiceFrontend::new());
let protocol_point = service.create_point(None).await?;
let rsp = protocol_point
.call_rpc::<_, RpcGetPlayerBasicInfoRet>(
"127.0.0.1:10101".parse()?,
RpcGetPlayerBasicInfoArg::default(),
Vec::with_capacity(0),
Duration::from_secs(5),
)
.await?;
println!("rsp: {rsp:?}");
let protocol_point = service.create_point(None).await?;
let rsp = protocol_point
.call_rpc::<_, RpcGetPlayerBasicInfoRet>(
"127.0.0.1:10101".parse()?,
RpcGetPlayerBasicInfoArg::default(),
Vec::with_capacity(0),
Duration::from_secs(5),
)
.await?;
println!("rsp: {rsp:?}");
Ok(())
}

View file

@ -1,14 +0,0 @@
[package]
name = "qwer-server-example"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio.workspace = true
anyhow.workspace = true
common.workspace = true
tracing.workspace = true
protocol.workspace = true
qwer.workspace = true
qwer-rpc.workspace = true

View file

@ -1,58 +0,0 @@
use std::{
sync::{Arc, OnceLock},
time::Duration,
};
use anyhow::Result;
use protocol::{
PlayerBasicInfo, Retcode, RpcGetPlayerBasicInfoArg, RpcGetPlayerBasicInfoRet,
RpcPlayerLoginArg, RpcPlayerLoginRet,
};
use qwer::ProtocolID;
use qwer_rpc::{ProtocolServiceFrontend, RpcPtcContext, RpcPtcPoint, RpcPtcServiceFrontend};
use tokio::time;
static SERVICE: OnceLock<RpcPtcServiceFrontend> = OnceLock::new();
static SERVER_POINT: OnceLock<Arc<RpcPtcPoint>> = OnceLock::new();
#[tokio::main]
async fn main() -> Result<()> {
common::logging::init(tracing::Level::DEBUG);
println!("Hello, world!");
let service =
SERVICE.get_or_init(|| RpcPtcServiceFrontend::new(ProtocolServiceFrontend::new()));
let point = service.create_point(Some("0.0.0.0:10101".parse()?)).await?;
let point = SERVER_POINT.get_or_init(|| point);
point.register_rpc_recv(RpcPlayerLoginArg::PROTOCOL_ID, handle_login);
point.register_rpc_recv(RpcGetPlayerBasicInfoArg::PROTOCOL_ID, handle_get_basic_info);
time::sleep(Duration::from_secs(100000)).await;
Ok(())
}
async fn handle_login(ctx: RpcPtcContext) {
let arg = ctx.get_arg::<RpcPlayerLoginArg>().unwrap();
println!("Login: {arg:?}");
ctx.send_ret(RpcPlayerLoginRet {
retcode: Retcode::Succ,
})
.await;
}
async fn handle_get_basic_info(ctx: RpcPtcContext) {
println!("BasicInfo req");
ctx.send_ret(RpcGetPlayerBasicInfoRet {
basic_info: PlayerBasicInfo {
nick_name: String::from("Name"),
..Default::default()
},
retcode: Retcode::Succ,
})
.await;
}

View file

@ -1,12 +0,0 @@
mod object_res_mini_mgr;
mod protocol;
mod rpc_ptc;
pub use protocol::protocol_helper::{ProtocolLinker, ProtocolListener};
pub use protocol::protocol_point::ProtocolPoint;
pub use protocol::protocol_service::ProtocolServiceFrontend;
pub use protocol::protocol_session::{ProtocolContext, ProtocolSession, ProtocolSessionImpl};
pub use rpc_ptc::middleware;
pub use rpc_ptc::rpc_ptc_point::{RpcCallError, RpcHandler, RpcPtcContext, RpcPtcPoint};
pub use rpc_ptc::rpc_ptc_service::RpcPtcServiceFrontend;

View file

@ -1,45 +0,0 @@
use std::sync::atomic::{self, AtomicU64};
use dashmap::{mapref::one::Ref, DashMap};
pub struct ObjectResMiniMgr<T> {
uid_counter: AtomicU64,
objects: DashMap<u64, T>,
}
impl<T> ObjectResMiniMgr<T>
where
T: ResObj,
{
pub fn new() -> Self {
Self {
uid_counter: AtomicU64::new(1),
objects: DashMap::new(),
}
}
pub fn insert(&self, obj: T) -> u64 {
let uid = self.uid_counter.fetch_add(1, atomic::Ordering::SeqCst);
obj.set_uid(uid);
self.objects.insert(uid, obj);
uid
}
pub fn iter(&self) -> dashmap::iter::Iter<'_, u64, T> {
self.objects.iter()
}
pub fn get(&self, uid: u64) -> Option<Ref<u64, T>> {
self.objects.get(&uid)
}
pub fn release(&self, uid: u64) -> Option<(u64, T)> {
self.objects.remove(&uid)
}
}
pub trait ResObj {
fn set_uid(&self, uid: u64);
fn get_uid(&self) -> u64;
}

View file

@ -1,5 +0,0 @@
pub mod protocol_entity;
pub mod protocol_helper;
pub mod protocol_point;
pub mod protocol_service;
pub mod protocol_session;

View file

@ -1,225 +0,0 @@
use std::{
io::Cursor,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, Weak},
time::Duration,
};
use dashmap::{
mapref::one::{Ref, RefMut},
DashMap,
};
use qwer::ProtocolStream;
use tokio::{sync::OnceCell, time::timeout};
use crate::{
object_res_mini_mgr::ResObj, protocol::protocol_helper,
protocol::protocol_point::ProtocolPoint,
protocol::protocol_service::ProtocolServiceFrontendImpl, ProtocolContext, ProtocolLinker,
ProtocolListener, ProtocolSessionImpl,
};
pub struct ProtocolEntity {
service_backend: OnceCell<Weak<ProtocolServiceFrontendImpl>>,
local_addr: Option<SocketAddr>,
protocol_listener: Option<ProtocolListener>,
used_channels: DashMap<u16, ProtocolChannelInfo>,
pub sessions: DashMap<SocketAddr, Arc<ProtocolSessionImpl>>,
pub session_uid_map: DashMap<u64, SocketAddr>,
}
pub struct ProtocolChannelInfo {
pub callback: Option<Box<dyn Fn(ProtocolContext) + Send + Sync>>,
}
#[derive(thiserror::Error, Debug)]
enum AcceptError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("Invalid addr len")]
InvalidAddrLen,
#[error("Invalid addr format")]
InvalidAddrFormat,
}
impl ProtocolEntity {
pub fn new(local_addr: Option<SocketAddr>, listener: Option<ProtocolListener>) -> Self {
Self {
service_backend: OnceCell::new(),
local_addr,
protocol_listener: listener,
used_channels: DashMap::new(),
sessions: DashMap::new(),
session_uid_map: DashMap::new(),
}
}
pub async fn connect(
self: Arc<Self>,
addr: SocketAddr,
no_reverse: bool,
) -> std::io::Result<u64> {
if let Some(session_impl) = self.sessions.get(&addr) {
return Ok(session_impl.get_uid());
}
let linker = protocol_helper::connect(addr).await?;
let protocol_session = Arc::new(ProtocolSessionImpl::new(true, linker, self.clone()));
let uid = self
.service_backend()
.session_mgr
.insert(protocol_session.clone());
let mut buf: Vec<u8> = Vec::new();
let mut ps = ProtocolStream::new(Cursor::new(&mut buf));
if let Some(local_addr) = self.local_addr {
let IpAddr::V4(ip) = local_addr.ip() else {
unreachable!();
};
ps.push_boolean(true)?;
let octets = ip.octets();
ps.push_u8(octets.len() as u8)?;
if no_reverse {
ps.push(&octets)?;
} else {
// it's hell
todo!()
}
ps.push_u16(local_addr.port())?;
ps.push_boolean(no_reverse)?;
} else {
ps.push_boolean(false)?;
ps.push_u8(0)?;
}
protocol_session.linker.send(&buf).await;
self.sessions.insert(addr, protocol_session.clone());
self.session_uid_map
.insert(protocol_session.get_uid(), addr);
tokio::spawn(async move { self.service_backend().begin_recv(protocol_session).await });
Ok(uid)
}
pub fn get_channel_info(&self, channel_id: u16) -> Option<Ref<'_, u16, ProtocolChannelInfo>> {
self.used_channels.get(&channel_id)
}
pub fn get_channel_info_mut(
&self,
channel_id: u16,
) -> Option<RefMut<'_, u16, ProtocolChannelInfo>> {
self.used_channels.get_mut(&channel_id)
}
pub fn set_backend(&self, backend: Arc<ProtocolServiceFrontendImpl>) {
let _ = self.service_backend.set(Arc::downgrade(&backend));
}
pub fn create_point(self: Arc<Self>, channel: u16) -> Option<ProtocolPoint> {
if self.used_channels.contains_key(&channel) {
return None;
}
self.used_channels
.insert(channel, ProtocolChannelInfo { callback: None });
Some(ProtocolPoint::new(
&self.service_backend(),
self.clone(),
channel,
))
}
fn service_backend(&self) -> Arc<ProtocolServiceFrontendImpl> {
self.service_backend
.get()
.unwrap()
.upgrade()
.expect("backend gone?")
}
pub fn has_listener(&self) -> bool {
self.protocol_listener.is_some()
}
pub async fn accept(self: &Arc<Self>) -> std::io::Result<Arc<ProtocolSessionImpl>> {
loop {
let mut linker = self.protocol_listener.as_ref().unwrap().accept().await?;
let Ok(result) = timeout(
Duration::from_millis(200),
Self::proceed_accept(&mut linker),
)
.await
else {
continue;
};
let remote_addr = match result {
Ok(r) => r,
Err(err) => {
tracing::warn!("accept error, skipping client: {err}");
continue;
}
};
let session = Arc::new(ProtocolSessionImpl::new(false, linker, self.clone()));
self.sessions
.insert(session.linker.remote_addr, session.clone());
if let Some(addr) = remote_addr {
self.sessions.insert(addr, session.clone());
}
return Ok(session);
}
}
async fn proceed_accept(
linker: &mut ProtocolLinker,
) -> Result<Option<SocketAddr>, AcceptError> {
let mut buf = [0u8; 9];
let mut n = 0;
while n < 2 {
n += linker.recv_some(&mut buf[..2]).await?;
}
let mut ps = ProtocolStream::new(Cursor::new(&buf));
let has_addr = ps.pop_boolean()?;
let addr_size = ps.pop_u8()? as usize;
if has_addr {
if addr_size != 4 {
return Err(AcceptError::InvalidAddrLen);
}
while n < 5 + addr_size {
n += linker.recv_some(&mut buf[n..]).await?;
}
let mut ps = ProtocolStream::new(Cursor::new(&buf[2..]));
let octets = ps.pop(addr_size)?;
let port = ps.pop_u16()?;
let no_reverse = ps.pop_boolean()?;
if !no_reverse {
return Err(AcceptError::InvalidAddrFormat); // not supported yet
}
Ok(Some(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(octets[0], octets[1], octets[2], octets[3])),
port,
)))
} else {
if addr_size != 0 {
return Err(AcceptError::InvalidAddrLen);
}
Ok(None)
}
}
}

View file

@ -1,58 +0,0 @@
use std::io::Result;
use std::net::SocketAddr;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream,
},
sync::Mutex,
};
pub struct ProtocolListener {
socket: TcpListener,
}
pub struct ProtocolLinker {
pub remote_addr: SocketAddr,
write_half: Mutex<OwnedWriteHalf>,
read_half: Mutex<OwnedReadHalf>,
}
pub async fn listen(local_addr: SocketAddr) -> Result<ProtocolListener> {
Ok(ProtocolListener {
socket: TcpListener::bind(local_addr).await?,
})
}
pub async fn connect(remote_addr: SocketAddr) -> Result<ProtocolLinker> {
let stream = TcpStream::connect(remote_addr).await?;
Ok(ProtocolLinker::new(remote_addr, stream))
}
impl ProtocolLinker {
pub async fn send(&self, buf: &[u8]) {
let _ = self.write_half.lock().await.write_all(buf).await;
}
pub async fn recv_some(&self, buf: &mut [u8]) -> Result<usize> {
self.read_half.lock().await.read(buf).await
}
fn new(remote_addr: SocketAddr, stream: TcpStream) -> Self {
let (read, write) = stream.into_split();
ProtocolLinker {
remote_addr,
write_half: Mutex::new(write),
read_half: Mutex::new(read),
}
}
}
impl ProtocolListener {
pub async fn accept(&self) -> Result<ProtocolLinker> {
let (stream, remote_addr) = self.socket.accept().await?;
Ok(ProtocolLinker::new(remote_addr, stream))
}
}

View file

@ -1,97 +0,0 @@
use std::{
net::SocketAddr,
sync::{Arc, Weak},
time::Duration,
};
use crate::{
protocol::protocol_entity::ProtocolEntity,
protocol::protocol_service::ProtocolServiceFrontendImpl, ProtocolContext,
};
pub struct ProtocolPoint {
service_backend: Weak<ProtocolServiceFrontendImpl>,
entity: Arc<ProtocolEntity>,
channel: u16,
}
impl ProtocolPoint {
pub fn new(
service_backend: &Arc<ProtocolServiceFrontendImpl>,
entity: Arc<ProtocolEntity>,
channel: u16,
) -> Self {
Self {
service_backend: Arc::downgrade(service_backend),
entity,
channel,
}
}
pub fn register_rpc_call(&self, cb: impl Fn(ProtocolContext) + Send + Sync + 'static) {
self.entity
.get_channel_info_mut(self.channel)
.unwrap()
.callback = Some(Box::new(cb));
}
pub async fn send_rpc(
&self,
addr: SocketAddr,
rpc_arg: Box<[u8]>,
to_channel: u16,
arg_uid: u64,
) {
let _ = self
.service_backend
.upgrade()
.unwrap()
.send_rpc(
self.entity.clone(),
self.channel,
to_channel,
addr,
&rpc_arg,
Duration::ZERO,
arg_uid == 0,
arg_uid,
)
.await;
}
pub async fn call_rpc(
&self,
addr: SocketAddr,
rpc_arg: &[u8],
timeout: Duration,
) -> Option<Box<[u8]>> {
self.service_backend
.upgrade()
.unwrap()
.send_rpc(
self.entity.clone(),
self.channel,
0,
addr,
rpc_arg,
timeout,
false,
0,
)
.await
.inspect_err(|err| tracing::error!("ProtocolPoint::CallRpc -> send_rpc failed: {err}"))
.ok()
.flatten()
}
pub fn close(&self, addr: SocketAddr) {
self.service_backend
.upgrade()
.unwrap()
.close_session(&self.entity, addr);
}
pub fn get_channel(&self) -> u16 {
self.channel
}
}

View file

@ -1,345 +0,0 @@
use std::{
io::Cursor,
net::SocketAddr,
sync::{
atomic::{self, AtomicU16},
Arc,
},
time::Duration,
};
use common::time_util;
use dashmap::DashMap;
use qwer::{ProtocolHeader, ProtocolStream};
use tokio::{
sync::{oneshot, OnceCell},
time,
};
use crate::{
object_res_mini_mgr::{ObjectResMiniMgr, ResObj},
protocol::protocol_entity::{ProtocolChannelInfo, ProtocolEntity},
protocol::protocol_helper,
protocol::protocol_point::ProtocolPoint,
ProtocolContext, ProtocolSession, ProtocolSessionImpl,
};
pub struct RpcArgInfo {
#[expect(unused)]
pub session: ProtocolSession,
pub sender: oneshot::Sender<Box<[u8]>>,
uid: OnceCell<u64>,
}
impl RpcArgInfo {
pub fn new(session: ProtocolSession, sender: oneshot::Sender<Box<[u8]>>) -> Self {
Self {
session,
sender,
uid: OnceCell::new(),
}
}
}
impl ResObj for RpcArgInfo {
fn set_uid(&self, uid: u64) {
let _ = self.uid.set(uid);
}
fn get_uid(&self) -> u64 {
self.uid.get().copied().unwrap_or_default()
}
}
pub struct ProtocolServiceFrontend {
backend: Arc<ProtocolServiceFrontendImpl>,
}
impl ProtocolServiceFrontend {
pub fn new() -> Self {
Self {
backend: ProtocolServiceFrontendImpl::new(),
}
}
pub async fn create_point(
&self,
local_addr: Option<SocketAddr>,
) -> Result<ProtocolPoint, std::io::Error> {
self.backend.create_point(local_addr).await
}
}
pub struct ProtocolServiceFrontendImpl {
protocol_entity_map: DashMap<Option<SocketAddr>, Arc<ProtocolEntity>>,
pub session_mgr: ObjectResMiniMgr<Arc<ProtocolSessionImpl>>,
rpc_arg_mgr: ObjectResMiniMgr<RpcArgInfo>,
channel_counter: AtomicU16,
}
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
#[error("I/O Error: {0}")]
Io(#[from] std::io::Error),
#[error("header size doesn't match, expected: {0}, received: {1}")]
HeaderSize(usize, usize),
}
#[derive(thiserror::Error, Debug)]
pub enum SendRpcError {
#[error("I/O Error: {0}")]
Io(#[from] std::io::Error),
#[error("RPC timeout reached")]
Timeout,
}
impl ProtocolServiceFrontendImpl {
pub fn new() -> Arc<Self> {
let inst = Arc::new(Self {
protocol_entity_map: DashMap::new(),
session_mgr: ObjectResMiniMgr::new(),
rpc_arg_mgr: ObjectResMiniMgr::new(),
channel_counter: AtomicU16::new(0),
});
let inst_clone = inst.clone();
tokio::spawn(async move { inst_clone.check_active_sessions() });
inst
}
pub async fn send_rpc(
&self,
entity: Arc<ProtocolEntity>,
from_channel: u16,
to_channel: u16,
to_addr: SocketAddr,
body: &[u8],
timeout: Duration,
is_ptc: bool,
arg_uid: u64,
) -> Result<Option<Box<[u8]>>, SendRpcError> {
let session_uid = entity.connect(to_addr, true).await?;
let Some(session_impl) = self.session_mgr.get(session_uid).map(|s| s.clone()) else {
tracing::error!("self.session_mgr.get({session_uid}) = None");
return Ok(None);
};
let (rpc_arg_uid, receiver) = if !is_ptc && arg_uid == 0 {
let (sender, receiver) = oneshot::channel();
let arg_info = RpcArgInfo::new(
ProtocolSession {
remote_addr: session_impl.linker.remote_addr,
session_id: session_impl.get_uid(),
local_channel: from_channel,
remote_channel: to_channel,
},
sender,
);
(self.rpc_arg_mgr.insert(arg_info), Some(receiver))
} else {
(arg_uid, None)
};
let header = ProtocolHeader {
from_channel,
to_channel: 0,
is_rpc_ret: arg_uid != 0,
rpc_arg_uid,
};
let mut buf = Vec::new();
let mut ps = ProtocolStream::new(Cursor::new(&mut buf));
ps.push_u16(to_channel)?;
ps.push_u32(body.len() as u32)?;
ps.push_u16(ProtocolHeader::SIZE as u16)?;
header.marshal_to(&mut ps)?;
ps.push(&body)?;
session_impl.linker.send(&buf).await;
match receiver {
Some(r) => Ok(time::timeout(timeout, r)
.await
.map_err(|_| SendRpcError::Timeout)?
.ok()),
None => Ok(None),
}
}
pub async fn begin_accept(self: Arc<Self>, protocol_entity: Arc<ProtocolEntity>) {
loop {
let protocol_session = protocol_entity.accept().await.unwrap(); // TODO: handle err
let uid = self.session_mgr.insert(protocol_session.clone());
tracing::info!("[SERVICE] new ProtocolSession, uid: {uid}");
let s = self.clone();
tokio::spawn(async { s.begin_recv(protocol_session).await });
}
}
pub async fn begin_recv(self: Arc<Self>, session: Arc<ProtocolSessionImpl>) {
let uid = session.get_uid();
if let Err(err) = self.do_recv(session).await {
tracing::error!("ProtocolServiceFrontendImpl: session: {uid} recv failed: {err}",);
}
self.release_session(uid);
}
async fn do_recv(&self, session: Arc<ProtocolSessionImpl>) -> Result<(), ProtocolError> {
let mut buf = [0u8; 8];
let mut n = 0;
loop {
while n < 8 {
match session.linker.recv_some(&mut buf[n..8]).await? {
r if r > 0 => n += r,
_ => return Ok(()),
}
}
let mut ps = ProtocolStream::new(Cursor::new(&mut buf));
let to_channel = ps.pop_u16()?;
let body_len = ps.pop_u32()? as usize;
let header_len = ps.pop_u16()? as usize;
if header_len != ProtocolHeader::SIZE {
return Err(ProtocolError::HeaderSize(ProtocolHeader::SIZE, header_len));
}
n = 0;
let mut buf = vec![0u8; header_len + body_len];
while n < header_len + body_len {
match session
.linker
.recv_some(&mut buf[n..header_len + body_len])
.await?
{
r if r > 0 => n += r,
_ => return Ok(()),
}
}
let mut ps = ProtocolStream::new(Cursor::new(&mut buf[..header_len + body_len]));
let header = ProtocolHeader::unmarshal_from(&mut ps)?;
if let Some(channel_info) = session.entity.get_channel_info(to_channel) {
let body = ps.pop(body_len)?;
self.handle_received_rpc(&session, &channel_info, header, body)
.await;
} else {
println!("channel with id {to_channel} not found");
}
n = 0;
}
}
async fn handle_received_rpc(
&self,
session: &Arc<ProtocolSessionImpl>,
info: &ProtocolChannelInfo,
header: ProtocolHeader,
body: Vec<u8>,
) {
session
.last_active_time
.store(time_util::unix_timestamp(), atomic::Ordering::SeqCst);
if header.is_rpc_ret {
if let Some((_, rpc_arg_info)) = self.rpc_arg_mgr.release(header.rpc_arg_uid) {
rpc_arg_info.sender.send(body.into_boxed_slice()).unwrap();
} else {
println!("can't find rpc_arg_info with uid {}", header.rpc_arg_uid);
}
} else {
if let Some(cb) = info.callback.as_ref() {
cb(ProtocolContext {
session: ProtocolSession {
remote_addr: session.linker.remote_addr,
session_id: session.get_uid(),
local_channel: header.to_channel,
remote_channel: header.from_channel,
},
rpc_arg_uid: header.rpc_arg_uid,
body: body.into_boxed_slice(),
local_channel: header.to_channel,
remote_channel: header.from_channel,
})
}
}
}
async fn check_active_sessions(self: Arc<Self>) {
loop {
time::sleep(Duration::from_millis(2000)).await;
let cur_time = time_util::unix_timestamp();
let mut remove_session_vec = Vec::new();
for session in self.session_mgr.iter() {
if cur_time - session.last_active_time.load(atomic::Ordering::SeqCst) > 30000 {
remove_session_vec.push(session.get_uid());
}
}
remove_session_vec
.into_iter()
.for_each(|uid| self.release_session(uid));
}
}
pub async fn create_point(
self: &Arc<Self>,
addr: Option<SocketAddr>,
) -> Result<ProtocolPoint, std::io::Error> {
let entity = if let Some(entity) = self.protocol_entity_map.get(&addr) {
entity.clone()
} else {
let listener = match addr {
Some(addr) => Some(protocol_helper::listen(addr).await?),
None => None,
};
let protocol_entity = Arc::new(ProtocolEntity::new(addr, listener));
protocol_entity.set_backend(self.clone());
self.protocol_entity_map
.insert(addr, protocol_entity.clone());
if protocol_entity.has_listener() {
let s = self.clone();
let e = protocol_entity.clone();
tokio::spawn(async move {
s.begin_accept(e).await;
});
}
protocol_entity
};
Ok(entity
.create_point(self.channel_counter.fetch_add(1, atomic::Ordering::SeqCst))
.unwrap())
}
pub fn close_session(&self, entity: &Arc<ProtocolEntity>, addr: SocketAddr) {
if let Some(session) = entity.sessions.get(&addr) {
self.release_session(session.get_uid());
}
}
fn release_session(&self, uid: u64) {
tracing::info!("[SERVICE] released ProtocolSession, uid: {uid}");
self.protocol_entity_map.iter().for_each(|e| {
if let Some((_, addr)) = e.value().session_uid_map.remove(&uid) {
e.value().sessions.remove(&addr);
}
});
self.session_mgr.release(uid);
}
}

View file

@ -1,68 +0,0 @@
use std::{
fmt,
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
};
use common::time_util;
use tokio::sync::OnceCell;
use crate::{
object_res_mini_mgr::ResObj, protocol::protocol_entity::ProtocolEntity, ProtocolLinker,
};
#[derive(Clone)]
pub struct ProtocolSession {
pub session_id: u64,
pub local_channel: u16,
pub remote_channel: u16,
pub remote_addr: SocketAddr,
}
pub struct ProtocolSessionImpl {
pub is_connector: bool,
pub linker: ProtocolLinker,
pub entity: Arc<ProtocolEntity>,
pub last_active_time: AtomicU64,
uid: OnceCell<u64>,
}
pub struct ProtocolContext {
pub session: ProtocolSession,
pub body: Box<[u8]>,
pub rpc_arg_uid: u64,
pub local_channel: u16,
pub remote_channel: u16,
}
impl fmt::Display for ProtocolSession {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}/{}/{}",
self.session_id, self.local_channel, self.remote_channel
)
}
}
impl ProtocolSessionImpl {
pub fn new(is_connector: bool, linker: ProtocolLinker, entity: Arc<ProtocolEntity>) -> Self {
ProtocolSessionImpl {
is_connector,
linker,
entity,
last_active_time: AtomicU64::new(time_util::unix_timestamp()),
uid: OnceCell::new(),
}
}
}
impl ResObj for Arc<ProtocolSessionImpl> {
fn set_uid(&self, uid: u64) {
let _ = self.uid.set(uid);
}
fn get_uid(&self) -> u64 {
self.uid.get().copied().unwrap_or_default()
}
}

View file

@ -1,88 +0,0 @@
use std::io::{Cursor, Read, Write};
use qwer::{OctData, ProtocolStream};
use tracing::debug;
#[derive(Debug, Clone)]
pub enum MiddlewareModel {
Account(AccountMiddlewareModel),
Unknown(u16, Box<[u8]>),
}
pub fn unmarshal_middleware_list<R: Read>(
s: &mut ProtocolStream<R>,
) -> std::io::Result<Vec<MiddlewareModel>> {
let size = s.pop_u16()?;
(0..size)
.map(|_| {
let middleware_type = s.pop_u16()?;
let middleware_size = s.pop_u16()?;
let buf = s.pop(middleware_size as usize)?;
match middleware_type {
1 => Ok(MiddlewareModel::Account(
AccountMiddlewareModel::unmarshal_from(&mut Cursor::new(&buf), 0)?,
)),
_ => {
debug!("unknown middleware_type encountered: {middleware_type}");
Ok(MiddlewareModel::Unknown(
middleware_type,
buf.into_boxed_slice(),
))
}
}
})
.collect()
}
pub fn marshal_middleware_list<W: Write>(
s: &mut ProtocolStream<W>,
list: &[MiddlewareModel],
) -> std::io::Result<()> {
s.push_u16(list.len() as u16)?;
list.iter()
.try_for_each(|middleware_model| match middleware_model {
MiddlewareModel::Account(model) => {
let mut data = [0u8; 17];
model.marshal_to(&mut Cursor::new(&mut data[..]), 0)?;
s.push_u16(1)?;
s.push_u16(data.len() as u16)?;
s.push(&data)
}
MiddlewareModel::Unknown(ty, data) => {
s.push_u16(*ty)?;
s.push_u16(data.len() as u16)?;
s.push(&data)
}
})
}
#[derive(Debug, Clone)]
pub struct AccountMiddlewareModel {
pub player_uid: u64,
pub client_protocol_uid: u64,
pub is_resend: bool,
}
impl OctData for AccountMiddlewareModel {
fn marshal_to<W: std::io::Write>(&self, w: &mut W, _: u16) -> std::io::Result<()> {
use byteorder::{WriteBytesExt, LE};
w.write_u64::<LE>(self.player_uid)?;
w.write_u64::<LE>(self.client_protocol_uid)?;
w.write_u8(self.is_resend as u8)
}
fn unmarshal_from<R: std::io::Read>(r: &mut R, _: u16) -> std::io::Result<Self>
where
Self: Sized,
{
use byteorder::{ReadBytesExt, LE};
Ok(Self {
player_uid: r.read_u64::<LE>()?,
client_protocol_uid: r.read_u64::<LE>()?,
is_resend: r.read_u8()? != 0,
})
}
}

View file

@ -1,3 +0,0 @@
pub mod middleware;
pub mod rpc_ptc_point;
pub mod rpc_ptc_service;

View file

@ -1,249 +0,0 @@
use std::{
future::Future,
io::{Cursor, Read, Write},
net::SocketAddr,
sync::Arc,
time::Duration,
};
use byteorder::{ReadBytesExt, WriteBytesExt, BE, LE};
use dashmap::DashMap;
use futures::future::BoxFuture;
use qwer::{OctData, ProtocolID, ProtocolStream};
use tracing::{debug, warn};
use crate::{ProtocolContext, ProtocolPoint};
use super::middleware::{marshal_middleware_list, unmarshal_middleware_list, MiddlewareModel};
pub struct RpcPtcContext {
pub point: Arc<RpcPtcPoint>,
pub protocol_id: u16,
pub arg: Box<[u8]>,
pub addr: SocketAddr,
pub middleware_list: Vec<MiddlewareModel>,
arg_uid: u64,
remote_channel: u16,
}
impl RpcPtcContext {
pub fn get_arg<Arg: OctData>(&self) -> Result<Arg, std::io::Error> {
let mut r = Cursor::new(&self.arg);
Arg::unmarshal_from(&mut r, 0)
}
pub async fn send_ptc<RpcArg: OctData + ProtocolID>(&self, arg: RpcArg) {
self.point
.send_rpc(
self.addr,
self.remote_channel,
arg,
0,
Vec::with_capacity(0),
)
.await;
}
pub async fn send_ret<RpcRet: OctData>(&self, ret: RpcRet) {
self.point.send_ret(self.addr, ret, self.arg_uid).await;
}
}
pub trait RpcHandler: Send + Sync {
fn call(&self, context: RpcPtcContext) -> BoxFuture<'static, ()>;
}
impl<T, F> RpcHandler for T
where
T: Fn(RpcPtcContext) -> F + Send + Sync,
F: Future<Output = ()> + 'static + Send + Sync,
{
fn call(&self, context: RpcPtcContext) -> BoxFuture<'static, ()> {
Box::pin(self(context))
}
}
pub struct RpcPtcPoint {
pub protocol_point: ProtocolPoint,
rpc_handlers: DashMap<u16, Box<dyn RpcHandler>>,
}
pub struct RpcRawRet(Box<[u8]>);
impl RpcRawRet {
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
}
#[derive(thiserror::Error, Debug)]
pub enum RpcCallError {
#[error("remote protocol service didn't reply")]
NoResponse,
#[error("failed to decode RpcRet")]
Decode(#[from] std::io::Error),
}
impl RpcPtcPoint {
pub fn new(point: ProtocolPoint) -> Arc<Self> {
let point = Arc::new(Self {
protocol_point: point,
rpc_handlers: DashMap::new(),
});
let pt_clone = point.clone();
point.protocol_point.register_rpc_call(move |ctx| {
let point = pt_clone.clone();
tokio::spawn(async move {
point.handle_protocol_context(ctx).await;
});
});
point
}
async fn handle_protocol_context(self: &Arc<Self>, context: ProtocolContext) {
if context.body.len() < 6 {
debug!("received packet with too small body");
return;
}
let mut r = Cursor::new(&context.body);
let protocol_id = r.read_u16::<LE>().unwrap();
let arg_size = r.read_u32::<BE>().unwrap() as usize;
if arg_size + 6 > context.body.len() {
debug!(
"arg_size out of bounds! size: {}, body_len: {}",
arg_size + 6,
context.body.len()
);
return;
}
let mut arg = vec![0u8; arg_size];
r.read_exact(&mut arg).unwrap();
let Ok(middleware_list) = unmarshal_middleware_list(&mut ProtocolStream::new(&mut r))
else {
debug!("failed to decode middleware list");
return;
};
if let Some(cb) = self.rpc_handlers.get(&protocol_id) {
cb.call(RpcPtcContext {
point: self.clone(),
protocol_id,
arg: arg.into_boxed_slice(),
arg_uid: context.rpc_arg_uid,
addr: context.session.remote_addr,
middleware_list,
remote_channel: context.remote_channel,
})
.await;
} else {
warn!("RpcPtc: no handler registered for {protocol_id}");
}
}
pub async fn send_rpc<RpcArg>(
&self,
addr: SocketAddr,
to_channel: u16,
rpc_arg: RpcArg,
arg_uid: u64,
middleware_list: Vec<MiddlewareModel>,
) where
RpcArg: OctData + ProtocolID,
{
let mut buf = Vec::new();
let mut cursor = Cursor::new(&mut buf);
let protocol_id = rpc_arg.get_protocol_id();
cursor.write_u16::<LE>(protocol_id).unwrap();
let mut arg_buf = Vec::new();
rpc_arg
.marshal_to(&mut Cursor::new(&mut arg_buf), 0)
.unwrap();
cursor.write_u32::<BE>(arg_buf.len() as u32).unwrap();
cursor.write_all(&arg_buf).unwrap();
marshal_middleware_list(&mut ProtocolStream::new(&mut cursor), &middleware_list).unwrap();
self.protocol_point
.send_rpc(addr, buf.into_boxed_slice(), to_channel, arg_uid)
.await;
if arg_uid == 0 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
pub async fn send_ret<RpcRet>(&self, addr: SocketAddr, rpc_ret: RpcRet, arg_uid: u64)
where
RpcRet: OctData,
{
let mut buf = Vec::new();
let mut cursor = Cursor::new(&mut buf);
rpc_ret.marshal_to(&mut cursor, 0).unwrap();
self.protocol_point
.send_rpc(addr, buf.into_boxed_slice(), 0, arg_uid)
.await;
}
pub async fn call_rpc<RpcArg, RpcRet>(
&self,
addr: SocketAddr,
rpc_arg: RpcArg,
middleware_list: Vec<MiddlewareModel>,
timeout: Duration,
) -> Result<RpcRet, RpcCallError>
where
RpcArg: OctData + ProtocolID,
RpcRet: OctData,
{
let raw_ret = self
.call_rpc_raw(addr, rpc_arg, middleware_list, timeout)
.await
.ok_or(RpcCallError::NoResponse)?;
let mut r = Cursor::new(raw_ret.as_bytes());
let ret = RpcRet::unmarshal_from(&mut r, 0)?;
Ok(ret)
}
pub async fn call_rpc_raw<RpcArg>(
&self,
addr: SocketAddr,
rpc_arg: RpcArg,
middleware_list: Vec<MiddlewareModel>,
timeout: Duration,
) -> Option<RpcRawRet>
where
RpcArg: OctData + ProtocolID,
{
let mut buf = Vec::new();
let mut cursor = Cursor::new(&mut buf);
let protocol_id = rpc_arg.get_protocol_id();
cursor.write_u16::<LE>(protocol_id).unwrap();
let mut arg_buf = Vec::new();
rpc_arg
.marshal_to(&mut Cursor::new(&mut arg_buf), 0)
.unwrap();
cursor.write_u32::<BE>(arg_buf.len() as u32).unwrap();
cursor.write_all(&arg_buf).unwrap();
marshal_middleware_list(&mut ProtocolStream::new(&mut cursor), &middleware_list).unwrap();
let ret_buf = self.protocol_point.call_rpc(addr, &buf, timeout).await?;
Some(RpcRawRet(ret_buf))
}
pub fn register_rpc_recv<T: RpcHandler + 'static>(&self, protocol_id: u16, cb: T) {
self.rpc_handlers.insert(protocol_id, Box::new(cb));
}
}

View file

@ -1,28 +0,0 @@
use std::{net::SocketAddr, sync::Arc};
use crate::ProtocolServiceFrontend;
use super::rpc_ptc_point::RpcPtcPoint;
pub struct RpcPtcServiceFrontend {
pub protocol_service_frontend: ProtocolServiceFrontend,
}
impl RpcPtcServiceFrontend {
pub fn new(protocol_service: ProtocolServiceFrontend) -> Self {
Self {
protocol_service_frontend: protocol_service,
}
}
pub async fn create_point(
&self,
local_addr: Option<SocketAddr>,
) -> Result<Arc<RpcPtcPoint>, std::io::Error> {
let protocol_point = self
.protocol_service_frontend
.create_point(local_addr)
.await?;
Ok(RpcPtcPoint::new(protocol_point))
}
}