Online Users in realtime

Alrighty, we’re production ready! I tested on Edge, Safari and Firefox, as well as on mobile. Opera doesn’t seem to send the cookie, though that might be my local settings. After seeing so many SSL configurations out there, I thought I was going to have to configure a lot more, but it appears I’m cruising here!

Configuration

I’m using Coolify, which uses Traefik and Let’s Encrypt for SSL. I did not have to do anything special in my Docker Compose or Traefik configuration, and I am not adding certs or keys anywhere. I am, however, enabling CORS in my Redwood application and declaring my api and web domains.

ActivityContext.tsx

import React, { useRef, useState, useEffect } from 'react'

import { io, Socket } from 'socket.io-client'

import { toast } from '@redwoodjs/web/toast'

type UserActivity = {
  userId: number
  status: string
  lastOnline: string
}

interface ActivityContext {
  users: UserActivity[]
  activity: 'ONLINE' | 'OFFLINE' | 'BUSY'
  setActivity: (activity: 'ONLINE' | 'OFFLINE' | 'BUSY') => void
}

export const ActivityContext = React.createContext<ActivityContext>({
  users: [],
  activity: 'ONLINE',
  setActivity: () => {},
})

interface ActivityProviderProps {
  children: React.ReactNode
}

const ActivityProvider = ({ children }: ActivityProviderProps) => {
  const socketUri =
    process.env.NODE_ENV === 'development'
      ? 'ws://localhost:8911/ws'
      : 'https://api.app.com/ws'
  const [usersActivity, setUserActivity] = useState<UserActivity[]>([])
  const [myActivity, setMyActivity] = useState<'ONLINE' | 'OFFLINE' | 'BUSY'>(
    'ONLINE'
  )

  const websocket = useRef<Socket>()

  // Initialize the websocket
  useEffect(() => {
    // Establish websocket
    const socket = io(socketUri, {
      reconnectionDelayMax: 10000,
      transports: ['websocket'],
      withCredentials: true,
    })

    // Activity channel updates
    socket.on('activity', (users) => {
      // Update users' activity through context
      setUserActivity(users)
    })

    // Online on connection
    socket.on('connect', () => {
      // Let everyone know current user is online
      socket.emit('activity', 'ONLINE')
    })

    // Offline on disconnection
    socket.on('disconnect', () => {
      // Reset everyone to offline (keep the state an array so `users.map` still works)
      setUserActivity([])
      setMyActivity('OFFLINE')
    })

    // Error handling
    socket.on('connect_error', (error) => {
      toast.error(`Application error: ${error.message}`)
      setMyActivity('OFFLINE')
    })

    websocket.current = socket

    return () => {
      socket.disconnect()
    }
  }, [])

  // Change the current user activity
  const changeActivity = (activity: 'ONLINE' | 'OFFLINE' | 'BUSY') => {
    // The socket ref is undefined until the effect above has run
    if (websocket.current?.connected) {
      websocket.current.emit('activity', activity)
      setMyActivity(activity)
    }
  }

  // Create the context value to use down the tree
  const contextValue: ActivityContext = {
    users: usersActivity, // Provide access to the users activity
    activity: myActivity, // Provide access to the current user activity
    setActivity: changeActivity, // Set the current user activity and emit the change to the websocket (use this in the UI to allow users to set busy or offline themselves)
  }

  return (
    <ActivityContext.Provider value={contextValue}>
      {children}
    </ActivityContext.Provider>
  )
}

export default ActivityProvider
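
Since the status union is repeated in a few places, and the server receives it from the client as an untyped payload, a shared type guard is one way to check it before storing it. This is only a sketch: `ACTIVITY_STATUSES`, `ActivityStatus` and `isActivityStatus` are hypothetical names that do not exist in the code in this post.

```typescript
// Hypothetical shared definition of the allowed statuses
const ACTIVITY_STATUSES = ['ONLINE', 'OFFLINE', 'BUSY'] as const
type ActivityStatus = (typeof ACTIVITY_STATUSES)[number]

// Narrows an unknown payload (e.g. from a socket event) to a known status
function isActivityStatus(value: unknown): value is ActivityStatus {
  return (
    typeof value === 'string' &&
    (ACTIVITY_STATUSES as readonly string[]).includes(value)
  )
}

const busyIsValid = isActivityStatus('BUSY')
const awayIsValid = isActivityStatus('AWAY')
const numberIsValid = isActivityStatus(42)
```

On the server, a handler could ignore any 'activity' event whose payload fails this check instead of writing it to Redis.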

server.ts

import { Server } from 'socket.io'
import { v7 as uuidv7 } from 'uuid'

import { isProduction } from '@redwoodjs/api/logger'
import { createServer } from '@redwoodjs/api-server'
import { decryptSession, getSession } from '@redwoodjs/auth-dbauth-api'
import {
  AuthenticationError,
  ForbiddenError,
  ValidationError,
} from '@redwoodjs/graphql-server'

import { cookieName } from 'src/lib/auth'
import { logger as parentLogger } from 'src/lib/logger'

import { redis } from './lib/redis' // Just sets up a connection to my redis instance

const logger = parentLogger.child({})
logger.level = isProduction ? 'warn' : 'trace'

const cookieRegex = /([a-zA-Z0-9+/|=]{110})\w+/
const tokenRegex =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i

async function main() {
  const server = await createServer({
    logger,
  })

  await server.register(import('@fastify/rate-limit'), {
    max: 500,
    timeWindow: '1 minutes',
  })

  server.addContentTypeParser(/^image\/.*/, (req, payload, done) => {
    payload.on('error', (err) => {
      done(err)
    })

    payload.on('end', () => {
      done(null)
    })
  })

  // Websocket Server
  const io = new Server({
    pingInterval: 60000,
    cors: {
      origin: isProduction
        ? [
            'https://web.app.com',
            'https://api.app.com'
          ]
        : ['http://localhost:8911', 'http://localhost:8910'],
      methods: ['GET', 'POST'],
      credentials: true,
    },
  })

  io.attach(server.server)

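  // Set by the auth middleware on every handshake and shared by all connections,
  // so it is only safe to read synchronously within the middleware chain
  let userId: number
  const users = new Set<number>()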

  // Authentication middleware, executed sequentially once per connection
  io.of('/ws').use((socket, next) => {
    // Ensure we have a cookie to validate
    if (socket.handshake.headers.cookie) {
      next()
    } else {
      next(new AuthenticationError('Unauthenticated'))
    }
  })

  // Sanitize the sessionCookie
  io.of('/ws').use((socket, next) => {
    // Extract session cookie (the previous middleware checked the header exists)
    const sessionCookie = (socket.handshake.headers.cookie ?? '')
      .split(';')
      .find((item) => item.trim().startsWith('session_8911'))
      ?.trim()

    // Return after every rejection so next() is only ever called once
    if (!sessionCookie) {
      return next(new ValidationError('Invalid token'))
    }

    const sessionCookieContent = sessionCookie.substring(
      sessionCookie.indexOf('=') + 1
    )

    if (!cookieRegex.test(sessionCookieContent)) {
      return next(new ValidationError('Invalid token format'))
    }

    // Validate the session cookie; treat a decryption failure as invalid
    let decrypted: ReturnType<typeof decryptSession>
    try {
      decrypted = decryptSession(
        getSession(sessionCookie, cookieName?.replace('%port%', '8911'))
      )
    } catch (err) {
      return next(new ValidationError('Invalid session'))
    }
    const [session, csrfToken] = decrypted

    if (!session?.id) {
      return next(new ValidationError('Invalid session'))
    }

    if (!tokenRegex.test(csrfToken)) {
      return next(new ValidationError('Invalid session format'))
    }

    // Continue if valid. Also keep the id on the socket so that later
    // handlers can read per-connection state
    socket.data.userId = session.id
    userId = session.id
    users.add(userId)
    next()
  })

  // Limit user to 20 connections
  io.of('/ws').use(async (_, next) => {
    const sessionsCount = (await redis.call(
      'JSON.OBJLEN',
      `connections:${userId}`
    )) as number
    if (sessionsCount < 20) {
      next()
    } else {
      next(new ForbiddenError('Maximum connections reached'))
    }
  })

  function extractJson(result) {
    const extractedData = result
      .map(([_, jsonString]) => {
        if (jsonString) {
          const data = JSON.parse(jsonString)
          const { userId, status, lastOnline } = data
          return { userId, status, lastOnline }
        }
        return null // Handle cases where jsonString is null or undefined
      })
      .filter((data) => data !== null) // Filter out any null entries

    return extractedData
  }

  // Not efficient, look into RediSearch to index instead
  const emitActivity = async () => {
    // Grab all users activity and emit to all clients
    redis.multi({ pipeline: false })
    users.forEach((id) => {
      redis.call('JSON.GET', `users:${id}`)
    })
    await redis.exec((err, result) => {
      if (err) {
        // Pass the error as an object so the logger serializes it
        logger.error({ err }, 'Unable to emit activity')
        return
      }
      // Emit updated activity to all clients
      io.of('/ws').emit('activity', extractJson(result))
    })
  }

  const handleDisconnect = async (uuid: string, userId: number) => {
    // Set the user to offline in Redis
    try {
      // Remove connection from Redis
      await redis.call('JSON.DEL', `connections:${userId}`, uuid)
      // Get all connections for the user
      const sessionsCount = await redis.call(
        'JSON.OBJLEN',
        `connections:${userId}`
      )
      // If no connections exist, set the status to offline
      if (sessionsCount === 0) {
        await redis.call(
          'JSON.SET',
          `users:${userId}`,
          '.status',
          JSON.stringify('OFFLINE')
        )
        await redis.call(
          'JSON.SET',
          `users:${userId}`,
          '.lastOnline',
          JSON.stringify(new Date().toISOString())
        )
        emitActivity()
      }
    } catch (err) {
      logger.error({ err }, `Unable to handle disconnect for ${uuid}`)
    }
  }

  const handleActivity = async (userId: number, activity: string) => {
    // Set the user's activity in Redis
    try {
      await redis.call(
        'JSON.SET',
        `users:${userId}`,
        '.status',
        JSON.stringify(activity)
      )
      emitActivity()
    } catch (err) {
      logger.error({ err }, `Unable to handle activity for ${userId}`)
    }
  }

  // New socket connection at /ws
  io.of('/ws').on('connection', async (socket) => {
    const uuid = uuidv7()
    // Capture the user id once for this connection. The shared `userId` is
    // reassigned on every handshake, so it must not be read in later callbacks.
    // Prefer a per-socket id when a middleware has stored one on `socket.data`.
    const connectionUserId: number = socket.data.userId ?? userId

    const userObject = {
      userId: connectionUserId,
      status: 'ONLINE',
      lastOnline: new Date().toISOString(),
    }
    await redis
      .call(
        'JSON.SET',
        `users:${connectionUserId}`,
        '.',
        JSON.stringify(userObject)
      )
      .catch((err) => {
        logger.error({ err }, 'Unable to create user details')
      })

    // Save new connection to Redis
    const connectionObject = {
      [uuid]: {
        sid: socket.id,
      },
    }

    // Check Redis if connection key already exists, 0 false, 1 true
    const connectionExists = await redis.exists(
      `connections:${connectionUserId}`
    )

    if (connectionExists === 0) {
      // Save new connection to Redis
      await redis
        .call(
          'JSON.SET',
          `connections:${connectionUserId}`,
          '.',
          JSON.stringify(connectionObject)
        )
        .catch((err) => {
          logger.error({ err }, 'Unable to create connection details')
        })
    } else {
      // Merge new connection to existing user
      await redis
        .call(
          'JSON.MERGE',
          `connections:${connectionUserId}`,
          '.',
          JSON.stringify(connectionObject)
        )
        .catch((err) => {
          logger.error({ err }, 'Unable to merge connection details')
        })
    }

    // On connection send out most up to date user activity
    emitActivity()

    // Events
    socket.on('disconnect', () => handleDisconnect(uuid, connectionUserId))
    socket.on('activity', (activity) =>
      handleActivity(connectionUserId, activity)
    )
  })

  await server.start()
}

main()
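
The cookie extraction done in the sanitizing middleware can also be pulled out into a small pure function, which makes it easy to test without a socket. A minimal sketch, assuming the cookie is named `session_8911` as in the code above; `extractSessionCookie` is a hypothetical helper, not part of Redwood or socket.io.

```typescript
// Hypothetical helper: find one cookie in a raw Cookie header.
// Returns the "name=value" pair plus the value alone, or null if absent.
function extractSessionCookie(
  cookieHeader: string | undefined,
  name: string
): { pair: string; value: string } | null {
  if (!cookieHeader) return null

  const pair = cookieHeader
    .split(';')
    .map((item) => item.trim())
    .find((item) => item.startsWith(`${name}=`))

  if (!pair) return null

  // Everything after the first "=" is the value (base64 can contain more "=")
  return { pair, value: pair.substring(pair.indexOf('=') + 1) }
}

const found = extractSessionCookie(
  'theme=dark; session_8911=abc123==; lang=en',
  'session_8911'
)
const missing = extractSessionCookie('theme=dark', 'session_8911')
const noHeader = extractSessionCookie(undefined, 'session_8911')
```

The `pair` is what would be handed to `getSession`, and the `value` is what the format regex would be tested against.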

Frontend component to show connections (example)

const { users } = useContext(ActivityContext)

  return (
    <>
      {users && (
        <div>
          {users.map((user) => (
            <p key={user.userId}>
              ID: {user.userId} Status: {user.status} Last Online:{' '}
              {user.lastOnline}
            </p>
          ))}
        </div>
      )}
    </>
  )

An example of users logging in and then closing their browser window (user 2):
[screenshot: frontend]

Redis

As you can see, I’m doing quite a bit of nasty Redis logic here (multi/exec). While it works, my next goal is to set up RediSearch so I can index these JSON documents and retrieve the online status for all users with a single query. This matters in a production environment, where hundreds of connections could be active at once and running a multi/exec on every update isn’t ideal.
What it looks like on Redis:
[screenshot: redis]
No connected sessions:
[screenshot: session-off]
Connected sessions:
[screenshot: session-on]
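
Until RediSearch is in place, one possible intermediate step is RedisJSON's `JSON.MGET`, which reads the same path from several keys in one command instead of a multi/exec. This is a swapped-in technique, not what the server code in this post does. The sketch only covers building the arguments and parsing the reply. It assumes the reply holds one JSON string per key (or null for a missing key), as returned for the legacy `.` path; `buildMgetArgs` and `parseMgetReply` are hypothetical names.

```typescript
type UserActivity = { userId: number; status: string; lastOnline: string }

// Arguments for: JSON.MGET users:1 users:2 ... .
// Callers should skip the command entirely when there are no users.
function buildMgetArgs(userIds: Iterable<number>): string[] {
  const keys = Array.from(userIds, (id) => `users:${id}`)
  return [...keys, '.'] // the path comes last
}

// Turns the assumed reply shape into the objects the clients expect
function parseMgetReply(reply: Array<string | null>): UserActivity[] {
  const parsed: UserActivity[] = []
  for (const json of reply) {
    if (!json) continue // key did not exist
    const { userId, status, lastOnline } = JSON.parse(json)
    parsed.push({ userId, status, lastOnline })
  }
  return parsed
}

// With ioredis this would be used roughly as (not executed here):
//   const reply = await redis.call('JSON.MGET', ...buildMgetArgs(users))
const mgetArgs = buildMgetArgs(new Set([1, 2]))
const activity = parseMgetReply([
  '{"userId":1,"status":"ONLINE","lastOnline":"2024-01-01T00:00:00.000Z"}',
  null,
])
```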

Security implementations

  • Authentication works as described in my previous post: the socket.io middleware only allows connections that pass those checks.
  • Connections are only allowed on my app’s domains, so someone can’t just fire up a client and hit the server continuously.
  • Connections go through TLS with a secure cookie. The cookie is sanitized (regex), decrypted and validated.
  • Connections are tracked per user in Redis. Every time the same user opens a new connection, the server checks whether that user already has 20 open connections; if so, the connection is declined. This prevents someone from opening 100 tabs, so it works like a soft rate limit.
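
The per-user connection cap from the last point can be sketched in memory to show the bookkeeping on its own. Here a Map stands in for the `connections:<userId>` documents in Redis, and `ConnectionTracker` is a hypothetical class, not something from the server code.

```typescript
// In-memory stand-in for the `connections:<userId>` documents kept in Redis
class ConnectionTracker {
  private connections = new Map<number, Set<string>>()
  private readonly maxPerUser: number

  constructor(maxPerUser = 20) {
    this.maxPerUser = maxPerUser
  }

  // Returns false (and stores nothing) once the user is at the cap
  tryAdd(userId: number, connectionId: string): boolean {
    const open = this.connections.get(userId) ?? new Set<string>()
    if (open.size >= this.maxPerUser) return false
    open.add(connectionId)
    this.connections.set(userId, open)
    return true
  }

  // Returns how many connections remain; 0 means the user is now offline
  remove(userId: number, connectionId: string): number {
    const open = this.connections.get(userId)
    if (!open) return 0
    open.delete(connectionId)
    if (open.size === 0) this.connections.delete(userId)
    return open.size
  }
}

// A cap of 2 keeps the example short; the post uses 20
const tracker = new ConnectionTracker(2)
const firstAccepted = tracker.tryAdd(1, 'a')
const secondAccepted = tracker.tryAdd(1, 'b')
const thirdAccepted = tracker.tryAdd(1, 'c')
const remaining = tracker.remove(1, 'a')
```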

The only things remaining are RediSearch and the authorization logic. I have the userId and the functions to check permissions ready. I’m just thinking about the load on the Postgres db, where the per-user permissions are stored, if I queried it every time a user makes a request through the socket connection.

I could replicate the permissions into Redis: whenever a permission row is inserted or updated in Postgres, copy it into Redis, and then query by userId to see what that user is allowed to do on the websocket server.
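
A rough sketch of that replication idea, with a Map standing in for Redis. Everything here is hypothetical: the `PermissionRow` shape, the `PermissionCache` class and the permission strings are made up for illustration and do not reflect my actual schema.

```typescript
// Hypothetical shape of a permission row coming from Postgres
type PermissionRow = { userId: number; permission: string; granted: boolean }

// A Map stands in for Redis here; with Redis each user would get its own key
class PermissionCache {
  private byUser = new Map<number, Set<string>>()

  // Call this whenever a permission row is inserted or updated
  applyRow(row: PermissionRow): void {
    const permissions = this.byUser.get(row.userId) ?? new Set<string>()
    if (row.granted) permissions.add(row.permission)
    else permissions.delete(row.permission)
    this.byUser.set(row.userId, permissions)
  }

  // Checked on each socket event instead of querying Postgres
  can(userId: number, permission: string): boolean {
    return this.byUser.get(userId)?.has(permission) ?? false
  }
}

const cache = new PermissionCache()
cache.applyRow({ userId: 1, permission: 'activity:write', granted: true })
const allowedBefore = cache.can(1, 'activity:write')
cache.applyRow({ userId: 1, permission: 'activity:write', granted: false })
const allowedAfter = cache.can(1, 'activity:write')
const unknownUser = cache.can(2, 'activity:write')
```

The trade-off is that the cache has to be updated on every permission change, otherwise the websocket server would act on stale permissions.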
