first commit

main
Skye 1 year ago
commit 81610da67d
Signed by: me
GPG Key ID: 0104BC05F41B77B8

@ -0,0 +1,3 @@
export BASE_DOMAIN=lc.bs2k.me
export WS_BIND_ADDR=127.0.0.1:8080
export RUST_LOG=info

1
.gitignore vendored

@ -0,0 +1 @@
/target

1266
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -0,0 +1,6 @@
[workspace]
members = [
"server",
"client",
]

@ -0,0 +1,21 @@
[package]
name = "e4mc-client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.70"
async-trait = "0.1.68"
async-tungstenite = { version = "0.20.0", features = ["tokio-runtime", "tokio-rustls-native-certs"] }
clap = { version = "4.2.1", features = ["derive"] }
env_logger = "0.10.0"
futures = "0.3.28"
futures-util = { version = "0.3.28", features = ["io"] }
lazy_static = "1.4.0"
log = "0.4.17"
serde = { version = "1.0.159", features = ["derive"] }
serde_json = "1.0.95"
thiserror = "1.0.40"
tokio = { version = "1.27.0", features = ["rt-multi-thread", "sync", "macros", "net", "io-util"] }

@ -0,0 +1,186 @@
use std::{collections::HashMap, net::SocketAddr};
use anyhow::anyhow;
use async_tungstenite::tokio::connect_async;
use async_tungstenite::tungstenite::Message;
use futures_util::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use log::{error, info, trace};
use serde::{Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
sync::mpsc::{UnboundedReceiver, UnboundedSender},
sync::RwLock,
task,
};
enum ChannelHandlerMessage {
Data(Vec<u8>),
Shutdown,
}
lazy_static! {
static ref CHANNEL_MAP: RwLock<HashMap<u8, UnboundedSender<ChannelHandlerMessage>>> =
RwLock::new(HashMap::new());
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let args = Args::parse();
let server = args.server.unwrap_or("wss://e4mc.skyevg.systems".to_string());
let (ws_conn, _) = connect_async(server).await?;
let (mut send, mut recv) = ws_conn.split();
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let sender = &*Box::leak(Box::new(sender)); // the task::spawn below requires 'static and this is main
tokio::try_join!(
async move {
while let Some(Ok(message)) = recv.next().await {
match message {
Message::Text(message) => {
info!("{}", message);
let message: ClientboundControlMessage = serde_json::from_str(&message)?;
match message {
ClientboundControlMessage::DomainAssigned(domain) => {
println!("Domain assigned: {}", domain);
}
ClientboundControlMessage::ChannelOpen(id, _) => {
match TcpStream::connect(("127.0.0.1", args.port)).await {
Ok(conn) => {
let (send, recv) = tokio::sync::mpsc::unbounded_channel();
CHANNEL_MAP.write().await.insert(id, send);
task::spawn(async move {
if let Err(e) =
handle_channel(id, sender.clone(), conn, recv).await
{
error!("Error handling channel: {}", e);
}
CHANNEL_MAP.write().await.remove(&id);
info!("Sending close request for channel: {}", id);
sender
.send(Message::Text(
serde_json::to_string(
&ServerboundControlMessage::ChannelClosed(
id,
),
)
.unwrap(),
))
.unwrap();
});
}
Err(e) => {
error!("Error creating channel: {}", e);
sender
.send(Message::Text(
serde_json::to_string(
&ServerboundControlMessage::ChannelClosed(
id,
),
)
.unwrap(),
))
.unwrap();
}
};
}
ClientboundControlMessage::ChannelClosed(id) => {
let mut channel_map = CHANNEL_MAP.write().await;
if let Some(send) = channel_map.remove(&id) {
if let Err(e) = send.send(ChannelHandlerMessage::Shutdown) {
error!("Error closing channel: {}", e);
}
}
}
}
}
Message::Binary(buf) => {
trace!("recv: {:?}", buf);
if let Some(send) = CHANNEL_MAP.write().await.get_mut(&buf[0]) {
if let Err(e) = send.send(ChannelHandlerMessage::Data(buf[1..].to_vec())) {
error!("Error transferring data: {}", e);
}
}
}
_ => {}
}
}
Ok(()) as anyhow::Result<()>
},
async move {
while let Some(message) = receiver.recv().await {
send.send(message.clone()).await?;
}
Ok(()) as anyhow::Result<()>
}
)?;
Ok(())
}
async fn handle_channel(
id: u8,
sender: UnboundedSender<Message>,
mut conn: TcpStream,
mut recv: UnboundedReceiver<ChannelHandlerMessage>,
) -> anyhow::Result<()> {
let (mut read, mut write) = conn.split();
tokio::select!(_ = async move {
let mut buf = [0u8; 1024];
loop {
let ready = read.ready(tokio::io::Interest::READABLE).await?;
if ready.is_read_closed() {
return Ok(())
}
let len = read.read(&mut buf).await?;
if len == 0 {
continue;
}
let mut packet = Vec::from(&buf[..len]);
packet.insert(0, id);
trace!("send: {:?}", packet);
sender.send(Message::Binary(packet))?;
}
Ok(()) as anyhow::Result<()>
} => {},
_ = async move {
while let Some(message) = recv.recv().await {
match message {
ChannelHandlerMessage::Data(buf) => write.write_all(&buf).await?,
ChannelHandlerMessage::Shutdown => return Ok(()),
}
}
Ok(()) as anyhow::Result<()>
} => {});
conn.shutdown().await?;
Ok(())
}
use clap::Parser;
/// Simple program to greet a person
#[derive(Parser, Debug)]
struct Args {
/// Port of Minecraft server
#[arg(short, long, default_value_t = 25565)]
port: u16,
/// Address of e4mc-server instance
#[arg(short, long)]
server: Option<String>,
}
#[derive(Deserialize)]
enum ClientboundControlMessage {
DomainAssigned(String),
ChannelOpen(u8, SocketAddr),
ChannelClosed(u8),
}
#[derive(Serialize)]
enum ServerboundControlMessage {
ChannelClosed(u8),
}

