Compare commits
	
		
			5 commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| bf24b4ecaa | |||
| 998b210463 | |||
| 65505fa6e8 | |||
| 236f838662 | |||
| e621cca1c2 | 
					 9 changed files with 8928 additions and 1175 deletions
				
			
		
							
								
								
									
										1693
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1693
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							
							
								
								
									
										14
									
								
								Cargo.toml
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								Cargo.toml
									
									
									
									
									
								
							| 
						 | 
					@ -7,16 +7,16 @@ license = "MIT OR Apache-2.0"
 | 
				
			||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 | 
					# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[dependencies]
 | 
					[dependencies]
 | 
				
			||||||
anyhow = { version = "1.0.71", features = ["backtrace"] }
 | 
					 | 
				
			||||||
async-trait = "0.1.68"
 | 
					 | 
				
			||||||
axum = "0.6.18"
 | 
					axum = "0.6.18"
 | 
				
			||||||
env_logger = "0.10.0"
 | 
					env_logger = "0.10.0"
 | 
				
			||||||
idna = "0.4.0"
 | 
					eyre = "0.6.12"
 | 
				
			||||||
 | 
					governor = "0.10.0"
 | 
				
			||||||
 | 
					idna = "1.0.3"
 | 
				
			||||||
log = "0.4.19"
 | 
					log = "0.4.19"
 | 
				
			||||||
parking_lot = "0.12.1"
 | 
					parking_lot = "0.12.3"
 | 
				
			||||||
rand = "0.8.5"
 | 
					quinn = "0.11.6"
 | 
				
			||||||
rustls-pemfile = "1.0.2"
 | 
					rand = "0.9.0"
 | 
				
			||||||
s2n-quic = { version = "1.46.0", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] }
 | 
					rustls-pemfile = "2"
 | 
				
			||||||
serde = { version = "1.0.164", features = ["derive"] }
 | 
					serde = { version = "1.0.164", features = ["derive"] }
 | 
				
			||||||
serde_json = "1.0.97"
 | 
					serde_json = "1.0.97"
 | 
				
			||||||
thiserror = "1.0.40"
 | 
					thiserror = "1.0.40"
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -88,13 +88,13 @@
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            mcAddr = mkOption {
 | 
					            mcAddr = mkOption {
 | 
				
			||||||
              type = types.str;
 | 
					              type = types.str;
 | 
				
			||||||
              default = "0.0.0.0:25565";
 | 
					              default = "[::]:25565";
 | 
				
			||||||
              description = lib.mdDoc "The socket address to listen to Minecraft connections.";
 | 
					              description = lib.mdDoc "The socket address to listen to Minecraft connections.";
 | 
				
			||||||
            };
 | 
					            };
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            relayAddr = mkOption {
 | 
					            relayAddr = mkOption {
 | 
				
			||||||
              type = types.str;
 | 
					              type = types.str;
 | 
				
			||||||
              default = "0.0.0.0:25575";
 | 
					              default = "[::]:25575";
 | 
				
			||||||
              description = lib.mdDoc "The socket address to listen to quiclime connections.";
 | 
					              description = lib.mdDoc "The socket address to listen to quiclime connections.";
 | 
				
			||||||
            };
 | 
					            };
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										3
									
								
								src/disconnect_response_rate.json
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								src/disconnect_response_rate.json
									
									
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1,3 @@
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    "text": "You are trying to connect too fast. Please wait a minute before retrying."
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										285
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										285
									
								
								src/main.rs
									
									
									
									
									
								
							| 
						 | 
					@ -2,26 +2,29 @@
 | 
				
			||||||
#![allow(clippy::cast_possible_truncation)]
 | 
					#![allow(clippy::cast_possible_truncation)]
 | 
				
			||||||
#![allow(clippy::cast_possible_wrap)]
 | 
					#![allow(clippy::cast_possible_wrap)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use std::{convert::Infallible, io::ErrorKind, net::SocketAddr, sync::Arc, time::Duration};
 | 
					use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use anyhow::{anyhow, Context};
 | 
					 | 
				
			||||||
