Compare commits

..

5 commits
s2n ... kity

Author SHA1 Message Date
bf24b4ecaa
fucking scanners 2025-05-25 15:46:53 +09:00
998b210463
fuck 2025-02-24 18:49:17 +09:00
65505fa6e8
shoot 2025-02-24 18:48:57 +09:00
236f838662
bind dual-stack-ly 2025-02-24 17:43:27 +09:00
e621cca1c2
update code to be better 2025-02-24 17:42:55 +09:00
9 changed files with 8928 additions and 1175 deletions

1693
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,16 +7,16 @@ license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = { version = "1.0.71", features = ["backtrace"] }
async-trait = "0.1.68"
axum = "0.6.18" axum = "0.6.18"
env_logger = "0.10.0" env_logger = "0.10.0"
idna = "0.4.0" eyre = "0.6.12"
governor = "0.10.0"
idna = "1.0.3"
log = "0.4.19" log = "0.4.19"
parking_lot = "0.12.1" parking_lot = "0.12.3"
rand = "0.8.5" quinn = "0.11.6"
rustls-pemfile = "1.0.2" rand = "0.9.0"
s2n-quic = { version = "1.46.0", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] } rustls-pemfile = "2"
serde = { version = "1.0.164", features = ["derive"] } serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1.0.97" serde_json = "1.0.97"
thiserror = "1.0.40" thiserror = "1.0.40"

View file

@ -88,13 +88,13 @@
mcAddr = mkOption { mcAddr = mkOption {
type = types.str; type = types.str;
default = "0.0.0.0:25565"; default = "[::]:25565";
description = lib.mdDoc "The socket address to listen to Minecraft connections."; description = lib.mdDoc "The socket address to listen to Minecraft connections.";
}; };
relayAddr = mkOption { relayAddr = mkOption {
type = types.str; type = types.str;
default = "0.0.0.0:25575"; default = "[::]:25575";
description = lib.mdDoc "The socket address to listen to quiclime connections."; description = lib.mdDoc "The socket address to listen to quiclime connections.";
}; };

View file

@ -0,0 +1,3 @@
{
"text": "You are trying to connect too fast. Please wait a minute before retrying."
}

View file

