1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
use crate::{
    connection_info::ConnectionInfo,
    messages::{AssignClientId, MessageFromClient, MessageFromServer},
};
use actix::{
    dev::MessageResponse, Actor, ActorContext, AsyncContext, Context, Handler, Message,
    MessageResult, Recipient, SpawnHandle,
};
use jamsocket::{ClientId, MessageRecipient};
use std::{collections::HashMap, time::SystemTime};

/// Actor model representation of a “room”. A room is a set of clients
/// that share a Jamsocket instance. Conceptually, this
/// is like a room in a chat service. Events (such as messages) and their
/// side-effects are isolated to the room in which they occur.
pub struct RoomActor {
    /// Recipient that every `MessageFromClient` handled by the room is
    /// forwarded to. When this is `None`, incoming client messages are
    /// dropped with a warning.
    service_actor: Option<Recipient<MessageFromClient>>,
    /// Currently connected clients, keyed by client ID. Each value is the
    /// recipient used to deliver `MessageFromServer` messages to that client.
    connections: HashMap<ClientId, Recipient<MessageFromServer>>,
    /// User IDs are assigned sequentially within the context of each room,
    /// ensuring that they never overlap. `next_id` stores the next ID that
    /// will be assigned.
    next_id: u32,
    /// Handle of a pending shutdown task, if any. It is taken and cancelled
    /// when a client connects.
    // NOTE(review): nothing in the visible part of this file sets this to
    // `Some` — confirm where (or whether) the shutdown is still scheduled.
    shutdown_handle: Option<SpawnHandle>,

    /// Time since which the room has had no connected clients. Set at
    /// construction and when the last client disconnects; cleared (`None`)
    /// when a client connects.
    inactive_since: Option<SystemTime>,
}

struct Shutdown;

impl Message for Shutdown {
    type Result = ();
}

/// Request for a snapshot of a room's connection state. The room replies
/// with a `ConnectionInfo` holding the number of active connections and the
/// number of seconds the room has been inactive.
#[derive(Message)]
#[rtype(result = "ConnectionInfo")]
pub struct GetConnectionInfo;

impl RoomActor {
    #[must_use]
    pub fn new(service_actor: Recipient<MessageFromClient>) -> Self {
        RoomActor {
            service_actor: Some(service_actor),
            connections: HashMap::default(),
            next_id: 1,
            shutdown_handle: None,
            inactive_since: Some(SystemTime::now()),
        }
    }
}

/// Makes `RoomActor` an actix actor that runs in a plain `Context`.
impl Actor for RoomActor {
    type Context = Context<Self>;
}

/// Routes a message coming from the service to the clients in this room.
impl Handler<MessageFromServer> for RoomActor {
    type Result = ();

    /// Delivers `message` either to one client or to every connected client,
    /// depending on `message.to_client`. Delivery failures are logged and
    /// otherwise ignored.
    fn handle(&mut self, message: MessageFromServer, _ctx: &mut Context<Self>) {
        match message.to_client {
            MessageRecipient::Client(client_id) => match self.connections.get(&client_id) {
                Some(recipient) => {
                    if recipient.do_send(message).is_err() {
                        tracing::warn!(
                            "Could not forward server-sent binary message to client in room",
                        );
                    }
                }
                // The addressed client is not (or no longer) registered in this room.
                None => {
                    tracing::warn!(
                        ?client_id,
                        "Could not get address of user, who may have disconnected",
                    );
                }
            },
            MessageRecipient::Broadcast => {
                // Every connection gets its own copy of the message.
                self.connections.values().for_each(|recipient| {
                    if recipient.do_send(message.clone()).is_err() {
                        tracing::warn!("Could not forward server-sent message to client",);
                    }
                });
            }
        }
    }
}

/// Tracks client connections and relays client messages to the service.
impl Handler<MessageFromClient> for RoomActor {
    type Result = ();

