How to use Kinesis data streams in your serverless app
In this example we will look at how to create a Kinesis Data Stream in our serverless app using Serverless Stack (SST).
Requirements
- Node.js >= 10.15.1
- We’ll be using TypeScript
- An AWS account with the AWS CLI configured locally
Create an SST app
Let’s start by creating an SST app.
$ npm init sst -- typescript-starter kinesisstream
$ cd kinesisstream
By default our app will be deployed to an environment (or stage) called dev and the us-east-1 AWS region. This can be changed in the sst.json in your project root.
{
  "name": "kinesisstream",
  "region": "us-east-1",
  "main": "stacks/index.ts"
}
Project layout
An SST app is made up of two parts.
- stacks/: App Infrastructure
  The code that describes the infrastructure of your serverless app is placed in the stacks/ directory of your project. SST uses AWS CDK to create the infrastructure.
- backend/: App Code
  The code that’s run when your API is invoked is placed in the backend/ directory of your project.
Adding a Kinesis Data Stream
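Amazon Kinesis Data Streams is a serverless streaming data service that makes it easy to capture, process, and store data streams at any scale. Note that a stream is billed for the time it exists (per shard-hour in provisioned mode), even when no data flows through it, so remove it once you are done with this example.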
Replace the stacks/MyStack.ts with the following.
import { Api, KinesisStream, StackContext } from "@serverless-stack/resources";
export function MyStack({ stack }: StackContext) {
  // create a kinesis stream
  const stream = new KinesisStream(stack, "Stream", {
    consumers: {
      consumer1: "functions/consumer1.handler",
      consumer2: "functions/consumer2.handler",
    },
  });
}
This creates a Kinesis Data Stream using KinesisStream and attaches two consumers that poll for messages from the stream. A consumer function runs when it has polled one or more messages.
Setting up the API
Now let’s add the API.
Add this below the KinesisStream definition in stacks/MyStack.ts.
  // Create an HTTP API
  const api = new Api(stack, "Api", {
    defaults: {
      function: {
        environment: {
          streamName: stream.streamName,
        },
      },
    },
    routes: {
      "POST /": "functions/lambda.handler",
    },
  });
  api.attachPermissions([stream]);
  // Show the endpoint in the output
  stack.addOutputs({
    ApiEndpoint: api.url,
  });
Our API simply has one endpoint (the root). When we make a POST request to this endpoint the Lambda function called handler in backend/functions/lambda.ts will get invoked.
We also pass in the stream name to our API as an environment variable called streamName. And we allow our API to send messages to the Kinesis Data Stream we just created.
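Since an environment variable can be missing at runtime (for example, if the function is ever run outside this stack), it can help to read streamName through a small guard. The following is a minimal sketch; requireEnv is a hypothetical helper name introduced here, not part of SST or the AWS SDK.

```typescript
// Hypothetical helper (not part of SST): read a required environment variable
// and fail fast with a clear error if it is missing or empty.
function requireEnv(
  name: string,
  env: Record<string, string | undefined> = process.env
): string {
  const value = env[name];
  if (value === undefined || value === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Example: the stack above passes the stream name as "streamName".
// const streamName = requireEnv("streamName");
```

The second parameter exists only so the helper can be exercised without touching the real process environment.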
Adding function code
We will create three functions, one for handling the API request, and the other two for the consumers.
Replace the backend/functions/lambda.ts with the following.
export async function handler() {
  console.log("Message queued!");
  return {
    statusCode: 200,
    body: JSON.stringify({ status: "successful" }),
  };
}
Add a backend/functions/consumer1.ts.
export async function handler() {
  console.log("Message 1 processed!");
  return {};
}
Add a backend/functions/consumer2.ts.
export async function handler() {
  console.log("Message 2 processed!");
  return {};
}
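The consumers above ignore their input. A Kinesis-triggered Lambda function actually receives a batch of records whose payloads are base64 encoded. The sketch below shows one way a consumer might decode them. KinesisEventLike is a simplified, hypothetical stand-in for the full event type, and decodeRecords is a hypothetical helper; the code also assumes the producer writes JSON strings.

```typescript
// Simplified shape of the event Lambda passes to a Kinesis consumer.
// Hypothetical local type for illustration; only the fields used here are listed.
interface KinesisEventLike {
  Records: Array<{
    kinesis: {
      partitionKey: string;
      sequenceNumber: string;
      data: string; // base64-encoded payload
    };
  }>;
}

// Decode each record's base64 payload and parse it as JSON.
// Assumption: the producer wrote JSON strings to the stream.
function decodeRecords(event: KinesisEventLike): unknown[] {
  return event.Records.map((record) => {
    const json = Buffer.from(record.kinesis.data, "base64").toString("utf8");
    return JSON.parse(json);
  });
}

// A consumer that logs every decoded payload in the batch.
async function handler(event: KinesisEventLike) {
  for (const payload of decodeRecords(event)) {
    console.log("Message 1 processed!", payload);
  }
  return {};
}
```

If a payload is not valid JSON, JSON.parse throws and the whole invocation fails, so a real consumer may want to catch and handle bad records individually.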
Now let’s test our new API.
Starting your dev environment
SST features a Live Lambda Development environment that allows you to work on your serverless apps live.
$ npm start
The first time you run this command it’ll take a couple of minutes to deploy your app and a debug stack to power the Live Lambda Development environment.
===============
Deploying app
===============
Preparing your SST app
Transpiling source
Linting source
Deploying stacks
dev-kinesisstream-my-stack: deploying...
✅ dev-kinesisstream-my-stack
Stack dev-kinesisstream-my-stack
Status: deployed
Outputs:
ApiEndpoint: https://i8ia1epqnh.execute-api.us-east-1.amazonaws.com
The ApiEndpoint is the API we just created.
Let’s test our endpoint with the SST Console. The SST Console is a web based dashboard to manage your SST apps. Learn more about it in our docs.
Go to the Functions tab and click the Invoke button of the POST / function to send a POST request.
After you see a success status in the logs, go to the Local tab in the console to see all function invocations. The Local tab displays real-time logs from your Live Lambda Dev environment.
You should see Message queued! logged in the console.
Sending messages to our Kinesis Data Stream
Now let’s send a message to our Kinesis Data Stream.
Replace the backend/functions/lambda.ts with the following.
import AWS from "aws-sdk";
const stream = new AWS.Kinesis();
export async function handler() {
  await stream
    .putRecord({
      Data: JSON.stringify({
        message: "Hello from Lambda!",
      }),
      PartitionKey: "key",
      StreamName: process.env.streamName,
    })
    .promise();
  console.log("Message queued!");
  return {
    statusCode: 200,
    body: JSON.stringify({ status: "successful" }),
  };
}
Here we are getting the Kinesis Data Stream name from the environment variable, and then sending a message to it.
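The handler above uses the constant partition key "key", so every record hashes to the same shard. That is fine for a single-shard demo. With more shards you would usually derive the key from something that varies, such as a user or order ID. Here is a minimal sketch of building the putRecord parameters that way; buildPutRecordParams, PutRecordParams, and the entityId argument are hypothetical names introduced for illustration.

```typescript
// Parameters passed to Kinesis putRecord (only the fields used in this example).
interface PutRecordParams {
  StreamName: string;
  PartitionKey: string;
  Data: string;
}

// Hypothetical helper: build putRecord parameters for a JSON payload.
// Records that share a partition key land on the same shard, so entityId
// should identify whatever needs to be processed in order.
function buildPutRecordParams(
  streamName: string,
  entityId: string,
  payload: unknown
): PutRecordParams {
  // Kinesis partition keys must be between 1 and 256 characters long.
  if (entityId.length < 1 || entityId.length > 256) {
    throw new Error("Partition key must be between 1 and 256 characters");
  }
  return {
    StreamName: streamName,
    PartitionKey: entityId,
    Data: JSON.stringify(payload),
  };
}

// Possible usage inside the handler (assumes streamName is set):
// await stream
//   .putRecord(buildPutRecordParams(process.env.streamName!, "user-42", { message: "Hello" }))
//   .promise();
```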
Let’s install the aws-sdk package in the backend/ folder.
$ npm install aws-sdk
Now head over to your console and invoke the function again. You’ll notice in the Local tab that our consumers are called. You should see Message 1 processed! and Message 2 processed! being printed out.
Deploying to prod
To wrap things up we’ll deploy our app to prod.
$ npm run deploy -- --stage prod
This allows us to separate our environments, so when we are working in dev, it doesn’t break the API for our users.
Cleaning up
Finally, you can remove the resources created in this example using the following commands.
$ npm run remove
$ npm run remove -- --stage prod
Conclusion
And that’s it! We’ve got a completely serverless Kinesis Data Stream system. Check out the repo below for the code we used in this example. And leave a comment if you have any questions!
Example repo for reference
github.com/serverless-stack/serverless-stack/tree/master/examples/kinesisstream