AWS IoT 制作 Serverless 的异步事件系统

事件系统通过遵循在系统内定义的规则来接收和处理事件。所有的处理都通过异步发生。当事件发送到系统时,它会在某个时间点处理,但是不会立即得到响应。如果要构建可扩展解决方案,异步处理有其优势,因为它可以免除立即响应的负担。而不是,对它们进行排队,并尽可能快地处理它们。在本文中,我将演示如何使用 AWS IoT 服务来处理与 IoT 无关的事件。我还将使用新的 Serverless 应用程序模型(SAM)部署解决方案。不用说,该解决方案是 Serverless的,并且每天最多能承载 1 百万个事件的工作负载,具有高成本效益。

AWS IoT 如何工作

AWS IoT 可以做很多事情,但在这里我关注消息(message),主题(topic),规则(rule)和操作(actions)。

一条消息被发送到一个主题。在这种情况下,消息是一个事件。AWS IoT 可以处理 JSON,所以我选择 JSON 作为我的数据表示。

一个主题有一个名称,如 event/buy ,您可以看到,您可以使用最多 7 个斜杠(/)添加层次结构。

一个规则订阅主题,并在收到消息时触发操作。简单的规则可以订阅主题事件 event/buy。但是您也可以使用通配符,例如 event/+event/*。 + 用于正好一个的层次结构,而 * 则匹配任意数量的层次结构。

AWS IoT 带有许多内置动作。在这里稍微提一下:

  • 将消息写入 DynamoDB。
  • 将消息作为文件保存到S3。
  • 发送消息到 SNS。
  • 调用一个 Lambda 函数来处理消息。

所以一个消息被发送到一个主题。 如果规则与主题的名称相匹配,则会触发已定义的操作。 这是一个事件系统,不是吗?

架构

我在本文中设计的事件系统,可以处理在交易所生成的事件,如买卖事件。将所有事件存储在持久存储上以进行存档是很重要的。由于某些原因购买事件需要检查欺诈。如果检测到欺诈事件,则必须通知外部系统。 下图显示了系统的体系结构:

物联网事件系统架构

事件流

  1. 这一切都从 HTTP API(由 API 网关提供)开始,它为每个 HTTP POST 调用 /event 触发 Lambda 函数。
  2. Lambda event-api 进行一些输入验证。根据有效载荷,事件发布在诸如 event/buy, event/sell 之类的主题上...
  3. 一个规则通过动作订阅所有的 event/+ 来将消息写入 DynamoDB。
  4. 另一个规则只订阅 event/buy 主题,并为每个消息触发 event-buy 的 Lambda 事件。
  5. buy-event Lambda 决定事件是否是欺诈。如果是欺诈,它会将事件发布到 alert/fraud 主题。
  6. 一个规则订阅 alert/fraud 主题并触发两个操作:将消息保存到 S3 并将事件发送到 SNS。

实现

我使用全新的 Serverless Application Model(SAM)作为这个例子。SAM 建立在 CloudFormation 之上,所以大部分有趣的部分发生在文件中,我将命名为 template.yml

您可以在 GitHub 上找到完整的源代码:https://github.com/michaelwittig/sam-iot-example

第一个文件定义了 Lambda 函数中需要的 Node.js 依赖关系:

{
  "name": "sam-iot-example",
  "version": "1.0.0",
  "author": "Michael Wittig",
  "license": "MIT",
  "dependencies": {
    "uuid": "3.0.1"
  },
  "devDependencies": {
    "aws-sdk": "2.6.9"
  }
}

REST API

以下代码展示了如何用 SAM 定义一个 API 网关:

---
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Resources:
  EventApiLambda:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: 'event-api-handler.create'
      Runtime: 'nodejs4.3'
      Policies:
      - Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action: 'iot:Publish'
          Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:topic/event/*'
        - Effect: Allow
          Action: 'iot:DescribeEndpoint'
          Resource: '*'
      Events:
        Http:
          Type: Api
          Properties:
            Path: /event
            Method: post


And here comes the implementation that will run inside Lambda. The name of the file must match with the Handler from above.

Event-API-Handler.js
'use strict';
const uuid = require('uuid');
const cache = require('./cache.js');
function publish(iotdata, payload) {
  return iotdata.publish({
    topic: `event/${payload.type}`,
    qos: 0,
    payload: JSON.stringify(payload),
  }).promise();
}
module.exports.create = (event, context, cb) => {
  console.log(JSON.stringify(event));
  try {
    var payload = JSON.parse(event.body); // safely parse body JSON
  } catch(err) {
    cb(null, {statusCode: 400});
    return;
  }
  if (payload.id === undefined || payload.id === null) {
    payload.id = uuid.v4(); // assign id if no id was passed in
  }
  if (payload.type === undefined || payload.type === null) {
    cb(null, {statusCode: 400});
  } else {
    cache.iotdata
      .then((iotdata) => publish(iotdata, payload))
      .then(() => cb(null, {statusCode: 204}))
      .catch((err) => cb(err));
  }
};

由于篇幅所限,这里就不放其它代码了。详细见 GitHub。

部署

确保你的 AWS CLI 是最新的。否则,您可能不支持 SAM:

sudo pip install --upgrade awscli

再选择一个支持 AWS IoT 的地区:

export AWS_DEFAULT_REGION=us-east-1

然后,创建一个 s3 bucket:

aws s3 mb s3://$USER-artifacts

clone 代码 :

git clone git@github.com:michaelwittig/sam-iot-example.git
cd sam-iot-example/

安装 Node.js 依赖:

npm install --production

使用 SAM 部署模板:

aws cloudformation package --template-file template.yml --output-template-file template.tmp.yml --s3-bucket "$USER-artifacts"
aws cloudformation deploy --template-file template.tmp.yml --stack-name sam-iot-example --capabilities CAPABILITY_IAM

完成,现在你已经有了一个事件系统。

使用

  1. 打开 https://console.aws.amazon.com/apigateway/
  2. 选择 sam-iot-example
  3. 选择 /event -> POST 然后点击 Test
  4. 填入请求包:{"type":"buy","price":123.45}, 然后提交几次。
  5. 打开 https://console.aws.amazon.com/dynamodb/
  6. 选择 Tables
  7. 选择 sam-iot-example-EventTable- 开头的表
  8. 点击列表,你会看到一些事件
  9. 打开 https://console.aws.amazon.com/s3/
  10. 选择 sam-iot-example-archivefraudbucket- 开头的 bucket
  11. 你应该看到一些欺诈事件

清理

使用 AWS 管理控制台从 S3 Bucket 中删除归档事件。bucket 的名称以 sam-iot-example-archivefraudbucket- 开始。

然后删除栈:

aws cloudformation delete-stack --stack-name sam-iot-example

再删除 bucket:

aws s3 rb s3://$USER-artifacts --force
  • 无服务器应用程序模型(SAM)使得部署CloudFormation模板非常简单。 我喜欢。
  • 使用主题来解耦事件系统非常强大。如果业务流程发生变化或只是其他人对系统中的事件感兴趣,则可以随时添加主题规则和操作。保持 Lambda 精简。小 Lambda 比大的更好。
  • 事件系统对于每天最多 1 百万个消息的工作负载是非常划算的。如果您想要处理的更多,我建议您首先计算成本并将其与使用 Kinesis 的架构进行比较。

原文链接:https://dzone.com/articles/using-aws-iot-minus-the-iot-for-serverless-async-event-systems

1 人评价

观光\评论区

Copyright © 2017 玩点什么. All Rights Reserved.