@ -0,0 +1,233 @@
use async_std::io::{ReadExt, WriteExt};
use async_trait::async_trait;
use log::{info, error};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum NettyReadError {
#[error("{0}")]
IoError(std::io::Error),
#[error("Was not a netty packet, but a Legacy ServerListPing")]
LegacyServerListPing,
}
impl From<std::io::Error> for NettyReadError {
fn from(value: std::io::Error) -> Self {
Self::IoError(value)
}
}
impl From<std::io::ErrorKind> for NettyReadError {
fn from(value: std::io::ErrorKind) -> Self {
Self::IoError(value.into())
}
}
#[async_trait]
pub trait ReadExtNetty: ReadExt + Unpin {
async fn read_bool(&mut self) -> Result<bool, NettyReadError> {
let mut buf = [0u8];
self.read_exact(&mut buf).await?;
match buf[0] {
0 => Ok(false),
1 => Ok(true),
_ => Err(std::io::ErrorKind::InvalidData.into()),
}
}
async fn read_byte(&mut self) -> Result<i8, NettyReadError> {
let mut buf = [0u8];
self.read_exact(&mut buf).await?;
Ok(i8::from_be_bytes(buf))
}
async fn read_unsigned_byte(&mut self) -> Result<u8, NettyReadError> {
let mut buf = [0u8];
self.read_exact(&mut buf).await?;
Ok(buf[0])
}
async fn read_short(&mut self) -> Result<i16, NettyReadError> {
let mut buf = [0u8; 2];
self.read_exact(&mut buf).await?;
Ok(i16::from_be_bytes(buf))
}
async fn read_unsigned_short(&mut self) -> Result<u16, NettyReadError> {
let mut buf = [0u8; 2];
self.read_exact(&mut buf).await?;
Ok(u16::from_be_bytes(buf))
}
async fn read_int(&mut self) -> Result<i32, NettyReadError> {
let mut buf = [0u8; 4];
self.read_exact(&mut buf).await?;
Ok(i32::from_be_bytes(buf))
}
async fn read_long(&mut self) -> Result<i64, NettyReadError> {
let mut buf = [0u8; 8];
self.read_exact(&mut buf).await?;
Ok(i64::from_be_bytes(buf))
}
async fn read_float(&mut self) -> Result<f32, NettyReadError> {
let mut buf = [0u8; 4];
self.read_exact(&mut buf).await?;
Ok(f32::from_be_bytes(buf))
}
async fn read_double(&mut self) -> Result<f64, NettyReadError> {
let mut buf = [0u8; 8];
self.read_exact(&mut buf).await?;
Ok(f64::from_be_bytes(buf))
}
async fn read_string(&mut self) -> Result<String, NettyReadError> {
let len = self.read_varint().await?;
let mut buf = vec![0u8; len as usize];
self.read_exact(&mut buf).await?;
String::from_utf8(buf).map_err(|_| std::io::ErrorKind::InvalidData.into())
}
async fn read_varint(&mut self) -> Result<i32, NettyReadError> {
let mut res = 0i32;
for i in 0..5 {
let part = self.read_unsigned_byte().await?;
res |= (part as i32 & 0x7F) << (7 * i);
if part & 0x80 == 0 {
return Ok(res);
}
}
error!("Varint is invalid");
Err(std::io::ErrorKind::InvalidData.into())
}
// async fn read_varint(&mut self) -> Result<i32, NettyReadError> {
// let mut value = 0i32;
// let mut buf = [0u8; 1];
// let mut pos = 0u8;
// loop {
// self.read_exact(&mut buf).await?;
// println!("{}", buf[0]);
// value |= ((buf[0] & 0b01111111) << pos) as i32;
// if (buf[0] & 0b10000000) == 0 {
// break;
// };
// pos += 7;
// if pos >= 32 {
// return Err(std::io::ErrorKind::InvalidData.into());
// };
// }
// Ok(value)
// }
async fn read_varlong(&mut self) -> Result<i64, NettyReadError> {
let mut value = 0i64;
let mut buf = [0u8; 1];
let mut position = 0u8;
loop {
self.read_exact(&mut buf).await?;
value |= ((buf[0] & 0b01111111) << position) as i64;
if (buf[0] & 0b10000000) == 0 {
break;
};
position += 7;
if position >= 64 {
return Err(std::io::ErrorKind::InvalidData.into());
};
}
Ok(value)
}
async fn read_vec3(&mut self) -> Result<(i32, i16, i32), NettyReadError> {
let mut buf = [0u8; 8];
self.read_exact(&mut buf).await?;
let packed = u64::from_be_bytes(buf);
let x: i32 = ((packed & 0xffffffc000000000) >> 38) as _;
let y: i16 = (packed & 0xfff) as _;
let z: i32 = ((packed & 0x3ffffff000) >> 12) as _;
Ok((
match x {
i32::MIN..=0x1ffffff => x,
0x2000000.. => x - 0x2000000,
},
match y {
i16::MIN..=0x7ff => y,
0x800.. => y - 0x800,
},
match z {
i32::MIN..=0x1ffffff => z,
0x2000000.. => z - 0x2000000,
},
))
}
async fn read_uuid(&mut self) -> Result<u128, NettyReadError> {
let mut buf = [0u8; 16];
self.read_exact(&mut buf).await?;
Ok(u128::from_be_bytes(buf))
}
async fn read_packet(&mut self) -> Result<Vec<u8>, NettyReadError> {
let len = self.read_varint().await?;
let mut buf = vec![0u8; len as usize];
if len == 254 {
let mut temp = [0u8];
self.read_exact(&mut temp).await?;
if temp[0] == 0xFA {
// FE 01 FA: Legacy ServerListPing
return Err(NettyReadError::LegacyServerListPing);
}
buf[0] = temp[0];
self.read_exact(&mut buf[1..]).await?;
} else {
self.read_exact(&mut buf).await?;
}
Ok(buf)
}
// fn read_packet_compressed(&mut self) -> Result<Vec<u8>, NettyReadError> {
// let len = self.read_varint()?;
// let len_decompressed = self.read_varint()?;
// let mut buf = vec![0u8; len as usize];
// self.read_exact(&mut buf)?;
// if len_decompressed == 0 {
// return Ok(buf);
// }
// let mut buf_decompressed = vec![0u8; len_decompressed as usize];
// if flate2::Decompress::new(true)
// .decompress(&buf, &mut buf_decompressed, flate2::FlushDecompress::Finish)
// .is_err()
// {
// return Err(std::io::ErrorKind::InvalidData.into());
// };
// Ok(buf_decompressed)
// }
}
impl<T: ReadExt + Unpin> ReadExtNetty for T {}
#[async_trait]
pub trait WriteExtNetty: WriteExt + Unpin {
async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
for _ in 0..5 {
if val & !0x7F == 0 {
self.write_all(&[val as u8]).await?;
return Ok(());
}
self.write_all(&[(val & 0x7F | 0x80) as u8]).await?;
val >>= 7;
}
Err(std::io::ErrorKind::InvalidData.into())
}
async fn write_string(&mut self, s: &str) -> std::io::Result<()> {
self.write_varint(s.len() as i32).await?;
self.write_all(s.as_bytes()).await?;
Ok(())
}
}
impl<T: WriteExt + Unpin> WriteExtNetty for T {}

