Compare commits

..

4 commits

Author SHA1 Message Date
0d4aae1d63
Revert "sentry and other misc stuff"
This reverts commit eafc9e355c.
2024-06-17 21:01:40 +09:00
dc836cd2d3
Revert "more stuff"
This reverts commit f2a2c28202.
2024-06-17 21:01:39 +09:00
9fd86b0cd8
Revert "fuck"
This reverts commit d0551eb090.
2024-06-17 21:01:36 +09:00
63a8339e5e
Revert "fuck"
This reverts commit ad2b6c2525.
2024-06-17 21:01:29 +09:00
7 changed files with 392 additions and 1504 deletions

1680
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,22 +7,21 @@ license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
axum = "0.7.5" anyhow = { version = "1.0.71", features = ["backtrace"] }
eyre = "0.6.12" async-trait = "0.1.68"
idna = "1.0.0" axum = "0.6.18"
parking_lot = "0.12.3" env_logger = "0.10.0"
quinn = "0.11.2" idna = "0.4.0"
log = "0.4.19"
parking_lot = "0.12.1"
quinn = "0.10.1"
rand = "0.8.5" rand = "0.8.5"
rustls = "0.23.10" rustls = "0.21.9"
rustls-pemfile = "2.1.2" rustls-pemfile = "1.0.2"
sentry = { version = "0.34.0", default-features = false, features = ["backtrace", "contexts", "panic", "debug-images", "reqwest", "rustls"] }
sentry-tracing = "0.34.0"
serde = { version = "1.0.164", features = ["derive"] } serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1.0.97" serde_json = "1.0.97"
thiserror = "1.0.40" thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["rt-multi-thread", "fs", "macros", "io-util", "net"] } tokio = { version = "1.28.2", features = ["rt-multi-thread", "fs", "macros", "io-util", "net"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
[profile.release] [profile.release]
lto = "fat" lto = "fat"

View file

@ -3,8 +3,8 @@
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 0, "lastModified": 0,
"narHash": "sha256-1+ua0ggXlYYPLTmMl3YeYYsBXDSCqT+Gw3u6l4gvMhA=", "narHash": "sha256-doPgfj+7FFe9rfzWo1siAV2mVCasW+Bh8I1cToAXEE4=",
"path": "/nix/store/x887lkxvgnrrcfgrzz351qhfvvrkm80x-source", "path": "/nix/store/asymc3nsl739p1wwr0w6xbjnqs3qb94p-source",
"type": "path" "type": "path"
}, },
"original": { "original": {

View file

@ -115,12 +115,6 @@
example = "/path/to/key.pem"; example = "/path/to/key.pem";
description = lib.mdDoc "Path to TLS key to use for quiclime connections."; description = lib.mdDoc "Path to TLS key to use for quiclime connections.";
}; };
sentryDsn = mkOption {
type = types.str;
example = "https://key@sentry.io/42";
description = lib.mdDoc "Sentry DSN to use for error reports.";
};
}; };
}; };
@ -147,7 +141,6 @@
QUICLIME_BIND_ADDR_WEB = cfg.controlAddr; QUICLIME_BIND_ADDR_WEB = cfg.controlAddr;
QUICLIME_CERT_PATH = cfg.cert; QUICLIME_CERT_PATH = cfg.cert;
QUICLIME_KEY_PATH = cfg.key; QUICLIME_KEY_PATH = cfg.key;
SENTRY_DSN = cfg.sentryDsn;
}; };
}; };

View file

