actually commit

This commit is contained in:
Skye 2025-05-26 13:07:12 +09:00
parent b6e4213dd0
commit 70bdcef3e7

View file

@ -17,7 +17,6 @@ use clickhouse::{inserter::Inserter, Row};
use eyre::{eyre, Context}; use eyre::{eyre, Context};
use log::{error, info, warn}; use log::{error, info, warn};
use netty::{Handshake, ReadError}; use netty::{Handshake, ReadError};
use parking_lot::Mutex;
use quinn::{ use quinn::{
crypto::rustls::QuicServerConfig, crypto::rustls::QuicServerConfig,
rustls::pki_types::{CertificateDer, PrivateKeyDer}, rustls::pki_types::{CertificateDer, PrivateKeyDer},
@ -29,6 +28,7 @@ use time::OffsetDateTime;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream, net::TcpStream,
sync::Mutex,
}; };
use crate::{ use crate::{
@ -121,7 +121,8 @@ async fn main() -> eyre::Result<()> {
tokio::try_join!( tokio::try_join!(
listen_quic(endpoint, routing_table), listen_quic(endpoint, routing_table),
listen_control(endpoint, routing_table), listen_control(endpoint, routing_table),
listen_minecraft(routing_table, inserter) listen_minecraft(routing_table, inserter.clone()),
send_commits(inserter)
)?; )?;
Ok(()) Ok(())
} }
@ -277,7 +278,8 @@ async fn try_handle_minecraft(
match routing_table.route_limited(&address, peer.ip()).await { match routing_table.route_limited(&address, peer.ip()).await {
Ok(val) => val, Ok(val) => val,
Err(RoutingError::InvalidDomain) => { Err(RoutingError::InvalidDomain) => {
if let Err(e) = inserter.lock().write(&Connection { tokio::task::spawn(async move {
if let Err(e) = inserter.lock().await.write(&Connection {
established: OffsetDateTime::now_utc(), established: OffsetDateTime::now_utc(),
region: routing_table.base_domain(), region: routing_table.base_domain(),
client: match peer.ip() { client: match peer.ip() {
@ -292,6 +294,7 @@ async fn try_handle_minecraft(
}) { }) {
error!("Failed to send telemetry: {e:?}"); error!("Failed to send telemetry: {e:?}");
} }
});
return politely_disconnect(connection, handshake).await; return politely_disconnect(connection, handshake).await;
} }
Err(RoutingError::RateLimited) => { Err(RoutingError::RateLimited) => {
@ -299,7 +302,8 @@ async fn try_handle_minecraft(
return impolitely_disconnect(connection, handshake).await; return impolitely_disconnect(connection, handshake).await;
} }
}; };
if let Err(e) = inserter.lock().write(&Connection { tokio::task::spawn(async move {
if let Err(e) = inserter.lock().await.write(&Connection {
established: OffsetDateTime::now_utc(), established: OffsetDateTime::now_utc(),
region: routing_table.base_domain(), region: routing_table.base_domain(),
client: match peer.ip() { client: match peer.ip() {
@ -314,6 +318,7 @@ async fn try_handle_minecraft(
}) { }) {
error!("Failed to send telemetry: {e:?}"); error!("Failed to send telemetry: {e:?}");
} }
});
handshake.send(&mut send_host).await?; handshake.send(&mut send_host).await?;
let (mut recv_client, mut send_client) = connection.split(); let (mut recv_client, mut send_client) = connection.split();
tokio::select! { tokio::select! {
@ -454,3 +459,12 @@ async fn listen_minecraft(
} }
} }
} }
async fn send_commits(inserter: Arc<Mutex<Inserter<Connection>>>) -> eyre::Result<Infallible> {
loop {
if let Err(e) = inserter.lock().await.commit().await {
error!("Error committing: {e:?}");
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}