Compare commits
5 commits
Author | SHA1 | Date | |
---|---|---|---|
bf24b4ecaa | |||
998b210463 | |||
65505fa6e8 | |||
236f838662 | |||
e621cca1c2 |
9 changed files with 8928 additions and 1175 deletions
1693
Cargo.lock
generated
1693
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
14
Cargo.toml
14
Cargo.toml
|
@ -7,16 +7,16 @@ license = "MIT OR Apache-2.0"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = { version = "1.0.71", features = ["backtrace"] }
|
|
||||||
async-trait = "0.1.68"
|
|
||||||
axum = "0.6.18"
|
axum = "0.6.18"
|
||||||
env_logger = "0.10.0"
|
env_logger = "0.10.0"
|
||||||
idna = "0.4.0"
|
eyre = "0.6.12"
|
||||||
|
governor = "0.10.0"
|
||||||
|
idna = "1.0.3"
|
||||||
log = "0.4.19"
|
log = "0.4.19"
|
||||||
parking_lot = "0.12.1"
|
parking_lot = "0.12.3"
|
||||||
rand = "0.8.5"
|
quinn = "0.11.6"
|
||||||
rustls-pemfile = "1.0.2"
|
rand = "0.9.0"
|
||||||
s2n-quic = { version = "1.46.0", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] }
|
rustls-pemfile = "2"
|
||||||
serde = { version = "1.0.164", features = ["derive"] }
|
serde = { version = "1.0.164", features = ["derive"] }
|
||||||
serde_json = "1.0.97"
|
serde_json = "1.0.97"
|
||||||
thiserror = "1.0.40"
|
thiserror = "1.0.40"
|
||||||
|
|
|
@ -88,13 +88,13 @@
|
||||||
|
|
||||||
mcAddr = mkOption {
|
mcAddr = mkOption {
|
||||||
type = types.str;
|
type = types.str;
|
||||||
default = "0.0.0.0:25565";
|
default = "[::]:25565";
|
||||||
description = lib.mdDoc "The socket address to listen to Minecraft connections.";
|
description = lib.mdDoc "The socket address to listen to Minecraft connections.";
|
||||||
};
|
};
|
||||||
|
|
||||||
relayAddr = mkOption {
|
relayAddr = mkOption {
|
||||||
type = types.str;
|
type = types.str;
|
||||||
default = "0.0.0.0:25575";
|
default = "[::]:25575";
|
||||||
description = lib.mdDoc "The socket address to listen to quiclime connections.";
|
description = lib.mdDoc "The socket address to listen to quiclime connections.";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
3
src/disconnect_response_rate.json
Normal file
3
src/disconnect_response_rate.json
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
{
|
||||||
|
"text": "You are trying to connect too fast. Please wait a minute before retrying."
|
||||||
|
}
|
285
src/main.rs
285
src/main.rs
|
@ -2,26 +2,29 @@
|
||||||
#![allow(clippy::cast_possible_truncation)]
|
#![allow(clippy::cast_possible_truncation)]
|
||||||
#![allow(clippy::cast_possible_wrap)]
|
#![allow(clippy::cast_possible_wrap)]
|
||||||
|
|
||||||
use std::{convert::Infallible, io::ErrorKind, net::SocketAddr, sync::Arc, time::Duration};
|
use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use anyhow::{anyhow, Context};
|
|
||||||
use axum::{
|
use axum::{
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
routing::{get, post},
|
routing::{get, post},
|
||||||
};
|
};
|
||||||
use log::{error, info};
|
use eyre::{eyre, Context};
|
||||||
|
use log::{error, info, warn};
|
||||||
use netty::{Handshake, ReadError};
|
use netty::{Handshake, ReadError};
|
||||||
use routing::RoutingTable;
|
use quinn::{
|
||||||
use s2n_quic::{connection::Error as ConnectionError, Connection, Server};
|
crypto::rustls::QuicServerConfig,
|
||||||
|
rustls::pki_types::{CertificateDer, PrivateKeyDer},
|
||||||
|
ConnectionError, Endpoint, Incoming, ServerConfig, TransportConfig,
|
||||||
|
};
|
||||||
|
use routing::{RoutingError, RoutingTable};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
net::TcpStream,
|
net::TcpStream,
|
||||||
task::{JoinError, JoinSet},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
netty::{ReadExt, WriteExt},
|
netty::{ReadExt, WriteExt},
|
||||||
proto::{ClientboundControlMessage, ServerboundControlMessage}, routing::RouterRequest,
|
proto::{ClientboundControlMessage, ServerboundControlMessage},
|
||||||
};
|
};
|
||||||
|
|
||||||
mod netty;
|
mod netty;
|
||||||
|
@ -30,50 +33,28 @@ mod routing;
|
||||||
mod unicode_madness;
|
mod unicode_madness;
|
||||||
mod wordlist;
|
mod wordlist;
|
||||||
|
|
||||||
fn any_private_keys(rd: &mut dyn std::io::BufRead) -> Result<Vec<Vec<u8>>, std::io::Error> {
|
fn get_certs() -> eyre::Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)> {
|
||||||
let mut keys = Vec::<Vec<u8>>::new();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match rustls_pemfile::read_one(rd)? {
|
|
||||||
None => return Ok(keys),
|
|
||||||
Some(
|
|
||||||
rustls_pemfile::Item::RSAKey(key)
|
|
||||||
| rustls_pemfile::Item::PKCS8Key(key)
|
|
||||||
| rustls_pemfile::Item::ECKey(key),
|
|
||||||
) => keys.push(key),
|
|
||||||
_ => {}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_certs() -> anyhow::Result<(Vec<Certificate>, PrivateKey)> {
|
|
||||||
let mut cert_file = std::io::BufReader::new(std::fs::File::open(
|
let mut cert_file = std::io::BufReader::new(std::fs::File::open(
|
||||||
std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?,
|
std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?,
|
||||||
)?);
|
)?);
|
||||||
let certs = rustls_pemfile::certs(&mut cert_file)?
|
let certs = rustls_pemfile::certs(&mut cert_file)
|
||||||
.into_iter()
|
.filter_map(Result::ok)
|
||||||
.map(Certificate)
|
|
||||||
.collect();
|
.collect();
|
||||||
let mut key_file = std::io::BufReader::new(std::fs::File::open(
|
let mut key_file = std::io::BufReader::new(std::fs::File::open(
|
||||||
std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?,
|
std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?,
|
||||||
)?);
|
)?);
|
||||||
let key = PrivateKey(
|
let key = rustls_pemfile::private_key(&mut key_file)?.ok_or(eyre!("No private key?"))?;
|
||||||
any_private_keys(&mut key_file)?
|
|
||||||
.into_iter()
|
|
||||||
.next()
|
|
||||||
.ok_or(anyhow::anyhow!("No private key?"))?,
|
|
||||||
);
|
|
||||||
Ok((certs, key))
|
Ok((certs, key))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_server_config() -> anyhow::Result<ServerConfig> {
|
async fn create_server_config() -> eyre::Result<ServerConfig> {
|
||||||
let (cert_chain, key_der) = tokio::task::spawn_blocking(get_certs).await??;
|
let (cert_chain, key_der) = tokio::task::spawn_blocking(get_certs).await??;
|
||||||
let mut rustls_config = rustls::ServerConfig::builder()
|
let mut rustls_config = quinn::rustls::ServerConfig::builder()
|
||||||
.with_safe_defaults()
|
|
||||||
.with_no_client_auth()
|
.with_no_client_auth()
|
||||||
.with_single_cert(cert_chain, key_der)?;
|
.with_single_cert(cert_chain, key_der)?;
|
||||||
rustls_config.alpn_protocols = vec![b"quiclime".to_vec()];
|
rustls_config.alpn_protocols = vec![b"quiclime".to_vec()];
|
||||||
let mut config = ServerConfig::with_crypto(Arc::new(rustls_config));
|
let quic_rustls_config = QuicServerConfig::try_from(rustls_config)?;
|
||||||
|
let mut config = ServerConfig::with_crypto(Arc::new(quic_rustls_config));
|
||||||
let mut transport = TransportConfig::default();
|
let mut transport = TransportConfig::default();
|
||||||
transport
|
transport
|
||||||
.max_concurrent_bidi_streams(1u32.into())
|
.max_concurrent_bidi_streams(1u32.into())
|
||||||
|
@ -84,7 +65,7 @@ async fn create_server_config() -> anyhow::Result<ServerConfig> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> eyre::Result<()> {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
// JUSTIFICATION: this lives until the end of the entire program
|
// JUSTIFICATION: this lives until the end of the entire program
|
||||||
let endpoint = Box::leak(Box::new(Endpoint::server(
|
let endpoint = Box::leak(Box::new(Endpoint::server(
|
||||||
|
@ -106,136 +87,98 @@ async fn main() -> anyhow::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_handle_quic(
|
async fn try_handle_quic(connection: Incoming, routing_table: &RoutingTable) -> eyre::Result<()> {
|
||||||
connection: Connection,
|
let connection = connection.await?;
|
||||||
routing_table: &RoutingTable,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
info!(
|
info!(
|
||||||
"QUIClime connection established to: {}",
|
"QUIClime connection established to: {}",
|
||||||
connection.remote_addr()?
|
connection.remote_address()
|
||||||
);
|
);
|
||||||
let mut control = connection
|
let (mut send_control, mut recv_control) = connection.accept_bi().await?;
|
||||||
.accept_bidirectional_stream()
|
info!("Control channel open: {}", connection.remote_address());
|
||||||
.await?
|
|
||||||
.ok_or(anyhow!(
|
|
||||||
"Connection closed while waiting for control channel"
|
|
||||||
))?;
|
|
||||||
info!("Control channel open: {}", connection.remote_addr()?);
|
|
||||||
let mut handle = loop {
|
let mut handle = loop {
|
||||||
let mut buf = vec![0u8; control.read_u8().await? as _];
|
let mut buf = vec![0u8; recv_control.read_u8().await? as _];
|
||||||
control.read_exact(&mut buf).await?;
|
recv_control.read_exact(&mut buf).await?;
|
||||||
if let Ok(parsed) = serde_json::from_slice(&buf) {
|
if let Ok(parsed) = serde_json::from_slice(&buf) {
|
||||||
match parsed {
|
match parsed {
|
||||||
ServerboundControlMessage::RequestDomainAssignment => {
|
ServerboundControlMessage::RequestDomainAssignment => {
|
||||||
let handle = routing_table.register();
|
let handle = routing_table.register();
|
||||||
info!(
|
info!(
|
||||||
"Domain assigned to {}: {}",
|
"Domain assigned to {}: {}",
|
||||||
connection.remote_addr()?,
|
connection.remote_address(),
|
||||||
handle.domain()
|
handle.domain()
|
||||||
);
|
);
|
||||||
let response =
|
let response =
|
||||||
serde_json::to_vec(&ClientboundControlMessage::DomainAssignmentComplete {
|
serde_json::to_vec(&ClientboundControlMessage::DomainAssignmentComplete {
|
||||||
domain: handle.domain().to_string(),
|
domain: handle.domain().to_string(),
|
||||||
})?;
|
})?;
|
||||||
control.write_all(&[response.len() as u8]).await?;
|
send_control.write_all(&[response.len() as u8]).await?;
|
||||||
control.write_all(&response).await?;
|
send_control.write_all(&response).await?;
|
||||||
break handle;
|
break handle;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let response = serde_json::to_vec(&ClientboundControlMessage::UnknownMessage)?;
|
let response = serde_json::to_vec(&ClientboundControlMessage::UnknownMessage)?;
|
||||||
control.write_all(&[response.len() as u8]).await?;
|
send_control.write_all(&[response.len() as u8]).await?;
|
||||||
control.write_all(&response).await?;
|
send_control.write_all(&response).await?;
|
||||||
};
|
};
|
||||||
let mut set = JoinSet::new();
|
|
||||||
let (control_message_queue, mut control_message_queue_recv) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (mut control_recv, mut control_send) = control.split();
|
|
||||||
let send_task = tokio::spawn(async move {
|
|
||||||
while let Some(event) = control_message_queue_recv.recv().await {
|
|
||||||
let response = serde_json::to_vec(&event)?;
|
|
||||||
control_send.write_all(&[response.len() as u8]).await?;
|
|
||||||
control_send.write_all(&response).await?;
|
|
||||||
}
|
|
||||||
Ok::<_, tokio::io::Error>(())
|
|
||||||
});
|
|
||||||
let control_message_queue_ref = &control_message_queue;
|
|
||||||
set.spawn(async move {
|
|
||||||
loop {
|
|
||||||
let mut buf = vec![0u8; control_recv.read_u8().await? as _];
|
|
||||||
control_recv.read_exact(&mut buf).await?;
|
|
||||||
control_message_queue_ref.send(ClientboundControlMessage::UnknownMessage);
|
|
||||||
}
|
|
||||||
Ok::<_, tokio::io::Error>(())
|
|
||||||
});
|
|
||||||
enum Event {
|
|
||||||
RouterEvent(RouterRequest),
|
|
||||||
TaskSet(Result<Result<(), tokio::io::Error>, JoinError>)
|
|
||||||
}
|
|
||||||
while let Some(remote) = tokio::select! {
|
|
||||||
v = handle.next() => v.map(Event::RouterEvent),
|
|
||||||
v = set.join_next() => v.map(Event::TaskSet),
|
|
||||||
} {
|
|
||||||
match remote {
|
|
||||||
Event::RouterEvent(RouterRequest::RouteRequest((handshake, mut client_stream))) => {
|
|
||||||
let stream = connection.open_bidirectional_stream().await;
|
|
||||||
set.spawn(async move {
|
|
||||||
if let Err(
|
|
||||||
ConnectionError::Transport { .. }
|
|
||||||
| ConnectionError::Application { .. }
|
|
||||||
| ConnectionError::EndpointClosing { .. }
|
|
||||||
| ConnectionError::ImmediateClose { .. },
|
|
||||||
) = stream
|
|
||||||
{
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
let mut stream = stream?;
|
|
||||||
handshake.send(&mut stream).await?;
|
|
||||||
tokio::io::copy_bidirectional(&mut stream, &mut client_stream).await?;
|
|
||||||
Ok::<_, tokio::io::Error>(())
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Event::RouterEvent(RouterRequest::BroadcastRequest(message)) => {
|
|
||||||
control_message_queue.send(ClientboundControlMessage::RequestMessageBroadcast {
|
|
||||||
message,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Event::TaskSet(Ok(Ok(()))) => {}
|
|
||||||
Event::TaskSet(Ok(Err(e))) => {
|
|
||||||
if e.kind() != ErrorKind::UnexpectedEof {
|
|
||||||
error!("Error in task: {e:?}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Event::TaskSet(Err(e)) => {
|
|
||||||
error!("Error in task: {e:?}")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
e = connection.closed() => {
|
||||||
|
match e {
|
||||||
|
ConnectionError::ConnectionClosed(_)
|
||||||
|
| ConnectionError::ApplicationClosed(_)
|
||||||
|
| ConnectionError::LocallyClosed => Ok(()),
|
||||||
|
e => Err(e.into()),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
r = async {
|
||||||
|
while let Some(remote) = handle.next().await {
|
||||||
|
match remote {
|
||||||
|
routing::RouterRequest::RouteRequest(remote) => {
|
||||||
|
let pair = connection.open_bi().await;
|
||||||
|
if let Err(ConnectionError::ApplicationClosed(_)) = pair {
|
||||||
|
break;
|
||||||
|
} else if let Err(ConnectionError::ConnectionClosed(_)) = pair {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
remote.send(pair?).map_err(|e| eyre!("{:?}", e))?;
|
||||||
|
}
|
||||||
|
routing::RouterRequest::BroadcastRequest(message) => {
|
||||||
|
let response =
|
||||||
|
serde_json::to_vec(&ClientboundControlMessage::RequestMessageBroadcast {
|
||||||
|
message,
|
||||||
|
})?;
|
||||||
|
send_control.write_all(&[response.len() as u8]).await?;
|
||||||
|
send_control.write_all(&response).await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
send_task.abort();
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
} => r
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_quic(connection: Connection, routing_table: &RoutingTable) {
|
async fn handle_quic(connection: Incoming, routing_table: &RoutingTable) {
|
||||||
if let Err(e) = try_handle_quic(connection, routing_table).await {
|
if let Err(e) = try_handle_quic(connection, routing_table).await {
|
||||||
error!("Error handling QUIClime connection: {}", e);
|
error!("Error handling QUIClime connection: {:#}", e);
|
||||||
};
|
};
|
||||||
info!("Finished handling QUIClime connection");
|
info!("Finished handling QUIClime connection");
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn listen_quic(
|
async fn listen_quic(
|
||||||
mut endpoint: Server,
|
endpoint: &'static Endpoint,
|
||||||
routing_table: &'static RoutingTable,
|
routing_table: &'static RoutingTable,
|
||||||
) -> anyhow::Result<Infallible> {
|
) -> eyre::Result<Infallible> {
|
||||||
while let Some(connection) = endpoint.accept().await {
|
while let Some(connection) = endpoint.accept().await {
|
||||||
tokio::spawn(handle_quic(connection, routing_table));
|
tokio::spawn(handle_quic(connection, routing_table));
|
||||||
}
|
}
|
||||||
Err(anyhow!("quiclime endpoint closed"))
|
Err(eyre!("quiclime endpoint closed"))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn listen_control(
|
async fn listen_control(
|
||||||
|
endpoint: &'static Endpoint,
|
||||||
routing_table: &'static RoutingTable,
|
routing_table: &'static RoutingTable,
|
||||||
) -> anyhow::Result<Infallible> {
|
) -> eyre::Result<Infallible> {
|
||||||
let app = axum::Router::new()
|
let app = axum::Router::new()
|
||||||
.route(
|
.route(
|
||||||
"/metrics",
|
"/metrics",
|
||||||
|
@ -270,16 +213,16 @@ async fn listen_control(
|
||||||
)
|
)
|
||||||
.serve(app.into_make_service())
|
.serve(app.into_make_service())
|
||||||
.await?;
|
.await?;
|
||||||
Err(anyhow!("control endpoint closed"))
|
Err(eyre!("control endpoint closed"))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_handle_minecraft(
|
async fn try_handle_minecraft(
|
||||||
mut connection: TcpStream,
|
mut connection: TcpStream,
|
||||||
routing_table: &'static RoutingTable,
|
routing_table: &'static RoutingTable,
|
||||||
) -> anyhow::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
let peer = connection.peer_addr()?;
|
let peer = connection.peer_addr()?;
|
||||||
info!("Minecraft client connected from: {}", peer);
|
info!("Minecraft client connected from: {}", peer);
|
||||||
let handshake = netty::read_packet(&mut connection).await;
|
let handshake = netty::read_packet(&mut connection, 512).await;
|
||||||
if let Err(ReadError::LegacyServerListPing) = handshake {
|
if let Err(ReadError::LegacyServerListPing) = handshake {
|
||||||
connection
|
connection
|
||||||
.write_all(include_bytes!("legacy_serverlistping_response.bin"))
|
.write_all(include_bytes!("legacy_serverlistping_response.bin"))
|
||||||
|
@ -290,8 +233,16 @@ async fn try_handle_minecraft(
|
||||||
let Some(address) = handshake.normalized_address() else {
|
let Some(address) = handshake.normalized_address() else {
|
||||||
return politely_disconnect(connection, handshake).await;
|
return politely_disconnect(connection, handshake).await;
|
||||||
};
|
};
|
||||||
let Some((mut send_host, mut recv_host)) = routing_table.route(&address).await else {
|
let (mut send_host, mut recv_host) =
|
||||||
|
match routing_table.route_limited(&address, peer.ip()).await {
|
||||||
|
Ok(val) => val,
|
||||||
|
Err(RoutingError::InvalidDomain) => {
|
||||||
return politely_disconnect(connection, handshake).await;
|
return politely_disconnect(connection, handshake).await;
|
||||||
|
}
|
||||||
|
Err(RoutingError::RateLimited) => {
|
||||||
|
warn!("Connection from {} has been rate limited!", peer);
|
||||||
|
return impolitely_disconnect(connection, handshake).await;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
handshake.send(&mut send_host).await?;
|
handshake.send(&mut send_host).await?;
|
||||||
let (mut recv_client, mut send_client) = connection.split();
|
let (mut recv_client, mut send_client) = connection.split();
|
||||||
|
@ -300,23 +251,21 @@ async fn try_handle_minecraft(
|
||||||
_ = tokio::io::copy(&mut recv_host, &mut send_client) => ()
|
_ = tokio::io::copy(&mut recv_host, &mut send_client) => ()
|
||||||
}
|
}
|
||||||
_ = connection.shutdown().await;
|
_ = connection.shutdown().await;
|
||||||
_ = send_host.finish().await;
|
_ = send_host.finish();
|
||||||
_ = recv_host.stop(0u32.into());
|
_ = recv_host.stop(0u32.into());
|
||||||
|
_ = send_host.stopped().await;
|
||||||
info!("Minecraft client disconnected from: {}", peer);
|
info!("Minecraft client disconnected from: {}", peer);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn politely_disconnect(
|
async fn politely_disconnect(mut connection: TcpStream, handshake: Handshake) -> eyre::Result<()> {
|
||||||
mut connection: TcpStream,
|
|
||||||
handshake: Handshake,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
match handshake.next_state {
|
match handshake.next_state {
|
||||||
netty::HandshakeType::Status => {
|
netty::HandshakeType::Status => {
|
||||||
let packet = netty::read_packet(&mut connection).await?;
|
let packet = netty::read_packet(&mut connection, 1).await?;
|
||||||
let mut packet = packet.as_slice();
|
let mut packet = packet.as_slice();
|
||||||
let id = packet.read_varint()?;
|
let id = packet.read_varint()?;
|
||||||
if id != 0 {
|
if id != 0 {
|
||||||
return Err(anyhow!(
|
return Err(eyre!(
|
||||||
"Packet isn't a Status Request(0x00), but {:#04x}",
|
"Packet isn't a Status Request(0x00), but {:#04x}",
|
||||||
id
|
id
|
||||||
));
|
));
|
||||||
|
@ -327,14 +276,11 @@ async fn politely_disconnect(
|
||||||
.await?;
|
.await?;
|
||||||
connection.write_varint(buf.len() as i32).await?;
|
connection.write_varint(buf.len() as i32).await?;
|
||||||
connection.write_all(&buf).await?;
|
connection.write_all(&buf).await?;
|
||||||
let packet = netty::read_packet(&mut connection).await?;
|
let packet = netty::read_packet(&mut connection, 9).await?;
|
||||||
let mut packet = packet.as_slice();
|
let mut packet = packet.as_slice();
|
||||||
let id = packet.read_varint()?;
|
let id = packet.read_varint()?;
|
||||||
if id != 1 {
|
if id != 1 {
|
||||||
return Err(anyhow!(
|
return Err(eyre!("Packet isn't a Ping Request(0x01), but {:#04x}", id));
|
||||||
"Packet isn't a Ping Request(0x01), but {:#04x}",
|
|
||||||
id
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
let payload = packet.read_long()?;
|
let payload = packet.read_long()?;
|
||||||
let mut buf = Vec::with_capacity(1 + 8);
|
let mut buf = Vec::with_capacity(1 + 8);
|
||||||
|
@ -344,7 +290,7 @@ async fn politely_disconnect(
|
||||||
connection.write_all(&buf).await?;
|
connection.write_all(&buf).await?;
|
||||||
}
|
}
|
||||||
netty::HandshakeType::Login => {
|
netty::HandshakeType::Login => {
|
||||||
let _ = netty::read_packet(&mut connection).await?;
|
let _ = netty::read_packet(&mut connection, 128).await?;
|
||||||
let mut buf = vec![];
|
let mut buf = vec![];
|
||||||
buf.write_varint(0).await?;
|
buf.write_varint(0).await?;
|
||||||
buf.write_string(include_str!("./disconnect_response.json"))
|
buf.write_string(include_str!("./disconnect_response.json"))
|
||||||
|
@ -356,13 +302,60 @@ async fn politely_disconnect(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn impolitely_disconnect(
|
||||||
|
mut connection: TcpStream,
|
||||||
|
handshake: Handshake,
|
||||||
|
) -> eyre::Result<()> {
|
||||||
|
match handshake.next_state {
|
||||||
|
netty::HandshakeType::Status => {
|
||||||
|
let packet = netty::read_packet(&mut connection, 1).await?;
|
||||||
|
let mut packet = packet.as_slice();
|
||||||
|
let id = packet.read_varint()?;
|
||||||
|
if id != 0 {
|
||||||
|
return Err(eyre!(
|
||||||
|
"Packet isn't a Status Request(0x00), but {:#04x}",
|
||||||
|
id
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let mut buf = vec![];
|
||||||
|
buf.write_varint(0).await?;
|
||||||
|
buf.write_string(include_str!("./serverlistping_response_rate.json"))
|
||||||
|
.await?;
|
||||||
|
connection.write_varint(buf.len() as i32).await?;
|
||||||
|
connection.write_all(&buf).await?;
|
||||||
|
let packet = netty::read_packet(&mut connection, 9).await?;
|
||||||
|
let mut packet = packet.as_slice();
|
||||||
|
let id = packet.read_varint()?;
|
||||||
|
if id != 1 {
|
||||||
|
return Err(eyre!("Packet isn't a Ping Request(0x01), but {:#04x}", id));
|
||||||
|
}
|
||||||
|
let payload = packet.read_long()?;
|
||||||
|
let mut buf = Vec::with_capacity(1 + 8);
|
||||||
|
buf.write_varint(1).await?;
|
||||||
|
buf.write_u64(payload).await?;
|
||||||
|
connection.write_varint(buf.len() as i32).await?;
|
||||||
|
connection.write_all(&buf).await?;
|
||||||
|
}
|
||||||
|
netty::HandshakeType::Login => {
|
||||||
|
let _ = netty::read_packet(&mut connection, 128).await?;
|
||||||
|
let mut buf = vec![];
|
||||||
|
buf.write_varint(0).await?;
|
||||||
|
buf.write_string(include_str!("./disconnect_response_rate.json"))
|
||||||
|
.await?;
|
||||||
|
connection.write_varint(buf.len() as i32).await?;
|
||||||
|
connection.write_all(&buf).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) {
|
async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) {
|
||||||
if let Err(e) = try_handle_minecraft(connection, routing_table).await {
|
if let Err(e) = try_handle_minecraft(connection, routing_table).await {
|
||||||
error!("Error handling Minecraft connection: {}", e.backtrace());
|
error!("Error handling Minecraft connection: {:#}", e);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn listen_minecraft(routing_table: &'static RoutingTable) -> anyhow::Result<Infallible> {
|
async fn listen_minecraft(routing_table: &'static RoutingTable) -> eyre::Result<Infallible> {
|
||||||
let server = tokio::net::TcpListener::bind(
|
let server = tokio::net::TcpListener::bind(
|
||||||
std::env::var("QUICLIME_BIND_ADDR_MC")
|
std::env::var("QUICLIME_BIND_ADDR_MC")
|
||||||
.context("Reading QUICLIME_BIND_ADDR_MC")?
|
.context("Reading QUICLIME_BIND_ADDR_MC")?
|
||||||
|
@ -375,7 +368,7 @@ async fn listen_minecraft(routing_table: &'static RoutingTable) -> anyhow::Resul
|
||||||
tokio::spawn(handle_minecraft(connection, routing_table));
|
tokio::spawn(handle_minecraft(connection, routing_table));
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error accepting minecraft connection: {}", e);
|
error!("Error accepting minecraft connection: {:#}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
29
src/netty.rs
29
src/netty.rs
|
@ -4,7 +4,6 @@ use std::io::Read;
|
||||||
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use log::error;
|
use log::error;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
|
@ -14,6 +13,8 @@ pub enum ReadError {
|
||||||
IoError(std::io::Error),
|
IoError(std::io::Error),
|
||||||
#[error("Was not a netty packet, but a Legacy ServerListPing")]
|
#[error("Was not a netty packet, but a Legacy ServerListPing")]
|
||||||
LegacyServerListPing,
|
LegacyServerListPing,
|
||||||
|
#[error("Packet was too large")]
|
||||||
|
PacketTooLarge,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<std::io::Error> for ReadError {
|
impl From<std::io::Error> for ReadError {
|
||||||
|
@ -86,8 +87,25 @@ pub trait ReadExt: Read {
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read_packet(mut reader: impl AsyncReadExt + Unpin) -> Result<Vec<u8>, ReadError> {
|
pub async fn read_packet(
|
||||||
|
mut reader: impl AsyncReadExt + Unpin,
|
||||||
|
max_size: usize,
|
||||||
|
) -> Result<Vec<u8>, ReadError> {
|
||||||
let len = read_varint(&mut reader).await?;
|
let len = read_varint(&mut reader).await?;
|
||||||
|
if len < 0 || (len as usize) > max_size {
|
||||||
|
return Err(if len == 254 {
|
||||||
|
let mut temp = [0u8];
|
||||||
|
reader.read_exact(&mut temp).await?;
|
||||||
|
if temp[0] == 0xFA {
|
||||||
|
// FE 01 FA: Legacy ServerListPing
|
||||||
|
ReadError::LegacyServerListPing
|
||||||
|
} else {
|
||||||
|
ReadError::PacketTooLarge
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ReadError::PacketTooLarge
|
||||||
|
});
|
||||||
|
}
|
||||||
let mut buf = vec![0u8; len as usize];
|
let mut buf = vec![0u8; len as usize];
|
||||||
if len == 254 {
|
if len == 254 {
|
||||||
let mut temp = [0u8];
|
let mut temp = [0u8];
|
||||||
|
@ -119,7 +137,6 @@ async fn read_varint(mut reader: impl AsyncReadExt + Unpin) -> Result<i32, ReadE
|
||||||
|
|
||||||
impl<T: Read> ReadExt for T {}
|
impl<T: Read> ReadExt for T {}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait WriteExt: AsyncWriteExt + Unpin {
|
pub trait WriteExt: AsyncWriteExt + Unpin {
|
||||||
async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
|
async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
|
||||||
for _ in 0..5 {
|
for _ in 0..5 {
|
||||||
|
@ -156,10 +173,10 @@ pub enum HandshakeType {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handshake {
|
impl Handshake {
|
||||||
pub fn new(mut packet: &[u8]) -> anyhow::Result<Self> {
|
pub fn new(mut packet: &[u8]) -> eyre::Result<Self> {
|
||||||
let packet_type = packet.read_varint()?;
|
let packet_type = packet.read_varint()?;
|
||||||
if packet_type != 0 {
|
if packet_type != 0 {
|
||||||
Err(anyhow::anyhow!("Not a Handshake packet"))
|
Err(eyre::eyre!("Not a Handshake packet"))
|
||||||
} else {
|
} else {
|
||||||
let protocol_version = packet.read_varint()?;
|
let protocol_version = packet.read_varint()?;
|
||||||
let server_address = packet.read_string()?;
|
let server_address = packet.read_string()?;
|
||||||
|
@ -167,7 +184,7 @@ impl Handshake {
|
||||||
let next_state = match packet.read_varint()? {
|
let next_state = match packet.read_varint()? {
|
||||||
1 => HandshakeType::Status,
|
1 => HandshakeType::Status,
|
||||||
2 => HandshakeType::Login,
|
2 => HandshakeType::Login,
|
||||||
_ => return Err(anyhow::anyhow!("Invalid next state")),
|
_ => return Err(eyre::eyre!("Invalid next state")),
|
||||||
};
|
};
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
protocol_version,
|
protocol_version,
|
||||||
|
|
|
@ -1,28 +1,35 @@
|
||||||
|
use governor::DefaultKeyedRateLimiter;
|
||||||
|
use governor::Quota;
|
||||||
use log::info;
|
use log::info;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
|
use quinn::RecvStream;
|
||||||
|
use quinn::SendStream;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use tokio::net::TcpStream;
|
use std::net::IpAddr;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
use crate::netty::Handshake;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum RouterRequest {
|
pub enum RouterRequest {
|
||||||
RouteRequest(RouterConnection),
|
RouteRequest(RouterCallback),
|
||||||
BroadcastRequest(String),
|
BroadcastRequest(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
type RouterConnection = (Handshake, TcpStream);
|
type RouterCallback = oneshot::Sender<(SendStream, RecvStream)>;
|
||||||
type RouteRequestReceiver = mpsc::UnboundedSender<RouterRequest>;
|
type RouteRequestReceiver = mpsc::UnboundedSender<RouterRequest>;
|
||||||
|
|
||||||
#[allow(clippy::module_name_repetitions)]
|
#[allow(clippy::module_name_repetitions)]
|
||||||
#[derive(Default)]
|
|
||||||
pub struct RoutingTable {
|
pub struct RoutingTable {
|
||||||
table: RwLock<HashMap<String, RouteRequestReceiver>>,
|
table: RwLock<HashMap<String, RouteRequestReceiver>>,
|
||||||
base_domain: String,
|
base_domain: String,
|
||||||
|
limiter: DefaultKeyedRateLimiter<IpAddr>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum RoutingError {
|
||||||
|
InvalidDomain,
|
||||||
|
RateLimited,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RoutingTable {
|
impl RoutingTable {
|
||||||
|
@ -30,6 +37,7 @@ impl RoutingTable {
|
||||||
RoutingTable {
|
RoutingTable {
|
||||||
table: RwLock::default(),
|
table: RwLock::default(),
|
||||||
base_domain,
|
base_domain,
|
||||||
|
limiter: DefaultKeyedRateLimiter::dashmap(Quota::per_minute(30.try_into().unwrap())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,41 +53,44 @@ impl RoutingTable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn route(&self, domain: &str, conn: RouterConnection) -> Option<()> {
|
pub async fn route_limited(
|
||||||
|
&self,
|
||||||
|
domain: &str,
|
||||||
|
ip: IpAddr,
|
||||||
|
) -> Result<(SendStream, RecvStream), RoutingError> {
|
||||||
|
if self.limiter.check_key(&ip).is_err() {
|
||||||
|
return Err(RoutingError::RateLimited);
|
||||||
|
}
|
||||||
|
self.limiter.retain_recent();
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
self.table
|
self.table
|
||||||
.read()
|
.read()
|
||||||
.get(domain)?
|
.get(domain)
|
||||||
.send(RouterRequest::RouteRequest(conn))
|
.ok_or(RoutingError::InvalidDomain)?
|
||||||
|
.send(RouterRequest::RouteRequest(send))
|
||||||
.ok()
|
.ok()
|
||||||
|
.ok_or(RoutingError::InvalidDomain)?;
|
||||||
|
recv.await.ok().ok_or(RoutingError::InvalidDomain)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn random_domain(&self) -> String {
|
||||||
|
format!(
|
||||||
|
"{}-{}.{}",
|
||||||
|
crate::wordlist::ID_WORDS.choose(&mut rand::rng()).unwrap(),
|
||||||
|
crate::wordlist::ID_WORDS.choose(&mut rand::rng()).unwrap(),
|
||||||
|
self.base_domain
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register(&self) -> RoutingHandle {
|
pub fn register(&self) -> RoutingHandle {
|
||||||
let mut lock = self.table.write();
|
let mut lock = self.table.write();
|
||||||
let mut domain = format!(
|
let mut domain = self.random_domain();
|
||||||
"{}-{}.{}",
|
|
||||||
crate::wordlist::ID_WORDS
|
|
||||||
.choose(&mut rand::thread_rng())
|
|
||||||
.unwrap(),
|
|
||||||
crate::wordlist::ID_WORDS
|
|
||||||
.choose(&mut rand::thread_rng())
|
|
||||||
.unwrap(),
|
|
||||||
self.base_domain
|
|
||||||
);
|
|
||||||
while lock.contains_key(&domain) {
|
while lock.contains_key(&domain) {
|
||||||
warn!(
|
warn!(
|
||||||
"Randomly selected domain {} conflicts; trying again",
|
"Randomly selected domain {} conflicts; trying again",
|
||||||
domain
|
domain
|
||||||
);
|
);
|
||||||
domain = format!(
|
domain = self.random_domain();
|
||||||
"{}-{}.{}",
|
|
||||||
crate::wordlist::ID_WORDS
|
|
||||||
.choose(&mut rand::thread_rng())
|
|
||||||
.unwrap(),
|
|
||||||
crate::wordlist::ID_WORDS
|
|
||||||
.choose(&mut rand::thread_rng())
|
|
||||||
.unwrap(),
|
|
||||||
self.base_domain
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
domain = crate::unicode_madness::validate_and_normalize_domain(&domain)
|
domain = crate::unicode_madness::validate_and_normalize_domain(&domain)
|
||||||
.expect("Resulting domain is not valid");
|
.expect("Resulting domain is not valid");
|
||||||
|
|
13
src/serverlistping_response_rate.json
Normal file
13
src/serverlistping_response_rate.json
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
{
|
||||||
|
"version": {
|
||||||
|
"name": "e4mc",
|
||||||
|
"protocol": -1
|
||||||
|
},
|
||||||
|
"players": {
|
||||||
|
"max": 0,
|
||||||
|
"online": 0
|
||||||
|
},
|
||||||
|
"description": {
|
||||||
|
"text": "You are trying to connect too fast. Please wait a minute before retrying."
|
||||||
|
}
|
||||||
|
}
|
7989
src/wordlist.rs
7989
src/wordlist.rs
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue