From 70bdcef3e784c9a2a930b3704ee224418d54d3b2 Mon Sep 17 00:00:00 2001 From: Skye Date: Mon, 26 May 2025 13:07:12 +0900 Subject: [PATCH] actually commit --- src/main.rs | 78 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/src/main.rs b/src/main.rs index 55d4eb4..2d870b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,6 @@ use clickhouse::{inserter::Inserter, Row}; use eyre::{eyre, Context}; use log::{error, info, warn}; use netty::{Handshake, ReadError}; -use parking_lot::Mutex; use quinn::{ crypto::rustls::QuicServerConfig, rustls::pki_types::{CertificateDer, PrivateKeyDer}, @@ -29,6 +28,7 @@ use time::OffsetDateTime; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, + sync::Mutex, }; use crate::{ @@ -121,7 +121,8 @@ async fn main() -> eyre::Result<()> { tokio::try_join!( listen_quic(endpoint, routing_table), listen_control(endpoint, routing_table), - listen_minecraft(routing_table, inserter) + listen_minecraft(routing_table, inserter.clone()), + send_commits(inserter) )?; Ok(()) } @@ -277,21 +278,23 @@ async fn try_handle_minecraft( match routing_table.route_limited(&address, peer.ip()).await { Ok(val) => val, Err(RoutingError::InvalidDomain) => { - if let Err(e) = inserter.lock().write(&Connection { - established: OffsetDateTime::now_utc(), - region: routing_table.base_domain(), - client: match peer.ip() { - std::net::IpAddr::V4(ipv4_addr) => ipv4_addr.to_ipv6_mapped(), - std::net::IpAddr::V6(ipv6_addr) => ipv6_addr, - }, - intent: match handshake.next_state { - netty::HandshakeType::Status => "status", - netty::HandshakeType::Login => "login", - }, - successful: false, - }) { - error!("Failed to send telemetry: {e:?}"); - } + tokio::task::spawn(async move { + if let Err(e) = inserter.lock().await.write(&Connection { + established: OffsetDateTime::now_utc(), + region: routing_table.base_domain(), + client: match peer.ip() { + std::net::IpAddr::V4(ipv4_addr) => ipv4_addr.to_ipv6_mapped(), + std::net::IpAddr::V6(ipv6_addr) => ipv6_addr, + }, + intent: match handshake.next_state { + netty::HandshakeType::Status => "status", + netty::HandshakeType::Login => "login", + }, + successful: false, + }) { + error!("Failed to send telemetry: {e:?}"); + } + }); return politely_disconnect(connection, handshake).await; } Err(RoutingError::RateLimited) => { @@ -299,21 +302,23 @@ async fn try_handle_minecraft( return impolitely_disconnect(connection, handshake).await; } }; - if let Err(e) = inserter.lock().write(&Connection { - established: OffsetDateTime::now_utc(), - region: routing_table.base_domain(), - client: match peer.ip() { - std::net::IpAddr::V4(ipv4_addr) => ipv4_addr.to_ipv6_mapped(), - std::net::IpAddr::V6(ipv6_addr) => ipv6_addr, - }, - intent: match handshake.next_state { - netty::HandshakeType::Status => "status", - netty::HandshakeType::Login => "login", - }, - successful: true, - }) { - error!("Failed to send telemetry: {e:?}"); - } + tokio::task::spawn(async move { + if let Err(e) = inserter.lock().await.write(&Connection { + established: OffsetDateTime::now_utc(), + region: routing_table.base_domain(), + client: match peer.ip() { + std::net::IpAddr::V4(ipv4_addr) => ipv4_addr.to_ipv6_mapped(), + std::net::IpAddr::V6(ipv6_addr) => ipv6_addr, + }, + intent: match handshake.next_state { + netty::HandshakeType::Status => "status", + netty::HandshakeType::Login => "login", + }, + successful: true, + }) { + error!("Failed to send telemetry: {e:?}"); + } + }); handshake.send(&mut send_host).await?; let (mut recv_client, mut send_client) = connection.split(); tokio::select! { @@ -454,3 +459,12 @@ async fn listen_minecraft( } } } + +async fn send_commits(inserter: Arc>>) -> eyre::Result { + loop { + if let Err(e) = inserter.lock().await.commit().await { + error!("Error committing: {e:?}"); + } + tokio::time::sleep(Duration::from_secs(5)).await; + } +}