use axum::{
 | 
					use axum::{
 | 
				
			||||||
    http::StatusCode,
 | 
					    http::StatusCode,
 | 
				
			||||||
    routing::{get, post},
 | 
					    routing::{get, post},
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use log::{error, info};
 | 
					use eyre::{eyre, Context};
 | 
				
			||||||
 | 
					use log::{error, info, warn};
 | 
				
			||||||
use netty::{Handshake, ReadError};
 | 
					use netty::{Handshake, ReadError};
 | 
				
			||||||
use routing::RoutingTable;
 | 
					use quinn::{
 | 
				
			||||||
use s2n_quic::{connection::Error as ConnectionError, Connection, Server};
 | 
					    crypto::rustls::QuicServerConfig,
 | 
				
			||||||
 | 
					    rustls::pki_types::{CertificateDer, PrivateKeyDer},
 | 
				
			||||||
 | 
					    ConnectionError, Endpoint, Incoming, ServerConfig, TransportConfig,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					use routing::{RoutingError, RoutingTable};
 | 
				
			||||||
use tokio::{
 | 
					use tokio::{
 | 
				
			||||||
    io::{AsyncReadExt, AsyncWriteExt},
 | 
					    io::{AsyncReadExt, AsyncWriteExt},
 | 
				
			||||||
    net::TcpStream,
 | 
					    net::TcpStream,
 | 
				
			||||||
    task::{JoinError, JoinSet},
 | 
					 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::{
 | 
					use crate::{
 | 
				
			||||||
    netty::{ReadExt, WriteExt},
 | 
					    netty::{ReadExt, WriteExt},
 | 
				
			||||||
    proto::{ClientboundControlMessage, ServerboundControlMessage}, routing::RouterRequest,
 | 
					    proto::{ClientboundControlMessage, ServerboundControlMessage},
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
mod netty;
 | 
					mod netty;
 | 
				
			||||||
| 
						 | 
					@ -30,50 +33,28 @@ mod routing;
 | 
				
			||||||
mod unicode_madness;
 | 
					mod unicode_madness;
 | 
				
			||||||
mod wordlist;
 | 
					mod wordlist;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn any_private_keys(rd: &mut dyn std::io::BufRead) -> Result<Vec<Vec<u8>>, std::io::Error> {
 | 
					fn get_certs() -> eyre::Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)> {
 | 
				
			||||||
    let mut keys = Vec::<Vec<u8>>::new();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    loop {
 | 
					 | 
				
			||||||
        match rustls_pemfile::read_one(rd)? {
 | 
					 | 
				
			||||||
            None => return Ok(keys),
 | 
					 | 
				
			||||||
            Some(
 | 
					 | 
				
			||||||
                rustls_pemfile::Item::RSAKey(key)
 | 
					 | 
				
			||||||
                | rustls_pemfile::Item::PKCS8Key(key)
 | 
					 | 
				
			||||||
                | rustls_pemfile::Item::ECKey(key),
 | 
					 | 
				
			||||||
            ) => keys.push(key),
 | 
					 | 
				
			||||||
            _ => {}
 | 
					 | 
				
			||||||
        };
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
fn get_certs() -> anyhow::Result<(Vec<Certificate>, PrivateKey)> {
 | 
					 | 
				
			||||||
    let mut cert_file = std::io::BufReader::new(std::fs::File::open(
 | 
					    let mut cert_file = std::io::BufReader::new(std::fs::File::open(
 | 
				
			||||||
        std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?,
 | 
					        std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?,
 | 
				
			||||||
    )?);
 | 
					    )?);
 | 
				
			||||||
    let certs = rustls_pemfile::certs(&mut cert_file)?
 | 
					    let certs = rustls_pemfile::certs(&mut cert_file)
 | 
				
			||||||
        .into_iter()
 | 
					        .filter_map(Result::ok)
 | 
				
			||||||
        .map(Certificate)
 | 
					 | 
				
			||||||
        .collect();
 | 
					        .collect();
 | 
				
			||||||
    let mut key_file = std::io::BufReader::new(std::fs::File::open(
 | 
					    let mut key_file = std::io::BufReader::new(std::fs::File::open(
 | 
				
			||||||
        std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?,
 | 
					        std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?,
 | 
				
			||||||
    )?);
 | 
					    )?);
 | 
				
			||||||
    let key = PrivateKey(
 | 
					    let key = rustls_pemfile::private_key(&mut key_file)?.ok_or(eyre!("No private key?"))?;
 | 
				
			||||||
        any_private_keys(&mut key_file)?
 | 
					 | 
				
			||||||
            .into_iter()
 | 
					 | 
				
			||||||
            .next()
 | 
					 | 
				
			||||||
            .ok_or(anyhow::anyhow!("No private key?"))?,
 | 
					 | 
				
			||||||
    );
 | 
					 | 
				
			||||||
    Ok((certs, key))
 | 
					    Ok((certs, key))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn create_server_config() -> anyhow::Result<ServerConfig> {
 | 
					async fn create_server_config() -> eyre::Result<ServerConfig> {
 | 
				
			||||||
    let (cert_chain, key_der) = tokio::task::spawn_blocking(get_certs).await??;
 | 
					    let (cert_chain, key_der) = tokio::task::spawn_blocking(get_certs).await??;
 | 
				
			||||||
    let mut rustls_config = rustls::ServerConfig::builder()
 | 
					    let mut rustls_config = quinn::rustls::ServerConfig::builder()
 | 
				
			||||||
        .with_safe_defaults()
 | 
					 | 
				
			||||||
        .with_no_client_auth()
 | 
					        .with_no_client_auth()
 | 
				
			||||||
        .with_single_cert(cert_chain, key_der)?;
 | 
					        .with_single_cert(cert_chain, key_der)?;
 | 
				
			||||||
    rustls_config.alpn_protocols = vec![b"quiclime".to_vec()];
 | 
					    rustls_config.alpn_protocols = vec![b"quiclime".to_vec()];
 | 
				
			||||||
    let mut config = ServerConfig::with_crypto(Arc::new(rustls_config));
 | 
					    let quic_rustls_config = QuicServerConfig::try_from(rustls_config)?;
 | 
				
			||||||
 | 
					    let mut config = ServerConfig::with_crypto(Arc::new(quic_rustls_config));
 | 
				
			||||||
    let mut transport = TransportConfig::default();
 | 
					    let mut transport = TransportConfig::default();
 | 
				
			||||||
    transport
 | 
					    transport
 | 
				
			||||||
        .max_concurrent_bidi_streams(1u32.into())
 | 
					        .max_concurrent_bidi_streams(1u32.into())
 | 
				
			||||||
| 
						 | 
					@ -84,7 +65,7 @@ async fn create_server_config() -> anyhow::Result<ServerConfig> {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[tokio::main]
 | 
					#[tokio::main]
 | 
				
			||||||
async fn main() -> anyhow::Result<()> {
 | 
					async fn main() -> eyre::Result<()> {
 | 
				
			||||||
    env_logger::init();
 | 
					    env_logger::init();
 | 
				
			||||||
    // JUSTIFICATION: this lives until the end of the entire program
 | 
					    // JUSTIFICATION: this lives until the end of the entire program
 | 
				
			||||||
    let endpoint = Box::leak(Box::new(Endpoint::server(
 | 
					    let endpoint = Box::leak(Box::new(Endpoint::server(
 | 
				
			||||||
| 
						 | 
					@ -106,136 +87,98 @@ async fn main() -> anyhow::Result<()> {
 | 
				
			||||||
    Ok(())
 | 
					    Ok(())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn try_handle_quic(
 | 
					async fn try_handle_quic(connection: Incoming, routing_table: &RoutingTable) -> eyre::Result<()> {
 | 
				
			||||||
    connection: Connection,
 | 
					    let connection = connection.await?;
 | 
				
			||||||
    routing_table: &RoutingTable,
 | 
					 | 
				
			||||||
) -> anyhow::Result<()> {
 | 
					 | 
				
			||||||
    info!(
 | 
					    info!(
 | 
				
			||||||
        "QUIClime connection established to: {}",
 | 
					        "QUIClime connection established to: {}",
 | 
				
			||||||
        connection.remote_addr()?
 | 
					        connection.remote_address()
 | 
				
			||||||
    );
 | 
					    );
 | 
				
			||||||
    let mut control = connection
 | 
					    let (mut send_control, mut recv_control) = connection.accept_bi().await?;
 | 
				
			||||||
        .accept_bidirectional_stream()
 | 
					    info!("Control channel open: {}", connection.remote_address());
 | 
				
			||||||
        .await?
 | 
					 | 
				
			||||||
        .ok_or(anyhow!(
 | 
					 | 
				
			||||||
            "Connection closed while waiting for control channel"
 | 
					 | 
				
			||||||
        ))?;
 | 
					 | 
				
			||||||
    info!("Control channel open: {}", connection.remote_addr()?);
 | 
					 | 
				
			||||||
    let mut handle = loop {
 | 
					    let mut handle = loop {
 | 
				
			||||||
        let mut buf = vec![0u8; control.read_u8().await? as _];
 | 
					        let mut buf = vec![0u8; recv_control.read_u8().await? as _];
 | 
				
			||||||
        control.read_exact(&mut buf).await?;
 | 
					        recv_control.read_exact(&mut buf).await?;
 | 
				
			||||||
        if let Ok(parsed) = serde_json::from_slice(&buf) {
 | 
					        if let Ok(parsed) = serde_json::from_slice(&buf) {
 | 
				
			||||||
            match parsed {
 | 
					            match parsed {
 | 
				
			||||||
                ServerboundControlMessage::RequestDomainAssignment => {
 | 
					                ServerboundControlMessage::RequestDomainAssignment => {
 | 
				
			||||||
                    let handle = routing_table.register();
 | 
					                    let handle = routing_table.register();
 | 
				
			||||||
                    info!(
 | 
					                    info!(
 | 
				
			||||||
                        "Domain assigned to {}: {}",
 | 
					                        "Domain assigned to {}: {}",
 | 
				
			||||||
                        connection.remote_addr()?,
 | 
					                        connection.remote_address(),
 | 
				
			||||||
                        handle.domain()
 | 
					                        handle.domain()
 | 
				
			||||||
                    );
 | 
					                    );
 | 
				
			||||||
                    let response =
 | 
					                    let response =
 | 
				
			||||||
                        serde_json::to_vec(&ClientboundControlMessage::DomainAssignmentComplete {
 | 
					                        serde_json::to_vec(&ClientboundControlMessage::DomainAssignmentComplete {
 | 
				
			||||||
                            domain: handle.domain().to_string(),
 | 
					                            domain: handle.domain().to_string(),
 | 
				
			||||||
                        })?;
 | 
					                        })?;
 | 
				
			||||||
                    control.write_all(&[response.len() as u8]).await?;
 | 
					                    send_control.write_all(&[response.len() as u8]).await?;
 | 
				
			||||||
                    control.write_all(&response).await?;
 | 
					                    send_control.write_all(&response).await?;
 | 
				
			||||||
                    break handle;
 | 
					                    break handle;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        let response = serde_json::to_vec(&ClientboundControlMessage::UnknownMessage)?;
 | 
					        let response = serde_json::to_vec(&ClientboundControlMessage::UnknownMessage)?;
 | 
				
			||||||
        control.write_all(&[response.len() as u8]).await?;
 | 
					        send_control.write_all(&[response.len() as u8]).await?;
 | 
				
			||||||
        control.write_all(&response).await?;
 | 
					        send_control.write_all(&response).await?;
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
    let mut set = JoinSet::new();
 | 
					 | 
				
			||||||
    let (control_message_queue, mut control_message_queue_recv) = tokio::sync::mpsc::unbounded_channel();
 | 
					 | 
				
			||||||
    let (mut control_recv, mut control_send) = control.split();
 | 
					 | 
				
			||||||
    let send_task = tokio::spawn(async move {
 | 
					 | 
				
			||||||
        while let Some(event) = control_message_queue_recv.recv().await {
 | 
					 | 
				
			||||||
            let response = serde_json::to_vec(&event)?;
 | 
					 | 
				
			||||||
            control_send.write_all(&[response.len() as u8]).await?;
 | 
					 | 
				
			||||||
            control_send.write_all(&response).await?;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        Ok::<_, tokio::io::Error>(())
 | 
					 | 
				
			||||||
    });
 | 
					 | 
				
			||||||
    let control_message_queue_ref = &control_message_queue;
 | 
					 | 
				
			||||||
    set.spawn(async move {
 | 
					 | 
				
			||||||
        loop {
 | 
					 | 
				
			||||||
            let mut buf = vec![0u8; control_recv.read_u8().await? as _];
 | 
					 | 
				
			||||||
            control_recv.read_exact(&mut buf).await?;
 | 
					 | 
				
			||||||
            control_message_queue_ref.send(ClientboundControlMessage::UnknownMessage);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        Ok::<_, tokio::io::Error>(())
 | 
					 | 
				
			||||||
    });
 | 
					 | 
				
			||||||
    enum Event {
 | 
					 | 
				
			||||||
        RouterEvent(RouterRequest),
 | 
					 | 
				
			||||||
        TaskSet(Result<Result<(), tokio::io::Error>, JoinError>)
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    while let Some(remote) = tokio::select! {
 | 
					 | 
				
			||||||
        v = handle.next() => v.map(Event::RouterEvent),
 | 
					 | 
				
			||||||
        v = set.join_next() => v.map(Event::TaskSet),
 | 
					 | 
				
			||||||
    } {
 | 
					 | 
				
			||||||
        match remote {
 | 
					 | 
				
			||||||
            Event::RouterEvent(RouterRequest::RouteRequest((handshake, mut client_stream))) => {
 | 
					 | 
				
			||||||
                let stream = connection.open_bidirectional_stream().await;
 | 
					 | 
				
			||||||
                set.spawn(async move {
 | 
					 | 
				
			||||||
                    if let Err(
 | 
					 | 
				
			||||||
                        ConnectionError::Transport { .. }
 | 
					 | 
				
			||||||
                        | ConnectionError::Application { .. }
 | 
					 | 
				
			||||||
                        | ConnectionError::EndpointClosing { .. }
 | 
					 | 
				
			||||||
                        | ConnectionError::ImmediateClose { .. },
 | 
					 | 
				
			||||||
                    ) = stream
 | 
					 | 
				
			||||||
                    {
 | 
					 | 
				
			||||||
                        Ok(())
 | 
					 | 
				
			||||||
                    } else {
 | 
					 | 
				
			||||||
                        let mut stream = stream?;
 | 
					 | 
				
			||||||
                        handshake.send(&mut stream).await?;
 | 
					 | 
				
			||||||
                        tokio::io::copy_bidirectional(&mut stream, &mut client_stream).await?;
 | 
					 | 
				
			||||||
                        Ok::<_, tokio::io::Error>(())
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                });
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            Event::RouterEvent(RouterRequest::BroadcastRequest(message)) => {
 | 
					 | 
				
			||||||
                control_message_queue.send(ClientboundControlMessage::RequestMessageBroadcast {
 | 
					 | 
				
			||||||
                    message,
 | 
					 | 
				
			||||||
                });
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            Event::TaskSet(Ok(Ok(()))) => {}
 | 
					 | 
				
			||||||
            Event::TaskSet(Ok(Err(e))) => {
 | 
					 | 
				
			||||||
                if e.kind() != ErrorKind::UnexpectedEof {
 | 
					 | 
				
			||||||
                    error!("Error in task: {e:?}")
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            Event::TaskSet(Err(e)) => {
 | 
					 | 
				
			||||||
                error!("Error in task: {e:?}")
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    tokio::select! {
 | 
				
			||||||
 | 
					        e = connection.closed() => {
 | 
				
			||||||
 | 
					            match e {
 | 
				
			||||||
 | 
					                ConnectionError::ConnectionClosed(_)
 | 
				
			||||||
 | 
					                | ConnectionError::ApplicationClosed(_)
 | 
				
			||||||
 | 
					                | ConnectionError::LocallyClosed => Ok(()),
 | 
				
			||||||
 | 
					                e => Err(e.into()),
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        r = async {
 | 
				
			||||||
 | 
					            while let Some(remote) = handle.next().await {
 | 
				
			||||||
 | 
					                match remote {
 | 
				
			||||||
 | 
					                    routing::RouterRequest::RouteRequest(remote) => {
 | 
				
			||||||
 | 
					                        let pair = connection.open_bi().await;
 | 
				
			||||||
 | 
					                        if let Err(ConnectionError::ApplicationClosed(_)) = pair {
 | 
				
			||||||
 | 
					                            break;
 | 
				
			||||||
 | 
					                        } else if let Err(ConnectionError::ConnectionClosed(_)) = pair {
 | 
				
			||||||
 | 
					                            break;
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                        remote.send(pair?).map_err(|e| eyre!("{:?}", e))?;
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    routing::RouterRequest::BroadcastRequest(message) => {
 | 
				
			||||||
 | 
					                        let response =
 | 
				
			||||||
 | 
					                            serde_json::to_vec(&ClientboundControlMessage::RequestMessageBroadcast {
 | 
				
			||||||
 | 
					                                message,
 | 
				
			||||||
 | 
					                            })?;
 | 
				
			||||||
 | 
					                        send_control.write_all(&[response.len() as u8]).await?;
 | 
				
			||||||
 | 
					                        send_control.write_all(&response).await?;
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
    send_task.abort();
 | 
					 | 
				
			||||||
            Ok(())
 | 
					            Ok(())
 | 
				
			||||||
 | 
					        } => r
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn handle_quic(connection: Connection, routing_table: &RoutingTable) {
 | 
					async fn handle_quic(connection: Incoming, routing_table: &RoutingTable) {
 | 
				
			||||||
    if let Err(e) = try_handle_quic(connection, routing_table).await {
 | 
					    if let Err(e) = try_handle_quic(connection, routing_table).await {
 | 
				
			||||||
        error!("Error handling QUIClime connection: {}", e);
 | 
					        error!("Error handling QUIClime connection: {:#}", e);
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
    info!("Finished handling QUIClime connection");
 | 
					    info!("Finished handling QUIClime connection");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn listen_quic(
 | 
					async fn listen_quic(
 | 
				
			||||||
    mut endpoint: Server,
 | 
					    endpoint: &'static Endpoint,
 | 
				
			||||||
    routing_table: &'static RoutingTable,
 | 
					    routing_table: &'static RoutingTable,
 | 
				
			||||||
) -> anyhow::Result<Infallible> {
 | 
					) -> eyre::Result<Infallible> {
 | 
				
			||||||
    while let Some(connection) = endpoint.accept().await {
 | 
					    while let Some(connection) = endpoint.accept().await {
 | 
				
			||||||
        tokio::spawn(handle_quic(connection, routing_table));
 | 
					        tokio::spawn(handle_quic(connection, routing_table));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    Err(anyhow!("quiclime endpoint closed"))
 | 
					    Err(eyre!("quiclime endpoint closed"))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn listen_control(
 | 
					async fn listen_control(
 | 
				
			||||||
 | 
					    endpoint: &'static Endpoint,
 | 
				
			||||||
    routing_table: &'static RoutingTable,
 | 
					    routing_table: &'static RoutingTable,
 | 
				
			||||||
) -> anyhow::Result<Infallible> {
 | 
					) -> eyre::Result<Infallible> {
 | 
				
			||||||
    let app = axum::Router::new()
 | 
					    let app = axum::Router::new()
 | 
				
			||||||
        .route(
 | 
					        .route(
 | 
				
			||||||
            "/metrics",
 | 
					            "/metrics",
 | 
				
			||||||
| 
						 | 
					@ -270,16 +213,16 @@ async fn listen_control(
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    .serve(app.into_make_service())
 | 
					    .serve(app.into_make_service())
 | 
				
			||||||
    .await?;
 | 
					    .await?;
 | 
				
			||||||
    Err(anyhow!("control endpoint closed"))
 | 
					    Err(eyre!("control endpoint closed"))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn try_handle_minecraft(
 | 
					async fn try_handle_minecraft(
 | 
				
			||||||
    mut connection: TcpStream,
 | 
					    mut connection: TcpStream,
 | 
				
			||||||
    routing_table: &'static RoutingTable,
 | 
					    routing_table: &'static RoutingTable,
 | 
				
			||||||
) -> anyhow::Result<()> {
 | 
					) -> eyre::Result<()> {
 | 
				
			||||||
    let peer = connection.peer_addr()?;
 | 
					    let peer = connection.peer_addr()?;
 | 
				
			||||||
    info!("Minecraft client connected from: {}", peer);
 | 
					    info!("Minecraft client connected from: {}", peer);
 | 
				
			||||||
    let handshake = netty::read_packet(&mut connection).await;
 | 
					    let handshake = netty::read_packet(&mut connection, 512).await;
 | 
				
			||||||
    if let Err(ReadError::LegacyServerListPing) = handshake {
 | 
					    if let Err(ReadError::LegacyServerListPing) = handshake {
 | 
				
			||||||
        connection
 | 
					        connection
 | 
				
			||||||
            .write_all(include_bytes!("legacy_serverlistping_response.bin"))
 | 
					            .write_all(include_bytes!("legacy_serverlistping_response.bin"))
 | 
				
			||||||
| 
						 | 
					@ -290,8 +233,16 @@ async fn try_handle_minecraft(
 | 
				
			||||||
    let Some(address) = handshake.normalized_address() else {
 | 
					    let Some(address) = handshake.normalized_address() else {
 | 
				
			||||||
        return politely_disconnect(connection, handshake).await;
 | 
					        return politely_disconnect(connection, handshake).await;
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
    let Some((mut send_host, mut recv_host)) = routing_table.route(&address).await else {
 | 
					    let (mut send_host, mut recv_host) =
 | 
				
			||||||
 | 
					        match routing_table.route_limited(&address, peer.ip()).await {
 | 
				
			||||||
 | 
					            Ok(val) => val,
 | 
				
			||||||
 | 
					            Err(RoutingError::InvalidDomain) => {
 | 
				
			||||||
                return politely_disconnect(connection, handshake).await;
 | 
					                return politely_disconnect(connection, handshake).await;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            Err(RoutingError::RateLimited) => {
 | 
				
			||||||
 | 
					                warn!("Connection from {} has been rate limited!", peer);
 | 
				
			||||||
 | 
					                return impolitely_disconnect(connection, handshake).await;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
    handshake.send(&mut send_host).await?;
 | 
					    handshake.send(&mut send_host).await?;
 | 
				
			||||||
    let (mut recv_client, mut send_client) = connection.split();
 | 
					    let (mut recv_client, mut send_client) = connection.split();
 | 
				
			||||||
| 
						 | 
					@ -300,23 +251,21 @@ async fn try_handle_minecraft(
 | 
				
			||||||
        _ = tokio::io::copy(&mut recv_host, &mut send_client) => ()
 | 
					        _ = tokio::io::copy(&mut recv_host, &mut send_client) => ()
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    _ = connection.shutdown().await;
 | 
					    _ = connection.shutdown().await;
 | 
				
			||||||
    _ = send_host.finish().await;
 | 
					    _ = send_host.finish();
 | 
				
			||||||
    _ = recv_host.stop(0u32.into());
 | 
					    _ = recv_host.stop(0u32.into());
 | 
				
			||||||
 | 
					    _ = send_host.stopped().await;
 | 
				
			||||||
    info!("Minecraft client disconnected from: {}", peer);
 | 
					    info!("Minecraft client disconnected from: {}", peer);
 | 
				
			||||||
    Ok(())
 | 
					    Ok(())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn politely_disconnect(
 | 
					async fn politely_disconnect(mut connection: TcpStream, handshake: Handshake) -> eyre::Result<()> {
 | 
				
			||||||
    mut connection: TcpStream,
 | 
					 | 
				
			||||||
    handshake: Handshake,
 | 
					 | 
				
			||||||
) -> anyhow::Result<()> {
 | 
					 | 
				
			||||||
    match handshake.next_state {
 | 
					    match handshake.next_state {
 | 
				
			||||||
        netty::HandshakeType::Status => {
 | 
					        netty::HandshakeType::Status => {
 | 
				
			||||||
            let packet = netty::read_packet(&mut connection).await?;
 | 
					            let packet = netty::read_packet(&mut connection, 1).await?;
 | 
				
			||||||
            let mut packet = packet.as_slice();
 | 
					            let mut packet = packet.as_slice();
 | 
				
			||||||
            let id = packet.read_varint()?;
 | 
					            let id = packet.read_varint()?;
 | 
				
			||||||
            if id != 0 {
 | 
					            if id != 0 {
 | 
				
			||||||
                return Err(anyhow!(
 | 
					                return Err(eyre!(
 | 
				
			||||||
                    "Packet isn't a Status Request(0x00), but {:#04x}",
 | 
					                    "Packet isn't a Status Request(0x00), but {:#04x}",
 | 
				
			||||||
                    id
 | 
					                    id
 | 
				
			||||||
                ));
 | 
					                ));
 | 
				
			||||||
| 
						 | 
					@ -327,14 +276,11 @@ async fn politely_disconnect(
 | 
				
			||||||
                .await?;
 | 
					                .await?;
 | 
				
			||||||
            connection.write_varint(buf.len() as i32).await?;
 | 
					            connection.write_varint(buf.len() as i32).await?;
 | 
				
			||||||
            connection.write_all(&buf).await?;
 | 
					            connection.write_all(&buf).await?;
 | 
				
			||||||
            let packet = netty::read_packet(&mut connection).await?;
 | 
					            let packet = netty::read_packet(&mut connection, 9).await?;
 | 
				
			||||||
            let mut packet = packet.as_slice();
 | 
					            let mut packet = packet.as_slice();
 | 
				
			||||||
            let id = packet.read_varint()?;
 | 
					            let id = packet.read_varint()?;
 | 
				
			||||||
            if id != 1 {
 | 
					            if id != 1 {
 | 
				
			||||||
                return Err(anyhow!(
 | 
					                return Err(eyre!("Packet isn't a Ping Request(0x01), but {:#04x}", id));
 | 
				
			||||||
                    "Packet isn't a Ping Request(0x01), but {:#04x}",
 | 
					 | 
				
			||||||
                    id
 | 
					 | 
				
			||||||
                ));
 | 
					 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            let payload = packet.read_long()?;
 | 
					            let payload = packet.read_long()?;
 | 
				
			||||||
            let mut buf = Vec::with_capacity(1 + 8);
 | 
					            let mut buf = Vec::with_capacity(1 + 8);
 | 
				
			||||||
| 
						 | 
					@ -344,7 +290,7 @@ async fn politely_disconnect(
 | 
				
			||||||
            connection.write_all(&buf).await?;
 | 
					            connection.write_all(&buf).await?;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        netty::HandshakeType::Login => {
 | 
					        netty::HandshakeType::Login => {
 | 
				
			||||||
            let _ = netty::read_packet(&mut connection).await?;
 | 
					            let _ = netty::read_packet(&mut connection, 128).await?;
 | 
				
			||||||
            let mut buf = vec![];
 | 
					            let mut buf = vec![];
 | 
				
			||||||
            buf.write_varint(0).await?;
 | 
					            buf.write_varint(0).await?;
 | 
				
			||||||
            buf.write_string(include_str!("./disconnect_response.json"))
 | 
					            buf.write_string(include_str!("./disconnect_response.json"))
 | 
				
			||||||
| 
						 | 
					@ -356,13 +302,60 @@ async fn politely_disconnect(
 | 
				
			||||||
    Ok(())
 | 
					    Ok(())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async fn impolitely_disconnect(
 | 
				
			||||||
 | 
					    mut connection: TcpStream,
 | 
				
			||||||
 | 
					    handshake: Handshake,
 | 
				
			||||||
 | 
					) -> eyre::Result<()> {
 | 
				
			||||||
 | 
					    match handshake.next_state {
 | 
				
			||||||
 | 
					        netty::HandshakeType::Status => {
 | 
				
			||||||
 | 
					            let packet = netty::read_packet(&mut connection, 1).await?;
 | 
				
			||||||
 | 
					            let mut packet = packet.as_slice();
 | 
				
			||||||
 | 
					            let id = packet.read_varint()?;
 | 
				
			||||||
 | 
					            if id != 0 {
 | 
				
			||||||
 | 
					                return Err(eyre!(
 | 
				
			||||||
 | 
					                    "Packet isn't a Status Request(0x00), but {:#04x}",
 | 
				
			||||||
 | 
					                    id
 | 
				
			||||||
 | 
					                ));
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            let mut buf = vec![];
 | 
				
			||||||
 | 
					            buf.write_varint(0).await?;
 | 
				
			||||||
 | 
					            buf.write_string(include_str!("./serverlistping_response_rate.json"))
 | 
				
			||||||
 | 
					                .await?;
 | 
				
			||||||
 | 
					            connection.write_varint(buf.len() as i32).await?;
 | 
				
			||||||
 | 
					            connection.write_all(&buf).await?;
 | 
				
			||||||
 | 
					            let packet = netty::read_packet(&mut connection, 9).await?;
 | 
				
			||||||
 | 
					            let mut packet = packet.as_slice();
 | 
				
			||||||
 | 
					            let id = packet.read_varint()?;
 | 
				
			||||||
 | 
					            if id != 1 {
 | 
				
			||||||
 | 
					                return Err(eyre!("Packet isn't a Ping Request(0x01), but {:#04x}", id));
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            let payload = packet.read_long()?;
 | 
				
			||||||
 | 
					            let mut buf = Vec::with_capacity(1 + 8);
 | 
				
			||||||
 | 
					            buf.write_varint(1).await?;
 | 
				
			||||||
 | 
					            buf.write_u64(payload).await?;
 | 
				
			||||||
 | 
					            connection.write_varint(buf.len() as i32).await?;
 | 
				
			||||||
 | 
					            connection.write_all(&buf).await?;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        netty::HandshakeType::Login => {
 | 
				
			||||||
 | 
					            let _ = netty::read_packet(&mut connection, 128).await?;
 | 
				
			||||||
 | 
					            let mut buf = vec![];
 | 
				
			||||||
 | 
					            buf.write_varint(0).await?;
 | 
				
			||||||
 | 
					            buf.write_string(include_str!("./disconnect_response_rate.json"))
 | 
				
			||||||
 | 
					                .await?;
 | 
				
			||||||
 | 
					            connection.write_varint(buf.len() as i32).await?;
 | 
				
			||||||
 | 
					            connection.write_all(&buf).await?;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    Ok(())
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) {
 | 
					async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) {
 | 
				
			||||||
    if let Err(e) = try_handle_minecraft(connection, routing_table).await {
 | 
					    if let Err(e) = try_handle_minecraft(connection, routing_table).await {
 | 
				
			||||||
        error!("Error handling Minecraft connection: {}", e.backtrace());
 | 
					        error!("Error handling Minecraft connection: {:#}", e);
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn listen_minecraft(routing_table: &'static RoutingTable) -> anyhow::Result<Infallible> {
 | 
					async fn listen_minecraft(routing_table: &'static RoutingTable) -> eyre::Result<Infallible> {
 | 
				
			||||||
    let server = tokio::net::TcpListener::bind(
 | 
					    let server = tokio::net::TcpListener::bind(
 | 
				
			||||||
        std::env::var("QUICLIME_BIND_ADDR_MC")
 | 
					        std::env::var("QUICLIME_BIND_ADDR_MC")
 | 
				
			||||||
            .context("Reading QUICLIME_BIND_ADDR_MC")?
 | 
					            .context("Reading QUICLIME_BIND_ADDR_MC")?
 | 
				
			||||||
| 
						 | 
					@ -375,7 +368,7 @@ async fn listen_minecraft(routing_table: &'static RoutingTable) -> anyhow::Resul
 | 
				
			||||||
                tokio::spawn(handle_minecraft(connection, routing_table));
 | 
					                tokio::spawn(handle_minecraft(connection, routing_table));
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            Err(e) => {
 | 
					            Err(e) => {
 | 
				
			||||||
                error!("Error accepting minecraft connection: {}", e);
 | 
					                error!("Error accepting minecraft connection: {:#}", e);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										29
									
								
								src/netty.rs
									
									
									
									
									
								
							
							
						
						
									
										29
									
								
								src/netty.rs
									
									
									
									
									
								
							| 
						 | 
					@ -4,7 +4,6 @@ use std::io::Read;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
					use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use async_trait::async_trait;
 | 
					 | 
				
			||||||
use log::error;
 | 
					use log::error;
 | 
				
			||||||
use thiserror::Error;
 | 
					use thiserror::Error;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -14,6 +13,8 @@ pub enum ReadError {
 | 
				
			||||||
    IoError(std::io::Error),
 | 
					    IoError(std::io::Error),
 | 
				
			||||||
    #[error("Was not a netty packet, but a Legacy ServerListPing")]
 | 
					    #[error("Was not a netty packet, but a Legacy ServerListPing")]
 | 
				
			||||||
    LegacyServerListPing,
 | 
					    LegacyServerListPing,
 | 
				
			||||||
 | 
					    #[error("Packet was too large")]
 | 
				
			||||||
 | 
					    PacketTooLarge,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl From<std::io::Error> for ReadError {
 | 
					impl From<std::io::Error> for ReadError {
 | 
				
			||||||
| 
						 | 
					@ -86,8 +87,25 @@ pub trait ReadExt: Read {
 | 
				
			||||||
    // }
 | 
					    // }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub async fn read_packet(mut reader: impl AsyncReadExt + Unpin) -> Result<Vec<u8>, ReadError> {
 | 
					pub async fn read_packet(
 | 
				
			||||||
 | 
					    mut reader: impl AsyncReadExt + Unpin,
 | 
				
			||||||
 | 
					    max_size: usize,
 | 
				
			||||||
 | 
					) -> Result<Vec<u8>, ReadError> {
 | 
				
			||||||
    let len = read_varint(&mut reader).await?;
 | 
					    let len = read_varint(&mut reader).await?;
 | 
				
			||||||
 | 
					    if len < 0 || (len as usize) > max_size {
 | 
				
			||||||
 | 
					        return Err(if len == 254 {
 | 
				
			||||||
 | 
					            let mut temp = [0u8];
 | 
				
			||||||
 | 
					            reader.read_exact(&mut temp).await?;
 | 
				
			||||||
 | 
					            if temp[0] == 0xFA {
 | 
				
			||||||
 | 
					                // FE 01 FA: Legacy ServerListPing
 | 
				
			||||||
 | 
					                ReadError::LegacyServerListPing
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                ReadError::PacketTooLarge
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            ReadError::PacketTooLarge
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    let mut buf = vec![0u8; len as usize];
 | 
					    let mut buf = vec![0u8; len as usize];
 | 
				
			||||||
    if len == 254 {
 | 
					    if len == 254 {
 | 
				
			||||||
        let mut temp = [0u8];
 | 
					        let mut temp = [0u8];
 | 
				
			||||||
| 
						 | 
					@ -119,7 +137,6 @@ async fn read_varint(mut reader: impl AsyncReadExt + Unpin) -> Result<i32, ReadE
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl<T: Read> ReadExt for T {}
 | 
					impl<T: Read> ReadExt for T {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[async_trait]
 | 
					 | 
				
			||||||
pub trait WriteExt: AsyncWriteExt + Unpin {
 | 
					pub trait WriteExt: AsyncWriteExt + Unpin {
 | 
				
			||||||
    async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
 | 
					    async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
 | 
				
			||||||
        for _ in 0..5 {
 | 
					        for _ in 0..5 {
 | 
				
			||||||
| 
						 | 
					@ -156,10 +173,10 @@ pub enum HandshakeType {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl Handshake {
 | 
					impl Handshake {
 | 
				
			||||||
    pub fn new(mut packet: &[u8]) -> anyhow::Result<Self> {
 | 
					    pub fn new(mut packet: &[u8]) -> eyre::Result<Self> {
 | 
				
			||||||
        let packet_type = packet.read_varint()?;
 | 
					        let packet_type = packet.read_varint()?;
 | 
				
			||||||
        if packet_type != 0 {
 | 
					        if packet_type != 0 {
 | 
				
			||||||
            Err(anyhow::anyhow!("Not a Handshake packet"))
 | 
					            Err(eyre::eyre!("Not a Handshake packet"))
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            let protocol_version = packet.read_varint()?;
 | 
					            let protocol_version = packet.read_varint()?;
 | 
				
			||||||
            let server_address = packet.read_string()?;
 | 
					            let server_address = packet.read_string()?;
 | 
				
			||||||
| 
						 | 
					@ -167,7 +184,7 @@ impl Handshake {
 | 
				
			||||||
            let next_state = match packet.read_varint()? {
 | 
					            let next_state = match packet.read_varint()? {
 | 
				
			||||||
                1 => HandshakeType::Status,
 | 
					                1 => HandshakeType::Status,
 | 
				
			||||||
                2 => HandshakeType::Login,
 | 
					                2 => HandshakeType::Login,
 | 
				
			||||||
                _ => return Err(anyhow::anyhow!("Invalid next state")),
 | 
					                _ => return Err(eyre::eyre!("Invalid next state")),
 | 
				
			||||||
            };
 | 
					            };
 | 
				
			||||||
            Ok(Self {
 | 
					            Ok(Self {
 | 
				
			||||||
                protocol_version,
 | 
					                protocol_version,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,28 +1,35 @@
 | 
				
			||||||
 | 
					use governor::DefaultKeyedRateLimiter;
 | 
				
			||||||
 | 
					use governor::Quota;
 | 
				
			||||||
use log::info;
 | 
					use log::info;
 | 
				
			||||||
use log::warn;
 | 
					use log::warn;
 | 
				
			||||||
use parking_lot::RwLock;
 | 
					use parking_lot::RwLock;
 | 
				
			||||||
 | 
					use quinn::RecvStream;
 | 
				
			||||||
 | 
					use quinn::SendStream;
 | 
				
			||||||
use rand::prelude::*;
 | 
					use rand::prelude::*;
 | 
				
			||||||
use std::collections::HashMap;
 | 
					use std::collections::HashMap;
 | 
				
			||||||
use tokio::net::TcpStream;
 | 
					use std::net::IpAddr;
 | 
				
			||||||
use tokio::sync::mpsc;
 | 
					use tokio::sync::mpsc;
 | 
				
			||||||
use tokio::sync::oneshot;
 | 
					use tokio::sync::oneshot;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::netty::Handshake;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#[derive(Debug)]
 | 
					#[derive(Debug)]
 | 
				
			||||||
pub enum RouterRequest {
 | 
					pub enum RouterRequest {
 | 
				
			||||||
    RouteRequest(RouterConnection),
 | 
					    RouteRequest(RouterCallback),
 | 
				
			||||||
    BroadcastRequest(String),
 | 
					    BroadcastRequest(String),
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type RouterConnection = (Handshake, TcpStream);
 | 
					type RouterCallback = oneshot::Sender<(SendStream, RecvStream)>;
 | 
				
			||||||
type RouteRequestReceiver = mpsc::UnboundedSender<RouterRequest>;
 | 
					type RouteRequestReceiver = mpsc::UnboundedSender<RouterRequest>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[allow(clippy::module_name_repetitions)]
 | 
					#[allow(clippy::module_name_repetitions)]
 | 
				
			||||||
#[derive(Default)]
 | 
					 | 
				
			||||||
pub struct RoutingTable {
 | 
					pub struct RoutingTable {
 | 
				
			||||||
    table: RwLock<HashMap<String, RouteRequestReceiver>>,
 | 
					    table: RwLock<HashMap<String, RouteRequestReceiver>>,
 | 
				
			||||||
    base_domain: String,
 | 
					    base_domain: String,
 | 
				
			||||||
 | 
					    limiter: DefaultKeyedRateLimiter<IpAddr>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub enum RoutingError {
 | 
				
			||||||
 | 
					    InvalidDomain,
 | 
				
			||||||
 | 
					    RateLimited,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl RoutingTable {
 | 
					impl RoutingTable {
 | 
				
			||||||
| 
						 | 
					@ -30,6 +37,7 @@ impl RoutingTable {
 | 
				
			||||||
        RoutingTable {
 | 
					        RoutingTable {
 | 
				
			||||||
            table: RwLock::default(),
 | 
					            table: RwLock::default(),
 | 
				
			||||||
            base_domain,
 | 
					            base_domain,
 | 
				
			||||||
 | 
					            limiter: DefaultKeyedRateLimiter::dashmap(Quota::per_minute(30.try_into().unwrap())),
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -45,41 +53,44 @@ impl RoutingTable {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn route(&self, domain: &str, conn: RouterConnection) -> Option<()> {
 | 
					    pub async fn route_limited(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        domain: &str,
 | 
				
			||||||
 | 
					        ip: IpAddr,
 | 
				
			||||||
 | 
					    ) -> Result<(SendStream, RecvStream), RoutingError> {
 | 
				
			||||||
 | 
					        if self.limiter.check_key(&ip).is_err() {
 | 
				
			||||||
 | 
					            return Err(RoutingError::RateLimited);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        self.limiter.retain_recent();
 | 
				
			||||||
 | 
					        let (send, recv) = oneshot::channel();
 | 
				
			||||||
        self.table
 | 
					        self.table
 | 
				
			||||||
            .read()
 | 
					            .read()
 | 
				
			||||||
            .get(domain)?
 | 
					            .get(domain)
 | 
				
			||||||
            .send(RouterRequest::RouteRequest(conn))
 | 
					            .ok_or(RoutingError::InvalidDomain)?
 | 
				
			||||||
 | 
					            .send(RouterRequest::RouteRequest(send))
 | 
				
			||||||
            .ok()
 | 
					            .ok()
 | 
				
			||||||
 | 
					            .ok_or(RoutingError::InvalidDomain)?;
 | 
				
			||||||
 | 
					        recv.await.ok().ok_or(RoutingError::InvalidDomain)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn random_domain(&self) -> String {
 | 
				
			||||||
 | 
					        format!(
 | 
				
			||||||
 | 
					            "{}-{}.{}",
 | 
				
			||||||
 | 
					            crate::wordlist::ID_WORDS.choose(&mut rand::rng()).unwrap(),
 | 
				
			||||||
 | 
					            crate::wordlist::ID_WORDS.choose(&mut rand::rng()).unwrap(),
 | 
				
			||||||
 | 
					            self.base_domain
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn register(&self) -> RoutingHandle {
 | 
					    pub fn register(&self) -> RoutingHandle {
 | 
				
			||||||
        let mut lock = self.table.write();
 | 
					        let mut lock = self.table.write();
 | 
				
			||||||
        let mut domain = format!(
 | 
					        let mut domain = self.random_domain();
 | 
				
			||||||
            "{}-{}.{}",
 | 
					 | 
				
			||||||
            crate::wordlist::ID_WORDS
 | 
					 | 
				
			||||||
                .choose(&mut rand::thread_rng())
 | 
					 | 
				
			||||||
                .unwrap(),
 | 
					 | 
				
			||||||
            crate::wordlist::ID_WORDS
 | 
					 | 
				
			||||||
                .choose(&mut rand::thread_rng())
 | 
					 | 
				
			||||||
                .unwrap(),
 | 
					 | 
				
			||||||
            self.base_domain
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        while lock.contains_key(&domain) {
 | 
					        while lock.contains_key(&domain) {
 | 
				
			||||||
            warn!(
 | 
					            warn!(
 | 
				
			||||||
                "Randomly selected domain {} conflicts; trying again",
 | 
					                "Randomly selected domain {} conflicts; trying again",
 | 
				
			||||||
                domain
 | 
					                domain
 | 
				
			||||||
            );
 | 
					            );
 | 
				
			||||||
            domain = format!(
 | 
					            domain = self.random_domain();
 | 
				
			||||||
                "{}-{}.{}",
 | 
					 | 
				
			||||||
                crate::wordlist::ID_WORDS
 | 
					 | 
				
			||||||
                    .choose(&mut rand::thread_rng())
 | 
					 | 
				
			||||||
                    .unwrap(),
 | 
					 | 
				
			||||||
                crate::wordlist::ID_WORDS
 | 
					 | 
				
			||||||
                    .choose(&mut rand::thread_rng())
 | 
					 | 
				
			||||||
                    .unwrap(),
 | 
					 | 
				
			||||||
                self.base_domain
 | 
					 | 
				
			||||||
            );
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        domain = crate::unicode_madness::validate_and_normalize_domain(&domain)
 | 
					        domain = crate::unicode_madness::validate_and_normalize_domain(&domain)
 | 
				
			||||||
            .expect("Resulting domain is not valid");
 | 
					            .expect("Resulting domain is not valid");
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										13
									
								
								src/serverlistping_response_rate.json
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								src/serverlistping_response_rate.json
									
									
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1,13 @@
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    "version": {
 | 
				
			||||||
 | 
					        "name": "e4mc",
 | 
				
			||||||
 | 
					        "protocol": -1
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "players": {
 | 
				
			||||||
 | 
					        "max": 0,
 | 
				
			||||||
 | 
					        "online": 0
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "description": {
 | 
				
			||||||
 | 
					        "text": "You are trying to connect too fast. Please wait a minute before retrying."
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										7989
									
								
								src/wordlist.rs
									
									
									
									
									
								
							
							
						
						
									
										7989
									
								
								src/wordlist.rs
									
									
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							
		Loading…
	
		Reference in a new issue