@ -2,26 +2,29 @@
#![allow(clippy::cast_possible_truncation)] #![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_possible_wrap)] #![allow(clippy::cast_possible_wrap)]
use std::{convert::Infallible, io::ErrorKind, net::SocketAddr, sync::Arc, time::Duration}; use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
use anyhow::{anyhow, Context};
use axum::{ use axum::{
http::StatusCode, http::StatusCode,
routing::{get, post}, routing::{get, post},
}; };
use log::{error, info}; use eyre::{eyre, Context};
use log::{error, info, warn};
use netty::{Handshake, ReadError}; use netty::{Handshake, ReadError};
use routing::RoutingTable; use quinn::{
use s2n_quic::{connection::Error as ConnectionError, Connection, Server}; crypto::rustls::QuicServerConfig,
rustls::pki_types::{CertificateDer, PrivateKeyDer},
ConnectionError, Endpoint, Incoming, ServerConfig, TransportConfig,
};
use routing::{RoutingError, RoutingTable};
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream, net::TcpStream,
task::{JoinError, JoinSet},
}; };
use crate::{ use crate::{
netty::{ReadExt, WriteExt}, netty::{ReadExt, WriteExt},
proto::{ClientboundControlMessage, ServerboundControlMessage}, routing::RouterRequest, proto::{ClientboundControlMessage, ServerboundControlMessage},
}; };
mod netty; mod netty;
@ -30,50 +33,28 @@ mod routing;
mod unicode_madness; mod unicode_madness;
mod wordlist; mod wordlist;
fn any_private_keys(rd: &mut dyn std::io::BufRead) -> Result<Vec<Vec<u8>>, std::io::Error> { fn get_certs() -> eyre::Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)> {
let mut keys = Vec::<Vec<u8>>::new();
loop {
match rustls_pemfile::read_one(rd)? {
None => return Ok(keys),
Some(
rustls_pemfile::Item::RSAKey(key)
| rustls_pemfile::Item::PKCS8Key(key)
| rustls_pemfile::Item::ECKey(key),
) => keys.push(key),
_ => {}
};
}
}
fn get_certs() -> anyhow::Result<(Vec<Certificate>, PrivateKey)> {
let mut cert_file = std::io::BufReader::new(std::fs::File::open( let mut cert_file = std::io::BufReader::new(std::fs::File::open(
std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?, std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?,
)?); )?);
let certs = rustls_pemfile::certs(&mut cert_file)? let certs = rustls_pemfile::certs(&mut cert_file)
.into_iter() .filter_map(Result::ok)
.map(Certificate)
.collect(); .collect();
let mut key_file = std::io::BufReader::new(std::fs::File::open( let mut key_file = std::io::BufReader::new(std::fs::File::open(
std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?, std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?,
)?); )?);
let key = PrivateKey( let key = rustls_pemfile::private_key(&mut key_file)?.ok_or(eyre!("No private key?"))?;
any_private_keys(&mut key_file)?
.into_iter()
.next()
.ok_or(anyhow::anyhow!("No private key?"))?,
);
Ok((certs, key)) Ok((certs, key))
} }
async fn create_server_config() -> anyhow::Result<ServerConfig> { async fn create_server_config() -> eyre::Result<ServerConfig> {
let (cert_chain, key_der) = tokio::task::spawn_blocking(get_certs).await??; let (cert_chain, key_der) = tokio::task::spawn_blocking(get_certs).await??;
let mut rustls_config = rustls::ServerConfig::builder() let mut rustls_config = quinn::rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth() .with_no_client_auth()
.with_single_cert(cert_chain, key_der)?; .with_single_cert(cert_chain, key_der)?;
rustls_config.alpn_protocols = vec![b"quiclime".to_vec()]; rustls_config.alpn_protocols = vec![b"quiclime".to_vec()];
let mut config = ServerConfig::with_crypto(Arc::new(rustls_config)); let quic_rustls_config = QuicServerConfig::try_from(rustls_config)?;
let mut config = ServerConfig::with_crypto(Arc::new(quic_rustls_config));
let mut transport = TransportConfig::default(); let mut transport = TransportConfig::default();
transport transport
.max_concurrent_bidi_streams(1u32.into()) .max_concurrent_bidi_streams(1u32.into())
@ -84,7 +65,7 @@ async fn create_server_config() -> anyhow::Result<ServerConfig> {
} }
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> eyre::Result<()> {
env_logger::init(); env_logger::init();
// JUSTIFICATION: this lives until the end of the entire program // JUSTIFICATION: this lives until the end of the entire program
let endpoint = Box::leak(Box::new(Endpoint::server( let endpoint = Box::leak(Box::new(Endpoint::server(
@ -106,136 +87,98 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn try_handle_quic( async fn try_handle_quic(connection: Incoming, routing_table: &RoutingTable) -> eyre::Result<()> {
connection: Connection, let connection = connection.await?;
routing_table: &RoutingTable,
) -> anyhow::Result<()> {
info!( info!(
"QUIClime connection established to: {}", "QUIClime connection established to: {}",
connection.remote_addr()? connection.remote_address()
); );
let mut control = connection let (mut send_control, mut recv_control) = connection.accept_bi().await?;
.accept_bidirectional_stream() info!("Control channel open: {}", connection.remote_address());
.await?
.ok_or(anyhow!(
"Connection closed while waiting for control channel"
))?;
info!("Control channel open: {}", connection.remote_addr()?);
let mut handle = loop { let mut handle = loop {
let mut buf = vec![0u8; control.read_u8().await? as _]; let mut buf = vec![0u8; recv_control.read_u8().await? as _];
control.read_exact(&mut buf).await?; recv_control.read_exact(&mut buf).await?;
if let Ok(parsed) = serde_json::from_slice(&buf) { if let Ok(parsed) = serde_json::from_slice(&buf) {
match parsed { match parsed {
ServerboundControlMessage::RequestDomainAssignment => { ServerboundControlMessage::RequestDomainAssignment => {
let handle = routing_table.register(); let handle = routing_table.register();
info!( info!(
"Domain assigned to {}: {}", "Domain assigned to {}: {}",
connection.remote_addr()?, connection.remote_address(),
handle.domain() handle.domain()
); );
let response = let response =
serde_json::to_vec(&ClientboundControlMessage::DomainAssignmentComplete { serde_json::to_vec(&ClientboundControlMessage::DomainAssignmentComplete {
domain: handle.domain().to_string(), domain: handle.domain().to_string(),
})?; })?;
control.write_all(&[response.len() as u8]).await?; send_control.write_all(&[response.len() as u8]).await?;
control.write_all(&response).await?; send_control.write_all(&response).await?;
break handle; break handle;
} }
} }
} }
let response = serde_json::to_vec(&ClientboundControlMessage::UnknownMessage)?; let response = serde_json::to_vec(&ClientboundControlMessage::UnknownMessage)?;
control.write_all(&[response.len() as u8]).await?; send_control.write_all(&[response.len() as u8]).await?;
control.write_all(&response).await?; send_control.write_all(&response).await?;
}; };
let mut set = JoinSet::new();
let (control_message_queue, mut control_message_queue_recv) = tokio::sync::mpsc::unbounded_channel();
let (mut control_recv, mut control_send) = control.split();
let send_task = tokio::spawn(async move {
while let Some(event) = control_message_queue_recv.recv().await {
let response = serde_json::to_vec(&event)?;
control_send.write_all(&[response.len() as u8]).await?;
control_send.write_all(&response).await?;
}
Ok::<_, tokio::io::Error>(())
});
let control_message_queue_ref = &control_message_queue;
set.spawn(async move {
loop {
let mut buf = vec![0u8; control_recv.read_u8().await? as _];
control_recv.read_exact(&mut buf).await?;
control_message_queue_ref.send(ClientboundControlMessage::UnknownMessage);
}
Ok::<_, tokio::io::Error>(())
});
enum Event {
RouterEvent(RouterRequest),
TaskSet(Result<Result<(), tokio::io::Error>, JoinError>)
}
while let Some(remote) = tokio::select! {
v = handle.next() => v.map(Event::RouterEvent),
v = set.join_next() => v.map(Event::TaskSet),
} {
match remote {
Event::RouterEvent(RouterRequest::RouteRequest((handshake, mut client_stream))) => {
let stream = connection.open_bidirectional_stream().await;
set.spawn(async move {
if let Err(
ConnectionError::Transport { .. }
| ConnectionError::Application { .. }
| ConnectionError::EndpointClosing { .. }
| ConnectionError::ImmediateClose { .. },
) = stream
{
Ok(())
} else {
let mut stream = stream?;
handshake.send(&mut stream).await?;
tokio::io::copy_bidirectional(&mut stream, &mut client_stream).await?;
Ok::<_, tokio::io::Error>(())
}
});
}
Event::RouterEvent(RouterRequest::BroadcastRequest(message)) => {
control_message_queue.send(ClientboundControlMessage::RequestMessageBroadcast {
message,
});
}
Event::TaskSet(Ok(Ok(()))) => {}
Event::TaskSet(Ok(Err(e))) => {
if e.kind() != ErrorKind::UnexpectedEof {
error!("Error in task: {e:?}")
}
}
Event::TaskSet(Err(e)) => {
error!("Error in task: {e:?}")
}
tokio::select! {
e = connection.closed() => {
match e {
ConnectionError::ConnectionClosed(_)
| ConnectionError::ApplicationClosed(_)
| ConnectionError::LocallyClosed => Ok(()),
e => Err(e.into()),
}
},
r = async {
while let Some(remote) = handle.next().await {
match remote {
routing::RouterRequest::RouteRequest(remote) => {
let pair = connection.open_bi().await;
if let Err(ConnectionError::ApplicationClosed(_)) = pair {
break;
} else if let Err(ConnectionError::ConnectionClosed(_)) = pair {
break;
}
remote.send(pair?).map_err(|e| eyre!("{:?}", e))?;
}
routing::RouterRequest::BroadcastRequest(message) => {
let response =
serde_json::to_vec(&ClientboundControlMessage::RequestMessageBroadcast {
message,
})?;
send_control.write_all(&[response.len() as u8]).await?;
send_control.write_all(&response).await?;
}
} }
} }
send_task.abort();
Ok(()) Ok(())
} => r
}
} }
async fn handle_quic(connection: Connection, routing_table: &RoutingTable) { async fn handle_quic(connection: Incoming, routing_table: &RoutingTable) {
if let Err(e) = try_handle_quic(connection, routing_table).await { if let Err(e) = try_handle_quic(connection, routing_table).await {
error!("Error handling QUIClime connection: {}", e); error!("Error handling QUIClime connection: {:#}", e);
}; };
info!("Finished handling QUIClime connection"); info!("Finished handling QUIClime connection");
} }
async fn listen_quic( async fn listen_quic(
mut endpoint: Server, endpoint: &'static Endpoint,
routing_table: &'static RoutingTable, routing_table: &'static RoutingTable,
) -> anyhow::Result<Infallible> { ) -> eyre::Result<Infallible> {
while let Some(connection) = endpoint.accept().await { while let Some(connection) = endpoint.accept().await {
tokio::spawn(handle_quic(connection, routing_table)); tokio::spawn(handle_quic(connection, routing_table));
} }
Err(anyhow!("quiclime endpoint closed")) Err(eyre!("quiclime endpoint closed"))
} }
async fn listen_control( async fn listen_control(
endpoint: &'static Endpoint,
routing_table: &'static RoutingTable, routing_table: &'static RoutingTable,
) -> anyhow::Result<Infallible> { ) -> eyre::Result<Infallible> {
let app = axum::Router::new() let app = axum::Router::new()
.route( .route(
"/metrics", "/metrics",
@ -270,16 +213,16 @@ async fn listen_control(
) )
.serve(app.into_make_service()) .serve(app.into_make_service())
.await?; .await?;
Err(anyhow!("control endpoint closed")) Err(eyre!("control endpoint closed"))
} }
async fn try_handle_minecraft( async fn try_handle_minecraft(
mut connection: TcpStream, mut connection: TcpStream,
routing_table: &'static RoutingTable, routing_table: &'static RoutingTable,
) -> anyhow::Result<()> { ) -> eyre::Result<()> {
let peer = connection.peer_addr()?; let peer = connection.peer_addr()?;
info!("Minecraft client connected from: {}", peer); info!("Minecraft client connected from: {}", peer);
let handshake = netty::read_packet(&mut connection).await; let handshake = netty::read_packet(&mut connection, 512).await;
if let Err(ReadError::LegacyServerListPing) = handshake { if let Err(ReadError::LegacyServerListPing) = handshake {
connection connection
.write_all(include_bytes!("legacy_serverlistping_response.bin")) .write_all(include_bytes!("legacy_serverlistping_response.bin"))
@ -290,8 +233,16 @@ async fn try_handle_minecraft(
let Some(address) = handshake.normalized_address() else { let Some(address) = handshake.normalized_address() else {
return politely_disconnect(connection, handshake).await; return politely_disconnect(connection, handshake).await;
}; };
let Some((mut send_host, mut recv_host)) = routing_table.route(&address).await else { let (mut send_host, mut recv_host) =
match routing_table.route_limited(&address, peer.ip()).await {
Ok(val) => val,
Err(RoutingError::InvalidDomain) => {
return politely_disconnect(connection, handshake).await; return politely_disconnect(connection, handshake).await;
}
Err(RoutingError::RateLimited) => {
warn!("Connection from {} has been rate limited!", peer);
return impolitely_disconnect(connection, handshake).await;
}
}; };
handshake.send(&mut send_host).await?; handshake.send(&mut send_host).await?;
let (mut recv_client, mut send_client) = connection.split(); let (mut recv_client, mut send_client) = connection.split();
@ -300,23 +251,21 @@ async fn try_handle_minecraft(
_ = tokio::io::copy(&mut recv_host, &mut send_client) => () _ = tokio::io::copy(&mut recv_host, &mut send_client) => ()
} }
_ = connection.shutdown().await; _ = connection.shutdown().await;
_ = send_host.finish().await; _ = send_host.finish();
_ = recv_host.stop(0u32.into()); _ = recv_host.stop(0u32.into());
_ = send_host.stopped().await;
info!("Minecraft client disconnected from: {}", peer); info!("Minecraft client disconnected from: {}", peer);
Ok(()) Ok(())
} }
async fn politely_disconnect( async fn politely_disconnect(mut connection: TcpStream, handshake: Handshake) -> eyre::Result<()> {
mut connection: TcpStream,
handshake: Handshake,
) -> anyhow::Result<()> {
match handshake.next_state { match handshake.next_state {
netty::HandshakeType::Status => { netty::HandshakeType::Status => {
let packet = netty::read_packet(&mut connection).await?; let packet = netty::read_packet(&mut connection, 1).await?;
let mut packet = packet.as_slice(); let mut packet = packet.as_slice();
let id = packet.read_varint()?; let id = packet.read_varint()?;
if id != 0 { if id != 0 {
return Err(anyhow!( return Err(eyre!(
"Packet isn't a Status Request(0x00), but {:#04x}", "Packet isn't a Status Request(0x00), but {:#04x}",
id id
)); ));
@ -327,14 +276,11 @@ async fn politely_disconnect(
.await?; .await?;
connection.write_varint(buf.len() as i32).await?; connection.write_varint(buf.len() as i32).await?;
connection.write_all(&buf).await?; connection.write_all(&buf).await?;
let packet = netty::read_packet(&mut connection).await?; let packet = netty::read_packet(&mut connection, 9).await?;
let mut packet = packet.as_slice(); let mut packet = packet.as_slice();
let id = packet.read_varint()?; let id = packet.read_varint()?;
if id != 1 { if id != 1 {
return Err(anyhow!( return Err(eyre!("Packet isn't a Ping Request(0x01), but {:#04x}", id));
"Packet isn't a Ping Request(0x01), but {:#04x}",
id
));
} }
let payload = packet.read_long()?; let payload = packet.read_long()?;
let mut buf = Vec::with_capacity(1 + 8); let mut buf = Vec::with_capacity(1 + 8);
@ -344,7 +290,7 @@ async fn politely_disconnect(
connection.write_all(&buf).await?; connection.write_all(&buf).await?;
} }
netty::HandshakeType::Login => { netty::HandshakeType::Login => {
let _ = netty::read_packet(&mut connection).await?; let _ = netty::read_packet(&mut connection, 128).await?;
let mut buf = vec![]; let mut buf = vec![];
buf.write_varint(0).await?; buf.write_varint(0).await?;
buf.write_string(include_str!("./disconnect_response.json")) buf.write_string(include_str!("./disconnect_response.json"))
@ -356,13 +302,60 @@ async fn politely_disconnect(
Ok(()) Ok(())
} }
async fn impolitely_disconnect(
mut connection: TcpStream,
handshake: Handshake,
) -> eyre::Result<()> {
match handshake.next_state {
netty::HandshakeType::Status => {
let packet = netty::read_packet(&mut connection, 1).await?;
let mut packet = packet.as_slice();
let id = packet.read_varint()?;
if id != 0 {
return Err(eyre!(
"Packet isn't a Status Request(0x00), but {:#04x}",
id
));
}
let mut buf = vec![];
buf.write_varint(0).await?;
buf.write_string(include_str!("./serverlistping_response_rate.json"))
.await?;
connection.write_varint(buf.len() as i32).await?;
connection.write_all(&buf).await?;
let packet = netty::read_packet(&mut connection, 9).await?;
let mut packet = packet.as_slice();
let id = packet.read_varint()?;
if id != 1 {
return Err(eyre!("Packet isn't a Ping Request(0x01), but {:#04x}", id));
}
let payload = packet.read_long()?;
let mut buf = Vec::with_capacity(1 + 8);
buf.write_varint(1).await?;
buf.write_u64(payload).await?;
connection.write_varint(buf.len() as i32).await?;
connection.write_all(&buf).await?;
}
netty::HandshakeType::Login => {
let _ = netty::read_packet(&mut connection, 128).await?;
let mut buf = vec![];
buf.write_varint(0).await?;
buf.write_string(include_str!("./disconnect_response_rate.json"))
.await?;
connection.write_varint(buf.len() as i32).await?;
connection.write_all(&buf).await?;
}
}
Ok(())
}
async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) { async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) {
if let Err(e) = try_handle_minecraft(connection, routing_table).await { if let Err(e) = try_handle_minecraft(connection, routing_table).await {
error!("Error handling Minecraft connection: {}", e.backtrace()); error!("Error handling Minecraft connection: {:#}", e);
}; };
} }
async fn listen_minecraft(routing_table: &'static RoutingTable) -> anyhow::Result<Infallible> { async fn listen_minecraft(routing_table: &'static RoutingTable) -> eyre::Result<Infallible> {
let server = tokio::net::TcpListener::bind( let server = tokio::net::TcpListener::bind(
std::env::var("QUICLIME_BIND_ADDR_MC") std::env::var("QUICLIME_BIND_ADDR_MC")
.context("Reading QUICLIME_BIND_ADDR_MC")? .context("Reading QUICLIME_BIND_ADDR_MC")?
@ -375,7 +368,7 @@ async fn listen_minecraft(routing_table: &'static RoutingTable) -> anyhow::Resul
tokio::spawn(handle_minecraft(connection, routing_table)); tokio::spawn(handle_minecraft(connection, routing_table));
} }
Err(e) => { Err(e) => {
error!("Error accepting minecraft connection: {}", e); error!("Error accepting minecraft connection: {:#}", e);
} }
} }
} }

View file

@ -4,7 +4,6 @@ use std::io::Read;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use async_trait::async_trait;
use log::error; use log::error;
use thiserror::Error; use thiserror::Error;
@ -14,6 +13,8 @@ pub enum ReadError {
IoError(std::io::Error), IoError(std::io::Error),
#[error("Was not a netty packet, but a Legacy ServerListPing")] #[error("Was not a netty packet, but a Legacy ServerListPing")]
LegacyServerListPing, LegacyServerListPing,
#[error("Packet was too large")]
PacketTooLarge,
} }
impl From<std::io::Error> for ReadError { impl From<std::io::Error> for ReadError {
@ -86,8 +87,25 @@ pub trait ReadExt: Read {
// } // }
} }
pub async fn read_packet(mut reader: impl AsyncReadExt + Unpin) -> Result<Vec<u8>, ReadError> { pub async fn read_packet(
mut reader: impl AsyncReadExt + Unpin,
max_size: usize,
) -> Result<Vec<u8>, ReadError> {
let len = read_varint(&mut reader).await?; let len = read_varint(&mut reader).await?;
if len < 0 || (len as usize) > max_size {
return Err(if len == 254 {
let mut temp = [0u8];
reader.read_exact(&mut temp).await?;
if temp[0] == 0xFA {
// FE 01 FA: Legacy ServerListPing
ReadError::LegacyServerListPing
} else {
ReadError::PacketTooLarge
}
} else {
ReadError::PacketTooLarge
});
}
let mut buf = vec![0u8; len as usize]; let mut buf = vec![0u8; len as usize];
if len == 254 { if len == 254 {
let mut temp = [0u8]; let mut temp = [0u8];
@ -119,7 +137,6 @@ async fn read_varint(mut reader: impl AsyncReadExt + Unpin) -> Result<i32, ReadE
impl<T: Read> ReadExt for T {} impl<T: Read> ReadExt for T {}
#[async_trait]
pub trait WriteExt: AsyncWriteExt + Unpin { pub trait WriteExt: AsyncWriteExt + Unpin {
async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> { async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
for _ in 0..5 { for _ in 0..5 {
@ -156,10 +173,10 @@ pub enum HandshakeType {
} }
impl Handshake { impl Handshake {
pub fn new(mut packet: &[u8]) -> anyhow::Result<Self> { pub fn new(mut packet: &[u8]) -> eyre::Result<Self> {
let packet_type = packet.read_varint()?; let packet_type = packet.read_varint()?;
if packet_type != 0 { if packet_type != 0 {
Err(anyhow::anyhow!("Not a Handshake packet")) Err(eyre::eyre!("Not a Handshake packet"))
} else { } else {
let protocol_version = packet.read_varint()?; let protocol_version = packet.read_varint()?;
let server_address = packet.read_string()?; let server_address = packet.read_string()?;
@ -167,7 +184,7 @@ impl Handshake {
let next_state = match packet.read_varint()? { let next_state = match packet.read_varint()? {
1 => HandshakeType::Status, 1 => HandshakeType::Status,
2 => HandshakeType::Login, 2 => HandshakeType::Login,
_ => return Err(anyhow::anyhow!("Invalid next state")), _ => return Err(eyre::eyre!("Invalid next state")),
}; };
Ok(Self { Ok(Self {
protocol_version, protocol_version,

View file

@ -1,28 +1,35 @@
use governor::DefaultKeyedRateLimiter;
use governor::Quota;
use log::info; use log::info;
use log::warn; use log::warn;
use parking_lot::RwLock; use parking_lot::RwLock;
use quinn::RecvStream;
use quinn::SendStream;
use rand::prelude::*; use rand::prelude::*;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::net::TcpStream; use std::net::IpAddr;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use crate::netty::Handshake;
#[derive(Debug)] #[derive(Debug)]
pub enum RouterRequest { pub enum RouterRequest {
RouteRequest(RouterConnection), RouteRequest(RouterCallback),
BroadcastRequest(String), BroadcastRequest(String),
} }
type RouterConnection = (Handshake, TcpStream); type RouterCallback = oneshot::Sender<(SendStream, RecvStream)>;
type RouteRequestReceiver = mpsc::UnboundedSender<RouterRequest>; type RouteRequestReceiver = mpsc::UnboundedSender<RouterRequest>;
#[allow(clippy::module_name_repetitions)] #[allow(clippy::module_name_repetitions)]
#[derive(Default)]
pub struct RoutingTable { pub struct RoutingTable {
table: RwLock<HashMap<String, RouteRequestReceiver>>, table: RwLock<HashMap<String, RouteRequestReceiver>>,
base_domain: String, base_domain: String,
limiter: DefaultKeyedRateLimiter<IpAddr>,
}
pub enum RoutingError {
InvalidDomain,
RateLimited,
} }
impl RoutingTable { impl RoutingTable {
@ -30,6 +37,7 @@ impl RoutingTable {
RoutingTable { RoutingTable {
table: RwLock::default(), table: RwLock::default(),
base_domain, base_domain,
limiter: DefaultKeyedRateLimiter::dashmap(Quota::per_minute(30.try_into().unwrap())),
} }
} }
@ -45,41 +53,44 @@ impl RoutingTable {
} }
} }
pub fn route(&self, domain: &str, conn: RouterConnection) -> Option<()> { pub async fn route_limited(
&self,
domain: &str,
ip: IpAddr,
) -> Result<(SendStream, RecvStream), RoutingError> {
if self.limiter.check_key(&ip).is_err() {
return Err(RoutingError::RateLimited);
}
self.limiter.retain_recent();
let (send, recv) = oneshot::channel();
self.table self.table
.read() .read()
.get(domain)? .get(domain)
.send(RouterRequest::RouteRequest(conn)) .ok_or(RoutingError::InvalidDomain)?
.send(RouterRequest::RouteRequest(send))
.ok() .ok()
.ok_or(RoutingError::InvalidDomain)?;
recv.await.ok().ok_or(RoutingError::InvalidDomain)
}
fn random_domain(&self) -> String {
format!(
"{}-{}.{}",
crate::wordlist::ID_WORDS.choose(&mut rand::rng()).unwrap(),
crate::wordlist::ID_WORDS.choose(&mut rand::rng()).unwrap(),
self.base_domain
)
} }
pub fn register(&self) -> RoutingHandle { pub fn register(&self) -> RoutingHandle {
let mut lock = self.table.write(); let mut lock = self.table.write();
let mut domain = format!( let mut domain = self.random_domain();
"{}-{}.{}",
crate::wordlist::ID_WORDS
.choose(&mut rand::thread_rng())
.unwrap(),
crate::wordlist::ID_WORDS
.choose(&mut rand::thread_rng())
.unwrap(),
self.base_domain
);
while lock.contains_key(&domain) { while lock.contains_key(&domain) {
warn!( warn!(
"Randomly selected domain {} conflicts; trying again", "Randomly selected domain {} conflicts; trying again",
domain domain
); );
domain = format!( domain = self.random_domain();
"{}-{}.{}",
crate::wordlist::ID_WORDS
.choose(&mut rand::thread_rng())
.unwrap(),
crate::wordlist::ID_WORDS
.choose(&mut rand::thread_rng())
.unwrap(),
self.base_domain
);
} }
domain = crate::unicode_madness::validate_and_normalize_domain(&domain) domain = crate::unicode_madness::validate_and_normalize_domain(&domain)
.expect("Resulting domain is not valid"); .expect("Resulting domain is not valid");

View file

@ -0,0 +1,13 @@
{
"version": {
"name": "e4mc",
"protocol": -1
},
"players": {
"max": 0,
"online": 0
},
"description": {
"text": "You are trying to connect too fast. Please wait a minute before retrying."
}
}

File diff suppressed because it is too large Load diff