@ -2,30 +2,22 @@
#![allow(clippy::cast_possible_truncation)] #![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_possible_wrap)] #![allow(clippy::cast_possible_wrap)]
use std::{ use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
convert::Infallible,
net::SocketAddr,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};
use anyhow::{anyhow, Context};
use axum::{ use axum::{
http::StatusCode, http::StatusCode,
routing::{get, post}, routing::{get, post},
}; };
use eyre::{eyre, OptionExt, WrapErr}; use log::{error, info};
use netty::{Handshake, ReadError}; use netty::{Handshake, ReadError};
use quinn::{ use quinn::{Connecting, ConnectionError, Endpoint, ServerConfig, TransportConfig};
crypto::rustls::QuicServerConfig, ConnectionError, Endpoint, Incoming, ServerConfig,
TransportConfig,
};
use routing::RoutingTable; use routing::RoutingTable;
use rustls::{Certificate, PrivateKey};
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream}, net::TcpStream,
}; };
use tracing::{error, info};
use tracing_subscriber::{prelude::*, EnvFilter};
use crate::{ use crate::{
netty::{ReadExt, WriteExt}, netty::{ReadExt, WriteExt},
@ -38,24 +30,50 @@ mod routing;
mod unicode_madness; mod unicode_madness;
mod wordlist; mod wordlist;
async fn create_server_config() -> eyre::Result<ServerConfig> { fn any_private_keys(rd: &mut dyn std::io::BufRead) -> Result<Vec<Vec<u8>>, std::io::Error> {
let cert_file = let mut keys = Vec::<Vec<u8>>::new();
tokio::fs::read(std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?)
.await?; loop {
let cert_chain = rustls_pemfile::certs(&mut cert_file.as_slice()) match rustls_pemfile::read_one(rd)? {
.filter_map(Result::ok) None => return Ok(keys),
Some(
rustls_pemfile::Item::RSAKey(key)
| rustls_pemfile::Item::PKCS8Key(key)
| rustls_pemfile::Item::ECKey(key),
) => keys.push(key),
_ => {}
};
}
}
fn get_certs() -> anyhow::Result<(Vec<Certificate>, PrivateKey)> {
let mut cert_file = std::io::BufReader::new(std::fs::File::open(
std::env::var("QUICLIME_CERT_PATH").context("Reading QUICLIME_CERT_PATH")?,
)?);
let certs = rustls_pemfile::certs(&mut cert_file)?
.into_iter()
.map(Certificate)
.collect(); .collect();
let key_file = let mut key_file = std::io::BufReader::new(std::fs::File::open(
tokio::fs::read(std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?) std::env::var("QUICLIME_KEY_PATH").context("Reading QUICLIME_KEY_PATH")?,
.await?; )?);
let key_der = rustls_pemfile::private_key(&mut key_file.as_slice())? let key = PrivateKey(
.ok_or_eyre("No private key in QUICLIME_KEY_PATH!")?; any_private_keys(&mut key_file)?
.into_iter()
.next()
.ok_or(anyhow::anyhow!("No private key?"))?,
);
Ok((certs, key))
}
async fn create_server_config() -> anyhow::Result<ServerConfig> {
let (cert_chain, key_der) = tokio::task::spawn_blocking(get_certs).await??;
let mut rustls_config = rustls::ServerConfig::builder() let mut rustls_config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth() .with_no_client_auth()
.with_single_cert(cert_chain, key_der)?; .with_single_cert(cert_chain, key_der)?;
rustls_config.alpn_protocols = vec![b"quiclime".to_vec()]; rustls_config.alpn_protocols = vec![b"quiclime".to_vec()];
let config: QuicServerConfig = rustls_config.try_into()?; let mut config = ServerConfig::with_crypto(Arc::new(rustls_config));
let mut config = ServerConfig::with_crypto(Arc::new(config));
let mut transport = TransportConfig::default(); let mut transport = TransportConfig::default();
transport transport
.max_concurrent_bidi_streams(1u32.into()) .max_concurrent_bidi_streams(1u32.into())
@ -65,32 +83,9 @@ async fn create_server_config() -> eyre::Result<ServerConfig> {
Ok(config) Ok(config)
} }
static CLIENT_COUNT: AtomicUsize = AtomicUsize::new(0);
struct ClientCounterGuard;
impl ClientCounterGuard {
fn new() -> Self {
CLIENT_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self
}
}
impl Drop for ClientCounterGuard {
fn drop(&mut self) {
CLIENT_COUNT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
}
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> { async fn main() -> anyhow::Result<()> {
let _guard = sentry::init(std::env::var("SENTRY_DSN").ok()); env_logger::init();
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer())
.with(sentry_tracing::layer())
.init();
rustls::crypto::aws_lc_rs::default_provider().install_default().unwrap();
// JUSTIFICATION: this lives until the end of the entire program // JUSTIFICATION: this lives until the end of the entire program
let endpoint = Box::leak(Box::new(Endpoint::server( let endpoint = Box::leak(Box::new(Endpoint::server(
create_server_config().await?, create_server_config().await?,
@ -111,7 +106,10 @@ async fn main() -> eyre::Result<()> {
Ok(()) Ok(())
} }
async fn try_handle_quic(connection: Incoming, routing_table: &RoutingTable) -> eyre::Result<()> { async fn try_handle_quic(
connection: Connecting,
routing_table: &RoutingTable,
) -> anyhow::Result<()> {
let connection = connection.await?; let connection = connection.await?;
info!( info!(
"QUIClime connection established to: {}", "QUIClime connection established to: {}",
@ -165,7 +163,7 @@ async fn try_handle_quic(connection: Incoming, routing_table: &RoutingTable) ->
} else if let Err(ConnectionError::ConnectionClosed(_)) = pair { } else if let Err(ConnectionError::ConnectionClosed(_)) = pair {
break; break;
} }
remote.send(pair?).map_err(|e| eyre!("{:?}", e))?; remote.send(pair?).map_err(|e| anyhow::anyhow!("{:?}", e))?;
} }
routing::RouterRequest::BroadcastRequest(message) => { routing::RouterRequest::BroadcastRequest(message) => {
let response = let response =
@ -182,10 +180,8 @@ async fn try_handle_quic(connection: Incoming, routing_table: &RoutingTable) ->
} }
} }
#[tracing::instrument(skip(routing_table))] async fn handle_quic(connection: Connecting, routing_table: &RoutingTable) {
async fn handle_quic(connection: Incoming, routing_table: &RoutingTable) {
if let Err(e) = try_handle_quic(connection, routing_table).await { if let Err(e) = try_handle_quic(connection, routing_table).await {
sentry::capture_error::<dyn std::error::Error>(e.as_ref());
error!("Error handling QUIClime connection: {}", e); error!("Error handling QUIClime connection: {}", e);
}; };
info!("Finished handling QUIClime connection"); info!("Finished handling QUIClime connection");
@ -194,27 +190,21 @@ async fn handle_quic(connection: Incoming, routing_table: &RoutingTable) {
async fn listen_quic( async fn listen_quic(
endpoint: &'static Endpoint, endpoint: &'static Endpoint,
routing_table: &'static RoutingTable, routing_table: &'static RoutingTable,
) -> eyre::Result<Infallible> { ) -> anyhow::Result<Infallible> {
while let Some(connection) = endpoint.accept().await { while let Some(connection) = endpoint.accept().await {
tokio::spawn(handle_quic(connection, routing_table)); tokio::spawn(handle_quic(connection, routing_table));
} }
Err(eyre!("quiclime endpoint closed")) Err(anyhow!("quiclime endpoint closed"))
} }
async fn listen_control( async fn listen_control(
endpoint: &'static Endpoint, endpoint: &'static Endpoint,
routing_table: &'static RoutingTable, routing_table: &'static RoutingTable,
) -> eyre::Result<Infallible> { ) -> anyhow::Result<Infallible> {
let app = axum::Router::new() let app = axum::Router::new()
.route( .route(
"/metrics", "/metrics",
get(|| async { get(|| async { format!("host_count {}", routing_table.size()) }),
format!(
"host_count {}\nguest_count {}\n",
routing_table.size(),
CLIENT_COUNT.load(std::sync::atomic::Ordering::Relaxed)
)
}),
) )
.route( .route(
"/reload-certs", "/reload-certs",
@ -235,24 +225,23 @@ async fn listen_control(
.route( .route(
"/stop", "/stop",
post(|| async { post(|| async {
routing_table.broadcast("e4mc relay server stopping!");
tokio::time::sleep(Duration::from_secs(1)).await;
endpoint.close(0u32.into(), b"e4mc closing"); endpoint.close(0u32.into(), b"e4mc closing");
}), }),
); );
let listener = TcpListener::bind( axum::Server::bind(
std::env::var("QUICLIME_BIND_ADDR_WEB").context("Reading QUICLIME_BIND_ADDR_WEB")?, &std::env::var("QUICLIME_BIND_ADDR_WEB")
.context("Reading QUICLIME_BIND_ADDR_WEB")?
.parse()?,
) )
.serve(app.into_make_service())
.await?; .await?;
axum::serve(listener, app).await?; Err(anyhow!("control endpoint closed"))
Err(eyre!("control endpoint closed"))
} }
async fn try_handle_minecraft( async fn try_handle_minecraft(
mut connection: TcpStream, mut connection: TcpStream,
routing_table: &'static RoutingTable, routing_table: &'static RoutingTable,
) -> eyre::Result<()> { ) -> anyhow::Result<()> {
let guard = ClientCounterGuard::new();
let peer = connection.peer_addr()?; let peer = connection.peer_addr()?;
info!("Minecraft client connected from: {}", peer); info!("Minecraft client connected from: {}", peer);
let handshake = netty::read_packet(&mut connection).await; let handshake = netty::read_packet(&mut connection).await;
@ -270,24 +259,29 @@ async fn try_handle_minecraft(
return politely_disconnect(connection, handshake).await; return politely_disconnect(connection, handshake).await;
}; };
handshake.send(&mut send_host).await?; handshake.send(&mut send_host).await?;
let mut conn_host = tokio::io::join(&mut recv_host, &mut send_host); let (mut recv_client, mut send_client) = connection.split();
_ = tokio::io::copy_bidirectional(&mut connection, &mut conn_host); tokio::select! {
_ = tokio::io::copy(&mut recv_client, &mut send_host) => (),
_ = tokio::io::copy(&mut recv_host, &mut send_client) => ()
}
_ = connection.shutdown().await; _ = connection.shutdown().await;
_ = send_host.finish(); _ = send_host.finish().await;
_ = recv_host.stop(0u32.into()); _ = recv_host.stop(0u32.into());
info!("Minecraft client disconnected from: {}", peer); info!("Minecraft client disconnected from: {}", peer);
drop(guard);
Ok(()) Ok(())
} }
async fn politely_disconnect(mut connection: TcpStream, handshake: Handshake) -> eyre::Result<()> { async fn politely_disconnect(
mut connection: TcpStream,
handshake: Handshake,
) -> anyhow::Result<()> {
match handshake.next_state { match handshake.next_state {
netty::HandshakeType::Status => { netty::HandshakeType::Status => {
let packet = netty::read_packet(&mut connection).await?; let packet = netty::read_packet(&mut connection).await?;
let mut packet = packet.as_slice(); let mut packet = packet.as_slice();
let id = packet.read_varint()?; let id = packet.read_varint()?;
if id != 0 { if id != 0 {
return Err(eyre!( return Err(anyhow!(
"Packet isn't a Status Request(0x00), but {:#04x}", "Packet isn't a Status Request(0x00), but {:#04x}",
id id
)); ));
@ -302,7 +296,10 @@ async fn politely_disconnect(mut connection: TcpStream, handshake: Handshake) ->
let mut packet = packet.as_slice(); let mut packet = packet.as_slice();
let id = packet.read_varint()?; let id = packet.read_varint()?;
if id != 1 { if id != 1 {
return Err(eyre!("Packet isn't a Ping Request(0x01), but {:#04x}", id)); return Err(anyhow!(
"Packet isn't a Ping Request(0x01), but {:#04x}",
id
));
} }
let payload = packet.read_long()?; let payload = packet.read_long()?;
let mut buf = Vec::with_capacity(1 + 8); let mut buf = Vec::with_capacity(1 + 8);
@ -324,15 +321,13 @@ async fn politely_disconnect(mut connection: TcpStream, handshake: Handshake) ->
Ok(()) Ok(())
} }
#[tracing::instrument(skip(routing_table))]
async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) { async fn handle_minecraft(connection: TcpStream, routing_table: &'static RoutingTable) {
if let Err(e) = try_handle_minecraft(connection, routing_table).await { if let Err(e) = try_handle_minecraft(connection, routing_table).await {
sentry::capture_error::<dyn std::error::Error>(e.as_ref()); error!("Error handling Minecraft connection: {}", e.backtrace());
error!("Error handling Minecraft connection: {:#}", e);
}; };
} }
async fn listen_minecraft(routing_table: &'static RoutingTable) -> eyre::Result<Infallible> { async fn listen_minecraft(routing_table: &'static RoutingTable) -> anyhow::Result<Infallible> {
let server = tokio::net::TcpListener::bind( let server = tokio::net::TcpListener::bind(
std::env::var("QUICLIME_BIND_ADDR_MC") std::env::var("QUICLIME_BIND_ADDR_MC")
.context("Reading QUICLIME_BIND_ADDR_MC")? .context("Reading QUICLIME_BIND_ADDR_MC")?
@ -345,7 +340,6 @@ async fn listen_minecraft(routing_table: &'static RoutingTable) -> eyre::Result<
tokio::spawn(handle_minecraft(connection, routing_table)); tokio::spawn(handle_minecraft(connection, routing_table));
} }
Err(e) => { Err(e) => {
sentry::capture_error(&e);
error!("Error accepting minecraft connection: {}", e); error!("Error accepting minecraft connection: {}", e);
} }
} }

View file

@ -4,8 +4,9 @@ use std::io::Read;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use async_trait::async_trait;
use log::error;
use thiserror::Error; use thiserror::Error;
use tracing::error;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum ReadError { pub enum ReadError {
@ -118,6 +119,7 @@ async fn read_varint(mut reader: impl AsyncReadExt + Unpin) -> Result<i32, ReadE
impl<T: Read> ReadExt for T {} impl<T: Read> ReadExt for T {}
#[async_trait]
pub trait WriteExt: AsyncWriteExt + Unpin { pub trait WriteExt: AsyncWriteExt + Unpin {
async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> { async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
for _ in 0..5 { for _ in 0..5 {
@ -154,10 +156,10 @@ pub enum HandshakeType {
} }
impl Handshake { impl Handshake {
pub fn new(mut packet: &[u8]) -> eyre::Result<Self> { pub fn new(mut packet: &[u8]) -> anyhow::Result<Self> {
let packet_type = packet.read_varint()?; let packet_type = packet.read_varint()?;
if packet_type != 0 { if packet_type != 0 {
Err(eyre::eyre!("Not a Handshake packet")) Err(anyhow::anyhow!("Not a Handshake packet"))
} else { } else {
let protocol_version = packet.read_varint()?; let protocol_version = packet.read_varint()?;
let server_address = packet.read_string()?; let server_address = packet.read_string()?;
@ -165,7 +167,7 @@ impl Handshake {
let next_state = match packet.read_varint()? { let next_state = match packet.read_varint()? {
1 => HandshakeType::Status, 1 => HandshakeType::Status,
2 => HandshakeType::Login, 2 => HandshakeType::Login,
_ => return Err(eyre::eyre!("Invalid next state")), _ => return Err(anyhow::anyhow!("Invalid next state")),
}; };
Ok(Self { Ok(Self {
protocol_version, protocol_version,

View file

@ -1,3 +1,5 @@
use log::info;
use log::warn;
use parking_lot::RwLock; use parking_lot::RwLock;
use quinn::RecvStream; use quinn::RecvStream;
use quinn::SendStream; use quinn::SendStream;
@ -5,8 +7,6 @@ use rand::prelude::*;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
#[derive(Debug)] #[derive(Debug)]
pub enum RouterRequest { pub enum RouterRequest {