    /// Updates the room's connection bookkeeping for connect/disconnect
    /// events and forwards every client message to the service actor.
    /// If no service actor is attached, the message is dropped with a warning
    /// and no bookkeeping is performed.
    fn handle(&mut self, message: MessageFromClient, ctx: &mut Context<Self>) {
        let service_actor = match &self.service_actor {
            Some(recipient) => recipient,
            None => {
                tracing::warn!("MessageFromClient received on room with no service attached",);
                return;
            }
        };

        match &message {
            MessageFromClient::Connect(client, resp) => {
                // Register the newcomer; the room now counts as active.
                self.connections.insert(*client, resp.clone());
                self.inactive_since = None;

                if service_actor.do_send(message).is_err() {
                    tracing::warn!("Couldn't forward client message to service",);
                }

                // If a shutdown was scheduled because the room was empty,
                // cancel it now that a client is present.
                if let Some(pending_shutdown) = self.shutdown_handle.take() {
                    ctx.cancel_future(pending_shutdown);
                }
            }
            MessageFromClient::Disconnect(client_id) => {
                self.connections.remove(client_id);

                // Start the inactivity clock once the last client has left.
                if self.connections.is_empty() {
                    self.inactive_since = Some(SystemTime::now());
                }

                if service_actor.do_send(message).is_err() {
                    tracing::warn!("Couldn't forward client message to service",);
                }
            }
            MessageFromClient::Message { .. } => {
                if service_actor.do_send(message).is_err() {
                    tracing::warn!("Couldn't forward message from client to service",);
                }
            }
        }
    }
}

/// Lets a `ClientId` be returned directly as the reply to `AssignClientId`.
impl MessageResponse<RoomActor, AssignClientId> for ClientId {
    fn handle(self, _: &mut Context<RoomActor>, tx: Option<actix::dev::OneshotSender<ClientId>>) {
        // Delivery is only attempted when a reply channel was provided.
        // TODO: checking the send result avoids a linter warning, but the
        // series of events that would lead to a failed send still needs to be
        // better understood.
        if let Some(Err(error)) = tx.map(|sender| sender.send(self)) {
            tracing::error!(?error, "Error returning response to AssignClientId");
        }
    }
}

/// Hands out room-unique client IDs.
impl Handler<AssignClientId> for RoomActor {
    type Result = ClientId;

    /// Returns the current value of `next_id` as a `ClientId` and advances
    /// the counter by one for the next caller.
    fn handle(&mut self, _: AssignClientId, _ctx: &mut Context<Self>) -> ClientId {
        let assigned: ClientId = self.next_id.into();
        self.next_id += 1;

        assigned
    }
}

/// Stops the room actor in response to a `Shutdown` message.
impl Handler<Shutdown> for RoomActor {
    type Result = ();

    /// Logs the shutdown and stops this actor's context.
    fn handle(&mut self, _: Shutdown, ctx: &mut Self::Context) -> Self::Result {
        // NOTE(review): the log text says "service actor", but the context
        // stopped below belongs to the `RoomActor` — confirm the wording.
        tracing::info!(
            "Shutting down service actor because no clients are left and the timeout period has elapsed",
        );

        ctx.stop();
    }
}

/// Answers `GetConnectionInfo` with a snapshot of the room's connection state.
impl Handler<GetConnectionInfo> for RoomActor {
    type Result = MessageResult<GetConnectionInfo>;

    /// Builds a `ConnectionInfo` from the current connection count and the
    /// time elapsed since `inactive_since`.
    ///
    /// `seconds_inactive` is `0` when the room is active (`inactive_since` is
    /// `None`) or when the elapsed time cannot be determined.
    fn handle(&mut self, _: GetConnectionInfo, _: &mut Self::Context) -> Self::Result {
        // `SystemTime` is not monotonic: if the wall clock is stepped
        // backwards, `inactive_since` can lie in the future and the elapsed
        // time query returns `Err`. Report zero seconds in that case instead
        // of panicking inside the actor.
        let seconds_inactive = self
            .inactive_since
            .and_then(|since| since.elapsed().ok())
            .map_or(0, |elapsed| elapsed.as_secs());

        MessageResult(ConnectionInfo {
            active_connections: self.connections.len() as _,
            listening: true,
            seconds_inactive: seconds_inactive as _,
        })
    }
}