# Using AWS Lambda and DynamoDB to Handle Websocket Connections
# Lambda as Websocket Handler
AWS provides ability to handle websocket connections using serverless services only: Lambda functions and API Gateway. Most likely you will also need DynamoDB, though this isn't strictly required.
This example shows Serverless Framework (opens new window) configuration file to provision required AWS services, and JavaScript Lambda function that handles websocket connections.
Your serverless.yml should look as follows:
service: backend
provider:
name: aws
region: us-east-2
stage: dev
runtime: nodejs12.x
functions:
default:
handler: src/handler.default
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default
We configured the same Lambda function to handle connect, disconnect, and default (message receiving) routes. Note handler: src/handler.default above: src/handler is the filename where the Lambda function is defined, and default is the name of the function itself.
In the example we echo back the message we receive via websocket: event.body contains the message, event.requestContext.connectionId - current connection id.
src/handler.js:
'use strict';
const response = (code, body) => {
return {
statusCode: code,
body: body
};
}
const echoReply = (event) => {
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
await apigwManagementApi.postToConnection({
ConnectionId: event.requestContext.connectionId,
Data: event.body
}).promise();
}
exports.default = async (event, context) => {
switch(event.requestContext.routeKey) {
case '$connect':
// TODO
break;
case '$disconnect':
// TODO
break;
case '$default':
default:
echoReply(event);
}
// Return a 200 status to tell API Gateway the message was processed successfully.
// Otherwise, API Gateway will return a 500 to the client.
return response(200, "Ok.");
}
# DynamoDB for ConnectionIds
You will most likely want to store all active connection ids to database (i.e. DynamoDB), so you can forward messages between the connected clients, instead of simply echoing it back to the same connection.
In this case your serverless.yml should look as follows:
service: backend
provider:
name: aws
region: us-east-2
stage: dev
runtime: nodejs12.x
environment:
CONNECTIONS_TABLE: ${self:service}-${opt:stage, self:provider.stage}-connections
iamRoleStatements:
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
Resource: "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.CONNECTIONS_TABLE}"
functions:
default:
handler: src/handler.default
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default
resources:
Resources:
ConnectionsTable:
Type: 'AWS::DynamoDB::Table'
DeletionPolicy: Retain
Properties:
AttributeDefinitions:
- AttributeName: connectionId
AttributeType: S
KeySchema:
- AttributeName: connectionId
KeyType: HASH
BillingMode: PAY_PER_REQUEST
TableName: ${self:provider.environment.CONNECTIONS_TABLE}
src/handler.js:
'use strict';
const Conn = require('./db/connections');
const response = (code, body) => {
return {
statusCode: code,
body: body
};
}
const connect = async (event) => {
try {
await Conn.createConnection(event.requestContext.connectionId);
} catch (e) {
return response(500, 'Failed to connect: ' + JSON.stringify(e));
}
return response(200, 'Connected.');
};
const disconnect = async (event) => {
try {
await Conn.deleteConnection(event.requestContext.connectionId);
} catch (err) {
return response(500, 'Failed to disconnect: ' + JSON.stringify(err));
}
return response(200, 'Disconnected.');
};
const sendMessage = async (event) => {
let connectionIds = await Conn.getConnectionIds();
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
const postCalls = connectionIds.map(async (item) => {
try {
console.log(`Sending to ${item.connectionId}: ${event.body}`);
await apigwManagementApi.postToConnection({
ConnectionId: item.connectionId,
Data: event.body
}).promise();
} catch (e) {
if (e.statusCode === 410) {
console.info(`Deleting stale connection: ${item.connectionId}`);
await Conn.deleteConnection(item.connectionId);
} else {
throw e;
}
}
});
try {
await Promise.all(postCalls);
} catch (e) {
return response(500, e.stack);
}
return response(200, 'Data sent.');
};
exports.default = async (event, context) => {
let resp;
switch(event.requestContext.routeKey) {
case '$connect':
resp = connect(event);
break;
case '$disconnect':
resp = disconnect(event);
break;
case '$default':
default:
resp = sendMessage(event);
}
// Return a 200 status to tell API Gateway the message was processed successfully.
// Otherwise, API Gateway will return a 500 to the client.
return resp;
}
It is not a bad idea to create a separate module to handle database.
src/db/connections.js:
'use strict';
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });
const { CONNECTIONS_TABLE } = process.env;
/**
* @param {string} connectionId
*/
const createConnection = async (connectionId) => {
await ddb.put({
TableName: CONNECTIONS_TABLE,
Item: {
connectionId: connectionId
}
}).promise();
};
/**
* @param {string} connectionId
*/
const deleteConnection = async (connectionId) => {
await ddb.delete({
TableName: CONNECTIONS_TABLE,
Key: {
connectionId: connectionId
}
}).promise();
};
/**
* @returns {Promise<Array<{connectionId:string}>>}
*/
const getConnectionIds = async () => {
let scanData = await ddb.scan({
TableName: CONNECTIONS_TABLE,
ProjectionExpression: 'connectionId'
}).promise();
return scanData.Items;
};
module.exports = {
createConnection,
deleteConnection,
getConnectionIds
};