restore state on reload/new join

This commit is contained in:
sparshg
2024-09-22 04:41:53 +05:30
parent b994cd7439
commit 44e72d77f2
7 changed files with 333 additions and 48 deletions

View File

@@ -4,8 +4,13 @@ use axum::Router;
use board::Board;
use dotenv::dotenv;
use futures_util::stream::StreamExt;
use game::{add_board, add_room, attack, disconnect, join_room, start, ROOM_CODE_LENGTH};
use game::{
add_board, add_room, attack, delete_sid, get_game_state, get_room, join_room,
room_if_player_exists, start, to_delete_sid, update_sid, Error, ROOM_CODE_LENGTH,
};
use rand::Rng;
use serde::Deserialize;
use socketioxide::{
extract::{Data, SocketRef, State},
SocketIo,
@@ -26,6 +31,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = std::env::var("DATABASE_URL")?;
let pool = sqlx::postgres::PgPool::connect(&url).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
sqlx::query("DELETE FROM players").execute(&pool).await?;
sqlx::query("DELETE FROM abandoned_players")
.execute(&pool)
.await?;
sqlx::query("DELETE FROM rooms").execute(&pool).await?;
let (layer, io) = SocketIo::builder().with_state(pool).build_layer();
io.ns("/", on_connect);
@@ -40,8 +50,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
fn on_connect(socket: SocketRef) {
#[derive(Debug, Deserialize)]
struct AuthPayload {
pub session: Option<String>,
}
async fn on_connect(socket: SocketRef, Data(auth): Data<AuthPayload>, pool: State<PgPool>) {
tracing::info!("Connected: {:?}", socket.id);
tracing::info!("Connected: {:?}", auth.session);
if let Some(sid) = auth.session {
update_sid(&sid, socket.id.as_str(), &pool).await.unwrap();
let sid = socket.id.as_str();
if let Some(room) = room_if_player_exists(&sid, &pool).await.unwrap() {
let data = get_game_state(&sid, &room, &pool).await.unwrap();
socket
.emit(
"restore",
serde_json::json!({"turn": data.0, "player": data.1, "opponent": data.2}),
)
.unwrap();
socket.join(room.clone()).unwrap();
emit_update_room(
&socket,
&room,
socket.within(room.clone()).sockets().unwrap().len(),
);
}
}
socket.on(
"create",
|socket: SocketRef, pool: State<PgPool>| async move {
@@ -59,6 +96,7 @@ fn on_connect(socket: SocketRef) {
.map(|x| char::to_ascii_uppercase(&(x as char)))
.collect();
tracing::info!("Creating room: {:?}", room);
// TODO: Handle duplicates
if let Err(e) = add_room(socket.id, room.clone(), &pool).await {
tracing::error!("{:?}", e);
@@ -66,7 +104,11 @@ fn on_connect(socket: SocketRef) {
}
socket.leave_all().unwrap();
socket.join(room.clone()).unwrap();
socket.emit("joined-room", &room).unwrap();
emit_update_room(
&socket,
&room,
socket.within(room.clone()).sockets().unwrap().len(),
);
},
);
@@ -77,15 +119,30 @@ fn on_connect(socket: SocketRef) {
return;
}
tracing::info!("Joining room: {:?}", room);
if let Err(e) = join_room(socket.id, room.clone(), &pool).await {
tracing::error!("{:?}", e);
return;
let room_error = join_room(socket.id, room.clone(), &pool).await;
if let Err(e) = &room_error {
if let Error::RoomFull(Some(player)) = &e {
tracing::warn!("{:?}", e);
update_sid(&player, socket.id.as_str(), &pool).await.unwrap();
let data = get_game_state(socket.id.as_str(), &room, &pool).await.unwrap();
socket
.emit(
"restore",
serde_json::json!({"turn": data.0, "player": data.1, "opponent": data.2}),
)
.unwrap();
} else {
tracing::error!("{:?}", e);
return;
}
}
socket.leave_all().unwrap();
socket.join(room.clone()).unwrap();
socket.emit("joined-room", &room).unwrap();
if socket.within(room.clone()).sockets().unwrap().len() != 2 {
let users = socket.within(room.clone()).sockets().unwrap().len();
emit_update_room(&socket, &room, users);
if users != 2 || room_error.is_err() {
return;
}
let ack_stream = socket
@@ -141,11 +198,44 @@ fn on_connect(socket: SocketRef) {
},
);
socket.on(
"leave",
|socket: SocketRef, pool: State<PgPool>| async move {
tracing::info!("Leaving Rooms: {:?}", socket.id);
leave_and_inform(&socket, &pool).await;
},
);
socket.on_disconnect(|socket: SocketRef, pool: State<PgPool>| async move {
tracing::info!("Disconnecting: {:?}", socket.id);
socket.leave_all().unwrap();
if let Err(e) = disconnect(socket.id, &pool).await {
tracing::error!("{:?}", e);
}
leave_and_inform(&socket, &pool).await;
});
}
async fn leave_and_inform(socket: &SocketRef, pool: &PgPool) {
let room = socket
.rooms()
.unwrap()
.first()
.map(|s| s.to_string())
.or(get_room(socket.id, pool).await.unwrap());
let Some(room) = room else {
return;
};
let ops = socket.within(room.clone());
socket.leave_all().unwrap();
emit_update_room(socket, &room.to_string(), ops.sockets().unwrap().len());
if let Err(e) = to_delete_sid(socket.id.as_str(), pool).await {
tracing::error!("{:?}", e);
}
}
fn emit_update_room(socket: &SocketRef, room: &String, users: usize) {
socket
.within(room.clone())
.emit(
"update-room",
serde_json::json!({"room": &room, "users": users}),
)
.unwrap();
}