Server

examples/server.py
import enum
import io
import json
import logging
import signal

import anyio
import anyio.abc

from cubes import net
from cubes.net import serializers

_VERSION = "1.20.5-1.20.6"
_PROTOCOL = 766
_SERVER_DESCRIPTION = "Example server"


class ConnectionState(enum.IntEnum):
    HANDSHAKE = 0
    STATUS = 1
    LOGIN = 2
    TRANSFER = 3
    CONFIGURATION = 4
    PLAY = 5


CONNECTION_STATES: dict[net.Connection, ConnectionState] = {}


async def process_handshake(conn: net.Connection, packet: io.BytesIO):
    protocol = serializers.VarIntSerializer.from_buffer(packet)
    serializers.StringSerializer.from_buffer(packet)  # host
    serializers.UnsignedShortSerializer.from_buffer(packet)  # port
    CONNECTION_STATES[conn] = intention = ConnectionState(
        serializers.VarIntSerializer.from_buffer(packet)
    )
    if (
        intention in (ConnectionState.LOGIN, ConnectionState.TRANSFER)
        and protocol != _PROTOCOL
    ):
        disconnect_packet = io.BytesIO()
        serializers.VarIntSerializer(0).to_buffer(disconnect_packet)
        serializers.StringSerializer(
            json.dumps(
                {
                    "translate": "disconnect.genericReason",
                    "with": [{"text": f'Unsupported protocol version "{protocol}".'}],
                }
            )
        ).to_buffer(disconnect_packet)
        await conn.send(disconnect_packet)
        await conn.close()


async def process_legacy_ping(conn: net.Connection):
    await conn.close()


async def process_status(conn: net.Connection):
    response = io.BytesIO()
    serializers.VarIntSerializer(0).to_buffer(response)
    serializers.StringSerializer(
        json.dumps(
            {
                "version": {"name": _VERSION, "protocol": _PROTOCOL},
                "players": {"max": 0, "online": 0},
                "description": {"text": _SERVER_DESCRIPTION},
            }
        )
    ).to_buffer(response)
    await conn.send(response)


async def process_status_ping(conn: net.Connection, packet: io.BytesIO):
    packet.seek(0)
    await conn.send(packet)
    await conn.close()


async def process_packet(conn: net.Connection, packet: io.BytesIO):
    state = CONNECTION_STATES[conn]
    packet_id = serializers.VarIntSerializer.from_buffer(packet)
    match (state, packet_id):
        case (ConnectionState.HANDSHAKE, 0x00):
            await process_handshake(conn, packet)
        case (ConnectionState.HANDSHAKE, 0xFE):
            await process_legacy_ping(conn)
        case (ConnectionState.STATUS, 0x00):
            await process_status(conn)
        case (ConnectionState.STATUS, 0x01):
            await process_status_ping(conn, packet)
        case _:
            pass


async def process_new_connection(conn: net.Connection):
    logging.info('"%s:%i" connected to server.', *conn.remote_address)
    CONNECTION_STATES[conn] = ConnectionState.HANDSHAKE


async def process_packet_receive_timeout(conn: net.Connection):
    if CONNECTION_STATES[conn] == ConnectionState.LOGIN:
        packet = io.BytesIO()
        serializers.VarIntSerializer(0x00).to_buffer(packet)
        serializers.StringSerializer(
            json.dumps({"translate": "disconnect.timeout"})
        ).to_buffer(packet)
        await conn.send(packet)
    await conn.close()


async def process_close_connection(conn: net.Connection, reason: Exception | None):
    logging.info(
        '"%s:%i" disconnected from server. Reason: %s.',
        *conn.remote_address,
        repr(reason),
    )
    del CONNECTION_STATES[conn]


async def sygnal_handler(scope: anyio.CancelScope):
    with anyio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as signals:
        async for _ in signals:
            scope.cancel()


async def main():
    server = net.Server(
        process_new_connection,
        process_packet_receive_timeout,
        process_packet,
        process_close_connection,
        packet_receive_timeout=5,
    )
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(sygnal_handler, task_group.cancel_scope)
        task_group.start_soon(server.run, "127.0.0.1", 25560)


if __name__ == "__main__":
    logging.basicConfig(level="DEBUG")
    # you shoud use anyio to run the server
    # you can use trio or asyncio as backend
    anyio.run(main, backend="trio")