Online Users in realtime

I went down the rabbit hole experimenting with websockets. Sadly, the fastify/websocket plugin has shown its limitations as well as the security nightmare of staying up to date in production. I also found it difficult to find good docs on upgrading the request over https. I got messages and ping/pong to work but I realize it’s not something I can deploy in production easily (Thanks Tobbe. So, before I jump into using something like SocketIO I was curious if there was another way of doing this?

I’ve gotten subscriptions to work and I use them without issue. Perhaps what I want to do is possible with the other realtime mechanisms that exist. I’m looking for guidance as I’m a bit lost on what to use for my use case.

Use case: showing online status

When a user authenticates and logs into my Redwoodjs app I would like to track their status. This could be online, offline or busy (which the user could set themselves).

There’s a pretty neat module that can set expiration time on a hash field in Redis, which I’ve gotten to work and loaded in my Redis db. Reasoning being that it’s impossible from what I can figure out to let the api know when a user “disconnects”, ie. closes the browser tab. So on the backend, I can call hset() with Ioredis to set a field that expires in 1 minute and return all online users by calling hgetall(). The only fields in that key would be users whose fields haven’t expired.

The question is how do I implement this heartbeat on the frontend? The user moves around on pages and might not necessarily call any mutations or queries and I don’t want to bloat my existing services. So I figured I create a context that wraps around my app and inside there I create a 1 minute interval to refresh the hset() key expiration. If the user closes the tab, this refresh doesn’t occur and the field expires in 1 minute, thus the user is offline and won’t be returned by hgetall().

Can I just create a service that’s not in my prisma schema and instead of returning records from my postgres db I can return the hgetall() from my redis? This service would have an update and query which respectively uses the Ioredis hset() and hgetall() to make this work. Something like:

# Show current user's status
export const getOnlineUser: QueryResolvers['getOnlineUser'] = () => {
  return ioredis.hget('online_users', context.currentUser?.id)
}
# Return all users' status
export const getOnlineUsers: QueryResolvers['getOnlineUser's] = () => {
  return ioredis.hgetall('online_users')
}
# Update the current user's status
export const updateOnlineUser: MutationResolvers['updateOnlineUser'] = ({ input }) => {
  ioredis.hset('online_users', context.currentUser?.id, input.status)
}

And then grab useMutation in the context to send the status (pseudo code to follow).

const UPDATE_ONLINE_USER = gql`
  mutation UpdateOnlineUser($input: UpdateOnlineUserInput!) {
    updateOnlineUser(input: $input) {
        ...id? not sure since we're not returning anything
    }
  }
`
...
setInterval => 1 minute:
updateOnlineUser({
        variables: {
          input: {
            status: 'online',
          },
        },
      })

I tried both subscriptions and liveQueries but neither seemed to really work well. I would’ve preferred to use websockets but I think long term the headache that introduces is too great. I’ve settled on this implementation, let me know what you think or if this could be handled differently?

service resolves:

import { redis } from 'src/lib/redis' // I create an ioredis instance here

function cb(err, onlineUsers) {
  if (err) {
    throw new Error('Unable to get online users')
  }
  return onlineUsers
}

export const getUsersActivity = async () => {
  try {
    return await redis.hgetall('online_users', cb)
  } catch (error) {
    throw new Error('Unable to get online users')
    return []
  }
}

export const setUserActivity = async ({ input }) => {
  const { activity } = input
  try {
    // Save to redis
    await redis.hset('online_users', context.currentUser?.id, activity)
    await redis.call(
      'EXPIREMEMBER',
      'online_users',
      context.currentUser?.id,
      30
    )

    return true
  } catch (error) {
    throw new Error('Unable to update user activity')
  }
}

sdl:

  input UserActivityInput {
    activity: String!
  }
  type Query {
    getUsersActivity: JSONObject! @requireAuth
}
  type Mutation {
    setUserActivity(input: UserActivityInput!): Boolean! @requireAuth
}

context:

import React, { useState, useEffect } from 'react'

import { useMutation, useQuery } from '@redwoodjs/web'

import { useAuth } from 'src/auth'

export const UPDATE_ACTIVITY = gql`
  mutation updateUserActivity($input: UserActivityInput!) {
    setUserActivity(input: $input)
  }
`

export const QUERY_ACTIVITY = gql`
  query getUsersActivity {
    getUsersActivity
  }
`

type UserActivity = [string, string]

interface ActivityContext {
  usersActivity: UserActivity[]
}

export const ActivityContext = React.createContext<ActivityContext>({
  usersActivity: [],
})

interface ActivityProviderProps {
  children: React.ReactNode
}

const ActivityProvider = ({ children }: ActivityProviderProps) => {
  const [usersActivity, setUserActivity] = useState([])
  const { currentUser } = useAuth()

  // Start a live query to listen for users activity
  useQuery(QUERY_ACTIVITY, {
    onCompleted(data) {
      setUserActivity(data.getUsersActivity)
    },
    pollInterval: 35000,
  })

  const [updateUserActivity] = useMutation(UPDATE_ACTIVITY)

  // User is logged in, let's make sure they show online
  useEffect(() => {
    updateUserActivity({
      variables: {
        input: {
          activity: 'ONLINE',
        },
      },
    })

    const intervalId = setInterval(() => {
      updateUserActivity({
        variables: {
          input: {
            activity: currentUser?.preference.activity,
          },
        },
      })
    }, 30000)

    return () => clearInterval(intervalId)
  }, [])

  // Create the context value to use down the tree
  const contextValue: ActivityContext = {
    usersActivity, // Provide access to the users activity
  }

  return (
    <ActivityContext.Provider value={contextValue}>
      {children}
    </ActivityContext.Provider>
  )
}

export default ActivityProvider

I settled for a simple polling solution and interval. I update redis with the current user’s activity preference and grab the result of all fields on the hash key to show who’s online (field exists, hasn’t expired) and who isn’t.

Would love to hear any input :slight_smile:

Revisiting this issue I decided to try out SocketIO. It was much less buggy and way easier to implement than @fastify/websocket. The docs are also more detailed. I’ve now got the following and discarded the liveQuery solution above:

server.ts

import { Server } from 'socket.io'
import { v7 as uuidv7 } from 'uuid'

import { isProduction } from '@redwoodjs/api/logger'
import { createServer } from '@redwoodjs/api-server'

import { logger as parentLogger } from 'src/lib/logger'

const logger = parentLogger.child({})
logger.level = isProduction ? 'warn' : 'trace'

async function main() {
  const server = await createServer({
    logger,
  })

  await server.register(import('@fastify/rate-limit'), {
    max: 500,
    timeWindow: '1 minutes',
  })

  server.addContentTypeParser(/^image\/.*/, (req, payload, done) => {
    payload.on('error', (err) => {
      done(err)
    })

    payload.on('end', () => {
      done(null)
    })
  })

  const io = new Server({
    pingInterval: 10000,
    pingTimeout: 5000,
  })

  io.attach(server.server)

  const connections = {}
  const users = {}

  const handleDisconnect = (uuid: string) => {
    // Remove the socket and user
    delete connections[uuid]
    delete users[uuid]
    // Emit users so clients know latest user activity
    io.of('/ws').emit('activity', users)
  }

  const handleActivity = (uuid: string, activity: string) => {
    // Set the user's activity
    users[uuid].state.activity = activity
    // Emit users so clients know latest user activity
    io.of('/ws').emit('activity', users)
  }

  // Authentication
  io.use(async (socket, next) => {
    if (socket.handshake.auth.token) {
      // Check token
      next()
    } else {
      next(new Error('Unauthenticated'))
    }
  })

  io.of('/ws').on('connection', (socket) => {
    const { username } = socket.handshake.auth.token // For testing purposes I used just username
    const uuid = uuidv7()

    // Keep track of users and connections
    connections[uuid] = socket
    users[uuid] = {
      username,
      state: {
        activity: 'ONLINE',
      },
    }

    // Events
    socket.on('disconnect', () => handleDisconnect(uuid))
    socket.on('activity', (activity) => handleActivity(uuid, activity))
  })

  await server.start()
}

main()

ActivityContext

import React, { useState, useEffect } from 'react'

import { io } from 'socket.io-client'

import { useAuth } from 'src/auth'

type UserActivity = {
  username: string
  state: {
    activity: string
  }
}

interface ActivityContext {
  users: Record<string, UserActivity>
}

export const ActivityContext = React.createContext<ActivityContext>({
  users: {},
})

interface ActivityProviderProps {
  children: React.ReactNode
}

const ActivityProvider = ({ children }: ActivityProviderProps) => {
  const [usersActivity, setUserActivity] = useState({})
  const { currentUser } = useAuth()
  const userActivity = currentUser?.activity
  const userEmail = currentUser?.email

  useEffect(() => {
    const socket = io('ws://localhost:8911/ws', {
      reconnectionDelayMax: 10000,
      auth: {
        token: userEmail, // This is where I don't know what to supply? The request header cookie?
      },
    })

    socket.on('activity', (users) => {
      setUserActivity(users)
    })

    socket.on('connect', () => {
      socket.emit('activity', userActivity)
    })

    socket.on('disconnect', () => {
      console.log('disconnected')
    })

    socket.io.on('error', (error) => {
      console.log(error)
    })

    return () => {
      socket.disconnect()
    }
  }, [])

  // Create the context value to use down the tree
  const contextValue: ActivityContext = {
    users: usersActivity, // Provide access to the users activity
  }

  return (
    <ActivityContext.Provider value={contextValue}>
      {children}
    </ActivityContext.Provider>
  )
}

export default ActivityProvider

My current problem lies in how to authenticate the websocket connection. I’m using dbAuth, which from my understanding uses session and cookies, not token.

Since the ActivityContext runs inside an authenticated and authorized part of my frontend, I’m not sure how to communicate the dbAuth cookie to my socketIO server. I can see when I log in to my app the auth?method=getToken request validates my session and I have cookies like so:

"Request Headers (666 B)": {
	"headers": [
		{
			"name": "Cookie",
			"value": "session=xyz session_8911=xyz
		},

If I send that to socketIO server, how can it validate the cookie? Before I implement my own auth function I was wondering if server.ts can access something on the backend for dbAuth and verify this cookie?

Hi @jamesj - lots to read :smiley: and looks like doing some exciting things with Redwood which is great to see.

I’ll reply later in more depth but wanted to share two things:

  1. I believe what you are describing is presence. For example this is Supabase’s solution https://supabase.com/docs/guides/realtime/presence

  2. Verify session cookie. Here’s the decoder redwood/packages/auth-providers/dbAuth/api/src/decoder.ts at main · redwoodjs/redwood · GitHub and also

@KrisCoulson have you implemented a user online presence in Redwood that may help @jamesj here?

Hey @dthyresson sir, how’s it going? I’ve been digging further and I saw the decoder. That’s where I’m at right now myself, I’m not using Supabase because I’m trying to handle this in-house.

So from what I can gather in the server.ts file I can use DbAuth’s decoding mechanism to decode the session cookie which is nice:

import { createAuthDecoder } from '@redwoodjs/auth-dbauth-api'
import { cookieName } from 'src/lib/auth'

const authDecoder = createAuthDecoder(cookieName)

However I’m stuck at the socketIO authentication middleware:

io.use(async (socket, next) => {
    if (socket.handshake.auth.token) {
      const { user } = await authDecoder(socket.handshake.auth.token)

      if (user) {
        next()
      } else {
        next(new Error('Invalid token'))
      }
    } else {
      next(new Error('Unauthenticated'))
    }
  })

I’m missing a type argument and req: { event: APIGatewayProxyEvent | Request; context: Context; }). I figure for the type it’s ‘dbAuth’ but I have no idea how to construct the req. A socket from socketIO gives me a socket.request but it’s not in the correct type.

On the frontend in my context, I’m still not sure where I get the session cookie from. I tried:

  const { getToken } = useAuth()
...
const socket = io('ws://localhost:8911/ws', {
      reconnectionDelayMax: 10000,
      auth: {
        token: getToken(),
      },
    })

However getToken() just resolves the promise and returns ‘1’, so that doesn’t seem to work either. I feel like I’m close, if I can deliver the token from the user session who’s already authenticated and figure out how to set up the authDecoder on the server I’ll be able to establish an authenticated session!

SocketIO allows you to create cookies when connecting and create your own authentication with the session, but I figured since I’m already using dbAuth and it handles all the authentication on http it’s simpler to just pass it and verify it on the socketIO server. Perhaps I’m wrong and this is a fool’s errand :joy:

Usually the cookie will be on the request and the your middleware or request handler can get the headers and cookie form it and then pass to on your case Socket.

We’re working on middleware as well for this and other purposes and I’d have to check if it’s in the current release or not.

Here’s the middleware for dbAuth thy will be used for SSR and RSC.

You might be able to write your own middleware for example
.

1 Like

Okay we’re getting somewhere!

I found out how to pass the session cookies to the websocket server. It’s not as socketIO documented with the auth.token, but rather like this:

    const socket = io('ws://localhost:8911/ws', {
      reconnectionDelayMax: 10000,
      withCredentials: true,
    })

Now when I inspect network traffic I see the cookie from dbAuth on the socket connection. Now I just have to decode it!

1 Like

For example here is some middleware I need to finish for Unkey unkey/packages/redwoodjs/src/ratelimit/middleware/createRatelimitMiddleware.ts at redwood · dthyresson/unkey · GitHub

1 Like

I see you’re doing something like:

const decryptedSession = dbAuthSession(
    req as Request,
    cookieNameCreator(cookieName),
  )

const { currentUser, decryptedSession } = await validateSession({
        req,
        cookieName,
        getCurrentUser,
      })

I’ll look into this!

You might be able to write your own middleware for example

These are great examples, exactly what I was looking for. I guess it would be too easy to just call dbAuthSession on my server and pass it the cookie. I just thought since I have access to the session secret I could destructure it and validate it but perhaps I have to create a lot more code to do this like in your examples. Thank you! I’ll report back with more :disguised_face:

Sorry for all these posts, but I’m trying to keep a running record to show what I’ve done. So as I mentioned previously, the cookie can be sent with the websocket if you start your socket with ‘withCredentials: true’. Sadly, you’ll get a bunch of (xhr) CORS No Allow Credentials errors when you try to make a socket connection now. This is on my local dev machine.

I saw that the docs have a section about that, but I’m seeing some weird behavior here.

I have added:

Server.ts

  const io = new Server({
    pingInterval: 10000,
    pingTimeout: 5000,
    cors: {
      origin: '*',
      credentials: true,
    },
  })

Auth.ts (api/src/functions) handler

    cors: {
      origin: '*', // Probably not idea in production, but I'll worry about that later
      credentials: true,
    },

Auth.ts (web/src)

const dbAuthClient = createDbAuthClient({
  fetchConfig: { credentials: 'include' },
})

Now my sockets open sometimes, but sometimes they still show CORS No Allow Credentials :thinking:
Capture

Solution

Notice how my connections in the picture have transport=polling? Read on…

You do not need to edit anything for dbAuth, so forget all those settings above. The issue is with the SocketIO client. All you need to specify the transports, otherwise it’ll assume http with their fallback strategy. This is SocketIO specific, since they start with http long polling. If you do require http polling you might very well have to deal with CORS. :face_with_diagonal_mouth: Setting the transport specifically to websocket also automatically sends the cookie in headers, so you can take out withCredentials.

    const socket = io('ws://localhost:8911/ws', {
      reconnectionDelayMax: 10000,
--remove      withCredentials: true,
-->      transports: ['websocket'],
    })

:man_facepalming: I now have access to the cookie and I’ll work on figuring out how to validate it on the server.

If you’ve done everything correctly you should see a “Sec-WebSocket-Accept” in the response and request headers. According to the RFC this means the server upgraded and accepted the initial http handshake. We are still operating over ws:// and not wss:// however!

I got it to work. It was actually simpler than I thought after experimenting with what’s available in dbAuth. I just needed decryptSession and getSession. Here’s authenticated websockets with SocketIO and RedwoodJS! This example just uses sockets to see the status of users, for example online, offline, busy, etc. No edits in auth.ts or such are needed, the changes to the server file and a client component or context are the only thing necessary to get it to work.

Capture

Dependencies

yarn workspace api add socket.io
yarn workspace web add socket.io-client

Server.ts

import { Server } from 'socket.io'
import { v7 as uuidv7 } from 'uuid'

import { isProduction } from '@redwoodjs/api/logger'
import { createServer } from '@redwoodjs/api-server'
import {
  decryptSession,
  getSession,
} from '@redwoodjs/auth-dbauth-api'
import { AuthenticationError, ValidationError } from '@redwoodjs/graphql-server'

import { cookieName } from 'src/lib/auth'
import { logger as parentLogger } from 'src/lib/logger'

const logger = parentLogger.child({})
logger.level = isProduction ? 'warn' : 'trace'

async function main() {
  const server = await createServer({
    logger,
  })

  // Websocket Server
  const io = new Server({
    pingInterval: 10000,
    pingTimeout: 5000,
  })

  io.attach(server.server)

  let userId: number

  // Authentication middleware, executed sequentially once per connection
  io.of('/ws').use((socket, next) => {
    if (socket.handshake.headers.cookie) {
      next()
    } else {
      next(
        new AuthenticationError(
          'Unauthenticated from ' + socket.handshake.address
        )
      )
    }
  })

  // Sanitize the sessionCookie
  io.of('/ws').use((socket, next) => {
    // Extract session cookie
    const sessionCookie = socket.handshake.headers.cookie
      .split(';')
      .find((item) => item.trim().startsWith('session_8911'))
      ?.trim()

    if (!sessionCookie) {
      next(new ValidationError('Invalid token'))
    }

    // Validate the session cookie
    const [session, _csrfToken] = decryptSession(
      getSession(sessionCookie, cookieName?.replace('%port%', '8911'))
    )

    if (!session.id) {
      next(new ValidationError('Invalid session'))
    }

    // Continue
    userId = session.id
    next()
  })

  // Should this be in Redis instead? Could consume a lot of I/O
  const connections = {}
  const users = {}

  const handleDisconnect = (uuid: string) => {
    // Remove the socket and user
    delete connections[uuid]
    delete users[uuid]
    // Emit users so clients know latest user activity
    io.of('/ws').emit('activity', users)
  }

  const handleActivity = (uuid: string, activity: string) => {
    // Set the user's activity
    users[uuid].state.activity = activity
    // Emit users so clients know latest user activity
    io.of('/ws').emit('activity', users)
  }

  // New socket connection at /ws
  io.of('/ws').on('connection', (socket) => {
    const uuid = uuidv7()

    // History
    connections[uuid] = socket
    users[uuid] = {
      userId,
      state: {
        activity: 'ONLINE', // Set initial status as online
      },
    }

    // Events
    socket.on('disconnect', () => handleDisconnect(uuid))
    socket.on('activity', (activity) => handleActivity(uuid, activity))
  })

  await server.start()
}

main()

ActivityContext.tsx

import React, { useState, useEffect } from 'react'

import { io } from 'socket.io-client'

import { useAuth } from 'src/auth'

type UserActivity = {
  userId: number
  state: {
    activity: string
  }
}

interface ActivityContext {
  users: Record<string, UserActivity>
}

export const ActivityContext = React.createContext<ActivityContext>({
  users: {},
})

interface ActivityProviderProps {
  children: React.ReactNode
}

const ActivityProvider = ({ children }: ActivityProviderProps) => {
  const [usersActivity, setUserActivity] = useState({})
  const { currentUser } = useAuth()
  const userActivity = currentUser?.preference.activity

  useEffect(() => {
    const socket = io('ws://localhost:8911/ws', {
      reconnectionDelayMax: 10000,
      transports: ['websocket'],
    })

    socket.on('activity', (users) => {
      setUserActivity(users)
    })

    socket.on('connect', () => {
      socket.emit('activity', userActivity)
    })

    socket.on('disconnect', () => {
      console.log('disconnected')
    })

    socket.io.on('error', (error) => {
      console.log(error)
    })

    return () => {
      socket.disconnect()
    }
  }, [])

  // Create the context value to use down the tree
  const contextValue: ActivityContext = {
    users: usersActivity, // Provide access to the users activity
  }

  return (
    <ActivityContext.Provider value={contextValue}>
      {children}
    </ActivityContext.Provider>
  )
}

export default ActivityProvider

Display result in a component from context

 const { users } = useContext(ActivityContext)

  return (
    <>
      {users && (
        <div>
          {Object.entries(users).map((user) => (
            <p key={user[1].userId}>
              ID: {user[1].userId} Status: {user[1].state.activity}
            </p>
          ))}
        </div>
      )}
    </>
  )

Authentication measures

  • Trying to connect from outside my logged in client I get CORS errors which is good, because I don’t want just anyone hitting my websocket server
  • If I allow CORS * in my SocketIO server config but I don’t send a cookie I get my first error: Unauthenticated
  • If the socket connection does have cookies, we extract the session_8911 cookie, if it doesn’t exist my second error occurs: Invalid token
  • Lastly, if the session_8911 cookie exists we try to decrypt it, but if there is no session.id (really it’s the user’s id encrypted in the cookie, it holds id for the user and the masked e-mail or username you used when setting up dbAuth) the final error occurs: Invalid session

Next steps

  • I should sanitize the cookie to prevent malicious actors from injecting code during the initial handshake
  • I will now focus on enabling TLS so I can use wss://
  • Instead of saving the history of connections and users in a set, perhaps a non-AOF Redis database could be performant if connections and users reach >5k. Potentially with AOF you can also have a persistent store but I’m not sure if that’s necessary? Do I really care about saving these values for later use? Probably not.
  • Bring authorization into this, perhaps since we know the userId we can make a call to the db and fetch the roles/permissions attached to that userId to allow a socket connection to emit/listen to certain channels
  • There should be some kind of rate limiting to prevent DDoS on my endpoint
  • Deployment over https and secure cookies should work

Golden Boy

2 Likes

Alrighty, we’re production ready! Tested on Edge, Safari and Firefox as well as on mobile, somehow Opera doesn’t seem to send the cookie though. Might be my local settings. I thought I was going to have to configure a lot more based on what I saw on so many configurations for SSL but it appears I’m cruising here!

Configuration

I’m using Coolify, which uses Traefik and LetsEncrypt SSL. I did not have to do anything special in my docker compose or traefik configuration. I am not adding certs or keys to any configuration. I am however enabling CORS in my Redwood application and declaring my api and web domains.

ActivityContext.tsx

import React, { useRef, useState, useEffect } from 'react'

import { io, Socket } from 'socket.io-client'

import { toast } from '@redwoodjs/web/dist/toast'

type UserActivity = {
  userId: number
  status: string
  lastOnline: string
}

interface ActivityContext {
  users: UserActivity[]
  activity: 'ONLINE' | 'OFFLINE' | 'BUSY'
  setActivity: (activity: string) => void
}

export const ActivityContext = React.createContext<ActivityContext>({
  users: [],
  activity: 'ONLINE',
  setActivity: () => {},
})

interface ActivityProviderProps {
  children: React.ReactNode
}

const ActivityProvider = ({ children }: ActivityProviderProps) => {
  const socketUri =
    process.env.NODE_ENV === 'development'
      ? 'ws://localhost:8911/ws'
      : 'https://api.app.com/ws'
  const [usersActivity, setUserActivity] = useState([])
  const [myActivity, setMyActivity] = useState<'ONLINE' | 'OFFLINE' | 'BUSY'>(
    'ONLINE'
  )

  const websocket = useRef<Socket>()

  // Initialize the websocket
  useEffect(() => {
    // Establish websocket
    const socket = io(socketUri, {
      reconnectionDelayMax: 10000,
      transports: ['websocket'],
      withCredentials: true,
    })

    // Activity channel updates
    socket.on('activity', (users) => {
      // Update users' activity through context
      setUserActivity(users)
    })

    // Online on connection
    socket.on('connect', () => {
      // Let everyone know current user is online
      socket.emit('activity', 'ONLINE')
    })

    // Offline on disconnection
    socket.on('disconnect', () => {
      // Reset everyone to offline
      setUserActivity({})
      setMyActivity('OFFLINE')
    })

    // Error handling
    socket.on('connect_error', (error) => {
      toast.error('Application error: ' + error)
      setMyActivity('OFFLINE')
    })

    websocket.current = socket

    return () => {
      socket.disconnect()
    }
  }, [])

  // Change the current user activity
  const changeActivity = (activity: 'ONLINE' | 'OFFLINE' | 'BUSY') => {
    if (websocket.current.connected) {
      websocket.current.emit('activity', activity)
      setMyActivity(activity)
    }
  }

  // Create the context value to use down the tree
  const contextValue: ActivityContext = {
    users: usersActivity, // Provide access to the users activity
    activity: myActivity, // Provide access to the current user activity
    setActivity: changeActivity, // Set the current user activity and emit the change to the websocket (use this in the UI to allow users to set busy or offline themselves)
  }

  return (
       (
      <ActivityContext.Provider value={contextValue}>
        {children}
      </ActivityContext.Provider>
    )
  )
}

export default ActivityProvider

server.ts

import { Server } from 'socket.io'
import { v7 as uuidv7 } from 'uuid'

import { isProduction } from '@redwoodjs/api/logger'
import { createServer } from '@redwoodjs/api-server'
import { decryptSession, getSession } from '@redwoodjs/auth-dbauth-api'
import {
  AuthenticationError,
  ForbiddenError,
  ValidationError,
} from '@redwoodjs/graphql-server'

import { cookieName } from 'src/lib/auth'
import { logger as parentLogger } from 'src/lib/logger'

import { redis } from './lib/redis' // Just sets up a connection to my redis instance

const logger = parentLogger.child({})
logger.level = isProduction ? 'warn' : 'trace'

const cookieRegex = /([a-zA-Z0-9+/|=]{110})\w+/
const tokenRegex =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i

async function main() {
  const server = await createServer({
    logger,
  })

  await server.register(import('@fastify/rate-limit'), {
    max: 500,
    timeWindow: '1 minutes',
  })

  server.addContentTypeParser(/^image\/.*/, (req, payload, done) => {
    payload.on('error', (err) => {
      done(err)
    })

    payload.on('end', () => {
      done(null)
    })
  })

  // Websocket Server
  const io = new Server({
    pingInterval: 60000,
    cors: {
      origin: isProduction
        ? [
            'https://web.app.com',
            'https://api.app.com'
          ]
        : ['http://localhost:8911', 'http://localhost:8910'],
      methods: ['GET', 'POST'],
      credentials: true,
    },
  })

  io.attach(server.server)

  let userId: number
  const users = new Set()

  // Authentication middleware, executed sequentially once per connection
  io.of('/ws').use((socket, next) => {
    // Ensure we have a cookie to validate
    if (socket.handshake.headers.cookie) {
      next()
    } else {
      next(new AuthenticationError('Unauthenticated'))
    }
  })

  // Sanitize the sessionCookie
  io.of('/ws').use((socket, next) => {
    let isValid = true
    // Extract session cookie
    const sessionCookie = socket.handshake.headers.cookie
      .split(';')
      .find((item) => item.trim().startsWith('session_8911'))
      ?.trim()
    const sessionCookieContent = sessionCookie?.substring(
      sessionCookie.indexOf('=') + 1
    )

    if (!sessionCookie) {
      isValid = false
      next(new ValidationError('Invalid token'))
    }

    if (!cookieRegex.test(sessionCookieContent)) {
      isValid = false
      next(new ValidationError('Invalid token format'))
    }

    // Validate the session cookie
    const [session, _csrfToken] = decryptSession(
      getSession(sessionCookie, cookieName?.replace('%port%', '8911'))
    )

    if (!session.id) {
      isValid = false
      next(new ValidationError('Invalid session'))
    }

    if (!tokenRegex.test(_csrfToken)) {
      isValid = false
      next(new ValidationError('Invalid session format'))
    }

    // Continue if valid
    if (isValid) {
      userId = session.id
      users.add(userId)
      next()
    }
  })

  // Limit user to 20 connections
  io.of('/ws').use(async (_, next) => {
    const sessionsCount = (await redis.call(
      'JSON.OBJLEN',
      `connections:${userId}`
    )) as number
    if (sessionsCount < 20) {
      next()
    } else {
      next(new ForbiddenError('Maximum connections reached'))
    }
  })

  function extractJson(result) {
    const extractedData = result
      .map(([_, jsonString]) => {
        if (jsonString) {
          const data = JSON.parse(jsonString)
          const { userId, status, lastOnline } = data
          return { userId, status, lastOnline }
        }
        return null // Handle cases where jsonString is null or undefined
      })
      .filter((data) => data !== null) // Filter out any null entries

    return extractedData
  }

  // Not efficient, look into RediSearch to index instead
  const emitActivity = async () => {
    // Grab all users activity and emit to all clients
    redis.multi({ pipeline: false })
    users.forEach((id) => {
      redis.call('JSON.GET', `users:${id}`)
    })
    await redis.exec((err, result) => {
      // Emit updated activity to all clients
      if (!err) {
        const resultJson = extractJson(result)
        io.of('/ws').emit('activity', resultJson)
      } else if (err) {
        logger.error('Unable to emit activity: ', err)
      }
    })
  }

  const handleDisconnect = async (uuid: string, userId: number) => {
    // Set the user to offline in Redis
    try {
      // Remove connection from Redis
      await redis.call('JSON.DEL', `connections:${userId}`, uuid)
      // Get all connections for the user
      const sessionsCount = await redis.call(
        'JSON.OBJLEN',
        `connections:${userId}`
      )
      // If no connections exist, set the status to offline
      if (sessionsCount === 0) {
        await redis.call(
          'JSON.SET',
          `users:${userId}`,
          '.status',
          JSON.stringify('OFFLINE')
        )
        await redis.call(
          'JSON.SET',
          `users:${userId}`,
          '.lastOnline',
          JSON.stringify(new Date().toISOString())
        )
        emitActivity()
      }
    } catch (err) {
      logger.error(`Unable to handle disconnect for ${uuid}: `, err)
    }
  }

  const handleActivity = async (userId: number, activity: string) => {
    // Set the user's activity in Redis
    try {
      await redis.call(
        'JSON.SET',
        `users:${userId}`,
        '.status',
        JSON.stringify(activity)
      )
      emitActivity()
    } catch (err) {
      logger.error(`Unable to handle activity for ${userId}: `, err)
    }
  }

  // New socket connection at /ws
  io.of('/ws').on('connection', async (socket) => {
    const uuid = uuidv7()

    const userObject = {
      userId: userId,
      status: 'ONLINE',
      lastOnline: new Date().toISOString(),
    }
    await redis
      .call('JSON.SET', `users:${userId}`, '.', JSON.stringify(userObject))
      .catch((err) => {
        logger.error('Unable to create user details: ', err)
      })

    // Save new connection to Redis
    const connectionObject = {
      [uuid]: {
        sid: socket.id,
      },
    }

    // Check Redis if connection key already exists, 0 false, 1 true
    const connectionExists = await redis.exists(`connections:${userId}`)

    if (connectionExists === 0) {
      // Save new connection to Redis
      await redis
        .call(
          'JSON.SET',
          `connections:${userId}`,
          '.',
          JSON.stringify(connectionObject)
        )
        .catch((err) => {
          logger.error('Unable to create connection details: ', err)
        })
    } else {
      // Merge new connection to existing user
      await redis
        .call(
          'JSON.MERGE',
          `connections:${userId}`,
          '.',
          JSON.stringify(connectionObject)
        )
        .catch((err) => {
          logger.error('Unable to merge connection details: ', err)
        })
    }

    // On connection send out most up to date user activity
    emitActivity()

    // Events
    socket.on('disconnect', () => handleDisconnect(uuid, userId))
    socket.on('activity', (activity) => handleActivity(userId, activity))
  })

  await server.start()
}

main()

Frontend component to show connections (example)

const { users } = useContext(ActivityContext)

  return (
    <>
      {users && (
        <div>
          {users.map((user) => (
            <p key={user.userId}>
              ID: {user.userId} Status: {user.status} Last Online:{' '}
              {user.lastOnline}
            </p>
          ))}
        </div>
      )}
    </>
  )

An example of users logging in and then closing their browser window (user 2):
frontend

Redis

As you can see, I’m doing quite a bit of nasty redis logic here (multi/exec). While it works, my next goal is to set up RediSearch so I can just index these json documents and have a much better flow on retrieving the online status for all users on the app. This is absolutely important in a production environment where hundreds of connections could be happening and a multi/exec situation isn’t ideal.
What it looks like on redis:
redis
No connected sessions:
session-off
Connected sessions:
session-on

Security implementations

  • Authentication works as described in my previous post, the socket.io middleware correctly allows connections that meet those checks.
  • Connections are only allowed on my app’s domains, so someone can’t just fire up a client and hit the server continuously.
  • Connections are going through TLS with a secure cookie. The cookie is sanitized (regex), decrypted and validated.
  • Connections are tracked by user in Redis, every time a new connections is made by the same user the server checks if the user already has 20 open connections, if they do the connection is decline. This prevents someone opening 100 tabs, so it’s like a soft rate limit.

The only thing remaining is implementing RediSearch and authorization logic. I have the userId and functions ready to check permissions, I’m just thinking about the load if I made queries to the postgres db where the permissions per user are connected every time a user makes a request through the socket connection.

I could replicate the permissions into redis. So when a new row or update is inserted into my postgres bring them into redis and then I can query based on the userId to see what that user is allowed to do on the websocket server.

1 Like