@ -0,0 +1,77 @@
{
"nodes": {
"naersk": {
"inputs": {
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1679567394,
"narHash": "sha256-ZvLuzPeARDLiQUt6zSZFGOs+HZmE+3g4QURc8mkBsfM=",
"owner": "nix-community",
"repo": "naersk",
"rev": "88cd22380154a2c36799fe8098888f0f59861a15",
"type": "github"
},
"original": {
"owner": "nix-community",
"ref": "master",
"repo": "naersk",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1680273054,
"narHash": "sha256-Bs6/5LpvYp379qVqGt9mXxxx9GSE789k3oFc+OAL07M=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "3364b5b117f65fe1ce65a3cdd5612a078a3b31e3",
"type": "github"
},
"original": {
"id": "nixpkgs",
"type": "indirect"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1680273054,
"narHash": "sha256-Bs6/5LpvYp379qVqGt9mXxxx9GSE789k3oFc+OAL07M=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "3364b5b117f65fe1ce65a3cdd5612a078a3b31e3",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"naersk": "naersk",
"nixpkgs": "nixpkgs_2",
"utils": "utils"
}
},
"utils": {
"locked": {
"lastModified": 1678901627,
"narHash": "sha256-U02riOqrKKzwjsxc/400XnElV+UtPUQWpANPlyazjH0=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "93a2b84fc4b70d9e089d029deacc3583435c2ed6",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

@ -0,0 +1,36 @@
{
inputs = {
naersk.url = "github:nix-community/naersk/master";
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
utils.url = "github:numtide/flake-utils";
};
outputs = { self, nixpkgs, utils, naersk }:
utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs { inherit system; };
naersk-lib = pkgs.callPackage naersk { };
in
rec {
packages.e4mc-server = naersk-lib.buildPackage {
pname = "e4mc-server";
src = ./.;
};
packages.e4mc-client = naersk-lib.buildPackage {
pname = "e4mc-client";
src = ./.;
};
dockerImage = pkgs.dockerTools.buildImage {
name = "e4mc-server";
tag = "latest";
config = {
Cmd = [ "${packages.e4mc-server}/bin/e4mc-server" ];
};
};
devShell = with pkgs; mkShell {
buildInputs = [ cargo rustc rustfmt pre-commit rustPackages.clippy ];
RUST_SRC_PATH = rustPlatform.rustLibSrc;
};
});
}

@ -0,0 +1,20 @@
[package]
name = "e4mc-server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.70"
async-trait = "0.1.68"
async-tungstenite = { version = "0.20.0", features = ["tokio-runtime", "tokio-rustls-native-certs"] }
env_logger = "0.10.0"
futures = "0.3.28"
lazy_static = "1.4.0"
log = "0.4.17"
nanoid = "0.4.0"
serde = { version = "1.0.159", features = ["derive"] }
serde_json = "1.0.95"
thiserror = "1.0.40"
tokio = { version = "1.27.0", features = ["rt-multi-thread", "sync", "macros", "net", "io-util"] }

@ -0,0 +1,3 @@
{
"text": "Unknown server. Check address and try again."
}

@ -0,0 +1,392 @@
use std::collections::{HashMap};
use std::env;
use std::net::SocketAddr;
use anyhow::{anyhow, Result};
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::RwLock;
use tokio::task;
use async_tungstenite::tungstenite::Message;
use lazy_static::lazy_static;
use log::{error, info};
use nanoid::nanoid;
use netty::ReadExtNetty;
use serde::{Deserialize, Serialize};
use crate::netty::{NettyReadError, WriteExtNetty};
mod netty;
#[derive(Debug, Clone)]
struct Handshake {
protocol_version: i32,
server_address: String,
server_port: u16,
next_state: HandshakeType,
}
#[derive(Debug, Clone)]
#[repr(i32)]
enum HandshakeType {
Status = 1,
Login = 2,
}
impl Handshake {
async fn new(mut packet: &[u8]) -> Result<Self> {
let packet_type = packet.read_varint().await?;
if packet_type != 0 {
Err(anyhow!("Not a Handshake packet"))
} else {
let protocol_version = packet.read_varint().await?;
let server_address = packet.read_string().await?;
let server_port = packet.read_unsigned_short().await?;
let next_state = match packet.read_varint().await? {
1 => HandshakeType::Status,
2 => HandshakeType::Login,
_ => return Err(anyhow!("Invalid next state")),
};
Ok(Self {
protocol_version,
server_address,
server_port,
next_state,
})
}
}
}
lazy_static! {
static ref EXPOSER_MAP: RwLock<HashMap<String, UnboundedSender<(Handshake, TcpStream, SocketAddr)>>> =
RwLock::new(HashMap::new());
static ref BASE_DOMAIN: String = env::var("BASE_DOMAIN").expect("BASE_DOMAIN missing");
}
#[tokio::main]
async fn main() -> Result<()> {
let _ = env_logger::try_init();
let ws_bind_addr = env::var("WS_BIND_ADDR").unwrap_or_else(|_| "127.0.0.1:80".to_string());
let mc_bind_addr = env::var("MC_BIND_ADDR").unwrap_or_else(|_| "0.0.0.0:25565".to_string());
futures::try_join!(
async {
let listener = TcpListener::bind(&ws_bind_addr).await?;
info!("WebSocket Listening on: {}", ws_bind_addr);
while let Ok((stream, _)) = listener.accept().await {
task::spawn(async {
if let Err(e) = accept_ws_connection(stream).await {
error!("Error handling WebSocket connection: {}", e);
}
});
}
Ok(()) as Result<()>
},
async {
let listener = TcpListener::bind(&mc_bind_addr).await?;
info!("Minecraft Listening on: {}", mc_bind_addr);
while let Ok((stream, _)) = listener.accept().await {
task::spawn(async {
if let Err(e) = accept_mc_connection(stream).await {
error!("Error handling Minecraft connection: {}", e);
}
});
}
Ok(()) as Result<()>
}
)?;
Ok(())
}
#[derive(Serialize)]
enum ClientboundControlMessage {
DomainAssigned(String),
ChannelOpen(u8, SocketAddr),
ChannelClosed(u8),
}
#[derive(Deserialize)]
enum ServerboundControlMessage {
ChannelClosed(u8),
}
struct ExposerMapHandle(String, UnboundedReceiver<(Handshake, TcpStream, SocketAddr)>);
impl ExposerMapHandle {
async fn new(domain: String) -> Self {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
EXPOSER_MAP.write().await.insert(domain.clone(), sender);
Self(domain, receiver)
}
async fn recv(&mut self) -> Option<(Handshake, TcpStream, SocketAddr)> {
self.1.recv().await
}
}
impl Drop for ExposerMapHandle {
fn drop(&mut self) {
let domain = self.0.clone();
task::spawn(async move {
EXPOSER_MAP.write().await.remove(&domain);
});
}
}
struct ChannelHandle(
u8,
UnboundedReceiver<MinecraftConnectionMessage>,
UnboundedSender<WebsocketConnectionMessage>,
);
impl ChannelHandle {
async fn new(
id: u8,
addr: SocketAddr,
send: UnboundedSender<WebsocketConnectionMessage>,
) -> Result<Self> {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
send.send(WebsocketConnectionMessage::Open(id, addr, sender))
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(Self(id, receiver, send))
}
async fn recv(&mut self) -> Option<MinecraftConnectionMessage> {
self.1.recv().await
}
}
impl Drop for ChannelHandle {
fn drop(&mut self) {
self.2.send(WebsocketConnectionMessage::Close(self.0)).map_err(|e| anyhow::anyhow!("{}", e)).unwrap();
}
}
enum MinecraftConnectionMessage {
Data(Vec<u8>),
Close,
}
enum WebsocketConnectionMessage {
Data(Vec<u8>),
Close(u8),
Open(u8, SocketAddr, UnboundedSender<MinecraftConnectionMessage>),
}
async fn accept_ws_connection(stream: TcpStream) -> Result<()> {
let addr = stream.peer_addr()?;
info!("WebSocker Peer address: {}", addr);
let mut ws_stream = async_tungstenite::tokio::accept_async(stream).await?;
info!("New WebSocket connection: {}", addr);
let mut domain = get_random_domain();
let exposer_map = EXPOSER_MAP.read().await;
while exposer_map.contains_key(&domain) {
domain = get_random_domain();
}
let domain = domain;
info!("Connection {} was assigned domain {}", addr, domain);
ws_stream
.send(Message::Text(serde_json::to_string(
&ClientboundControlMessage::DomainAssigned(domain.clone()),
)?))
.await?;
drop(exposer_map);
let mut exposer_handle = ExposerMapHandle::new(domain.clone()).await;
let channel_map = &RwLock::new(HashMap::new());
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let (mut send, mut recv) = ws_stream.split();
futures::try_join!(
async move {
while let Some(message) = receiver.recv().await {
match message {
WebsocketConnectionMessage::Data(buf) => {
send.send(Message::Binary(buf)).await?
}
WebsocketConnectionMessage::Close(id) => {
send.send(Message::Text(serde_json::to_string(
&ClientboundControlMessage::ChannelClosed(id),
)?))
.await?;
channel_map.write().await.remove(&id);
}
WebsocketConnectionMessage::Open(id, addr, channel) => {
send.send(Message::Text(serde_json::to_string(
&ClientboundControlMessage::ChannelOpen(id, addr),
)?))
.await?;
channel_map.write().await.insert(id, channel);
}
}
}
Ok(()) as Result<()>
},
async move {
while let Some(Ok(message)) = recv.next().await {
match message {
Message::Text(message) => {
let message: ServerboundControlMessage = serde_json::from_str(&message)?;
match message {
ServerboundControlMessage::ChannelClosed(id) => {
if let Some(channel) = channel_map.write().await.remove(&id) {
info!("Closing channel id: {}", id);
channel.send(MinecraftConnectionMessage::Close).map_err(|e| anyhow::anyhow!("{}", e))?;
}
}
}
}
Message::Binary(buf) => {
if let Some(channel) = channel_map.read().await.get(&buf[0]) {
channel
.send(MinecraftConnectionMessage::Data(buf[1..].to_vec())).map_err(|e| anyhow::anyhow!("{}", e))?;
}
}
_ => {}
}
}
Ok(()) as Result<()>
},
async move {
while let Some((handshake, mc_stream, addr)) = exposer_handle.recv().await {
if let Some(channel_id) =
get_available_channel(channel_map.read().await.keys().copied().collect())
{
task::spawn(handle_sent_connection(
handshake,
mc_stream,
addr,
sender.clone(),
channel_id,
));
}
}
Ok(()) as Result<()>
}
)?;
Ok(())
}
async fn handle_sent_connection(
handshake: Handshake,
mut mc_stream: TcpStream,
addr: SocketAddr,
send: UnboundedSender<WebsocketConnectionMessage>,
channel_id: u8,
) -> Result<()> {
let mut handle = ChannelHandle::new(channel_id, addr, send.clone()).await?;
let mut buf = vec![];
buf.write_varint(0).await?;
buf.write_varint(handshake.protocol_version).await?;
buf.write_string(&handshake.server_address).await?;
buf.write_all(&handshake.server_port.to_be_bytes()).await?;
buf.write_varint(handshake.next_state as i32).await?;
let mut len_buf = vec![channel_id];
len_buf.write_varint(buf.len() as i32).await?;
len_buf.append(&mut buf);
send.send(WebsocketConnectionMessage::Data(len_buf)).map_err(|e| anyhow::anyhow!("{}", e))?;
let (mut read, mut write) = mc_stream.split();
tokio::select!(_ =
#[allow(unreachable_code)]
async move {
let mut buf = [0u8; 1024];
loop {
read.readable().await?;
let len = read.read(&mut buf).await?;
if len == 0 {
continue;
}
let mut packet = Vec::from(&buf[..len]);
packet.insert(0, channel_id);
send.send(WebsocketConnectionMessage::Data(packet)).map_err(|e| anyhow::anyhow!("{}", e))?;
}
Ok(()) as anyhow::Result<()>
} => {},
_ = async move {
while let Some(message) = handle.recv().await {
match message {
MinecraftConnectionMessage::Data(buf) => {
write.write_all(&buf).await?;
}
MinecraftConnectionMessage::Close => {
return Ok(())
}
}
}
Ok(()) as anyhow::Result<()>
} => {}
);
mc_stream.shutdown().await?;
Ok(())
}
async fn accept_mc_connection(mut stream: TcpStream) -> Result<()> {
let addr = stream.peer_addr()?;
info!("New Minecraft connection: {}", addr);
let packet = stream.read_packet().await;
if let Err(NettyReadError::LegacyServerListPing) = packet {
stream
.write_all(include_bytes!("./legacy_serverlistping_response.bin"))
.await?;
return Ok(());
}
let packet = packet?;
let handshake = Handshake::new(&packet).await?;
if let Some(sender) = EXPOSER_MAP.read().await.get(&handshake.server_address) {
sender.send((handshake, stream, addr)).map_err(|e| anyhow::anyhow!("{}", e))?;
} else {
match handshake.next_state {
HandshakeType::Status => {
let mut buf = vec![];
buf.write_varint(0).await?;
buf.write_string(include_str!("./serverlistping_response.json"))
.await?;
stream.write_varint(buf.len() as i32).await?;
stream.write_all(&buf).await?;
}
HandshakeType::Login => {
let _ = stream.read_packet().await?;
let mut buf = vec![];
buf.write_varint(0).await?;
buf.write_string(include_str!("./disconnect_response.json"))
.await?;
stream.write_varint(buf.len() as i32).await?;
stream.write_all(&buf).await?;
}
}
}
Ok(())
}
const ID_ALPHABET: [char; 36] = [
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
const ID_LENGTH: usize = 8;
fn get_random_domain() -> String {
format!(
"{}.{}",
nanoid!(ID_LENGTH, &ID_ALPHABET),
BASE_DOMAIN.as_str()
)
}
fn get_available_channel(used: Vec<u8>) -> Option<u8> {
(0u8..=255).find(|&i| !used.contains(&i))
}

@ -0,0 +1,234 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use async_trait::async_trait;
use log::{error};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum NettyReadError {
#[error("{0}")]
IoError(std::io::Error),
#[error("Was not a netty packet, but a Legacy ServerListPing")]
LegacyServerListPing,
}
impl From<std::io::Error> for NettyReadError {
fn from(value: std::io::Error) -> Self {
Self::IoError(value)
}
}
impl From<std::io::ErrorKind> for NettyReadError {
fn from(value: std::io::ErrorKind) -> Self {
Self::IoError(value.into())
}
}
#[async_trait]
pub trait ReadExtNetty: AsyncReadExt + Unpin {
async fn read_bool(&mut self) -> Result<bool, NettyReadError> {
let mut buf = [0u8];
self.read_exact(&mut buf).await?;
match buf[0] {
0 => Ok(false),
1 => Ok(true),
_ => Err(std::io::ErrorKind::InvalidData.into()),
}
}
async fn read_byte(&mut self) -> Result<i8, NettyReadError> {
let mut buf = [0u8];
self.read_exact(&mut buf).await?;
Ok(i8::from_be_bytes(buf))
}
async fn read_unsigned_byte(&mut self) -> Result<u8, NettyReadError> {
let mut buf = [0u8];
self.read_exact(&mut buf).await?;
Ok(buf[0])
}
async fn read_short(&mut self) -> Result<i16, NettyReadError> {
let mut buf = [0u8; 2];
self.read_exact(&mut buf).await?;
Ok(i16::from_be_bytes(buf))
}
async fn read_unsigned_short(&mut self) -> Result<u16, NettyReadError> {
let mut buf = [0u8; 2];
self.read_exact(&mut buf).await?;
Ok(u16::from_be_bytes(buf))
}
async fn read_int(&mut self) -> Result<i32, NettyReadError> {
let mut buf = [0u8; 4];
self.read_exact(&mut buf).await?;
Ok(i32::from_be_bytes(buf))
}
async fn read_long(&mut self) -> Result<i64, NettyReadError> {
let mut buf = [0u8; 8];
self.read_exact(&mut buf).await?;
Ok(i64::from_be_bytes(buf))
}
async fn read_float(&mut self) -> Result<f32, NettyReadError> {
let mut buf = [0u8; 4];
self.read_exact(&mut buf).await?;
Ok(f32::from_be_bytes(buf))
}
async fn read_double(&mut self) -> Result<f64, NettyReadError> {
let mut buf = [0u8; 8];
self.read_exact(&mut buf).await?;
Ok(f64::from_be_bytes(buf))
}
async fn read_string(&mut self) -> Result<String, NettyReadError> {
let len = self.read_varint().await?;
let mut buf = vec![0u8; len as usize];
self.read_exact(&mut buf).await?;
String::from_utf8(buf).map_err(|_| std::io::ErrorKind::InvalidData.into())
}
async fn read_varint(&mut self) -> Result<i32, NettyReadError> {
let mut res = 0i32;
for i in 0..5 {
let part = self.read_unsigned_byte().await?;
res |= (part as i32 & 0x7F) << (7 * i);
if part & 0x80 == 0 {
return Ok(res);
}
}
error!("Varint is invalid");
Err(std::io::ErrorKind::InvalidData.into())
}
// async fn read_varint(&mut self) -> Result<i32, NettyReadError> {
// let mut value = 0i32;
// let mut buf = [0u8; 1];
// let mut pos = 0u8;
// loop {
// self.read_exact(&mut buf).await?;
// println!("{}", buf[0]);
// value |= ((buf[0] & 0b01111111) << pos) as i32;
// if (buf[0] & 0b10000000) == 0 {
// break;
// };
// pos += 7;
// if pos >= 32 {
// return Err(std::io::ErrorKind::InvalidData.into());
// };
// }
// Ok(value)
// }
async fn read_varlong(&mut self) -> Result<i64, NettyReadError> {
let mut value = 0i64;
let mut buf = [0u8; 1];
let mut position = 0u8;
loop {
self.read_exact(&mut buf).await?;
value |= ((buf[0] & 0b01111111) << position) as i64;
if (buf[0] & 0b10000000) == 0 {
break;
};
position += 7;
if position >= 64 {
return Err(std::io::ErrorKind::InvalidData.into());
};
}
Ok(value)
}
async fn read_vec3(&mut self) -> Result<(i32, i16, i32), NettyReadError> {
let mut buf = [0u8; 8];
self.read_exact(&mut buf).await?;
let packed = u64::from_be_bytes(buf);
let x: i32 = ((packed & 0xffffffc000000000) >> 38) as _;
let y: i16 = (packed & 0xfff) as _;
let z: i32 = ((packed & 0x3ffffff000) >> 12) as _;
Ok((
match x {
i32::MIN..=0x1ffffff => x,
0x2000000.. => x - 0x2000000,
},
match y {
i16::MIN..=0x7ff => y,
0x800.. => y - 0x800,
},
match z {
i32::MIN..=0x1ffffff => z,
0x2000000.. => z - 0x2000000,
},
))
}
async fn read_uuid(&mut self) -> Result<u128, NettyReadError> {
let mut buf = [0u8; 16];
self.read_exact(&mut buf).await?;
Ok(u128::from_be_bytes(buf))
}
async fn read_packet(&mut self) -> Result<Vec<u8>, NettyReadError> {
let len = self.read_varint().await?;
let mut buf = vec![0u8; len as usize];
if len == 254 {
let mut temp = [0u8];
self.read_exact(&mut temp).await?;
if temp[0] == 0xFA {
// FE 01 FA: Legacy ServerListPing
return Err(NettyReadError::LegacyServerListPing);
}
buf[0] = temp[0];
self.read_exact(&mut buf[1..]).await?;
} else {
self.read_exact(&mut buf).await?;
}
Ok(buf)
}
// fn read_packet_compressed(&mut self) -> Result<Vec<u8>, NettyReadError> {
// let len = self.read_varint()?;
// let len_decompressed = self.read_varint()?;
// let mut buf = vec![0u8; len as usize];
// self.read_exact(&mut buf)?;
// if len_decompressed == 0 {
// return Ok(buf);
// }
// let mut buf_decompressed = vec![0u8; len_decompressed as usize];
// if flate2::Decompress::new(true)
// .decompress(&buf, &mut buf_decompressed, flate2::FlushDecompress::Finish)
// .is_err()
// {
// return Err(std::io::ErrorKind::InvalidData.into());
// };
// Ok(buf_decompressed)
// }
}
impl<T: AsyncReadExt + Unpin> ReadExtNetty for T {}
#[async_trait]
pub trait WriteExtNetty: AsyncWriteExt + Unpin {
async fn write_varint(&mut self, mut val: i32) -> std::io::Result<()> {
for _ in 0..5 {
if val & !0x7F == 0 {
self.write_all(&[val as u8]).await?;
return Ok(());
}
self.write_all(&[(val & 0x7F | 0x80) as u8]).await?;
val >>= 7;
}
Err(std::io::ErrorKind::InvalidData.into())
}
async fn write_string(&mut self, s: &str) -> std::io::Result<()> {
self.write_varint(s.len() as i32).await?;
self.write_all(s.as_bytes()).await?;
Ok(())
}
}
impl<T: AsyncWriteExt + Unpin> WriteExtNetty for T {}

@ -0,0 +1,13 @@
{
"version": {
"name": "e4mc",
"protocol": -1
},
"players": {
"max": 0,
"online": 0
},
"description": {
"text": "Unknown server. Check address and try again."
}
}
Loading…
Cancel
Save