Edge Device Tutorial: Using Apache MiniFi on Edge Devices

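MiniFi lets you use small devices such as a Raspberry Pi, an ASUS Tinker Board, or a BeagleBone Black to collect data in a secure, controlled way, with a full chain of custody.

This is a nice upgrade over my earlier approach of sending MQTT messages from Python.

We build a flow in Apache NiFi and then export it as a template. Using the MiniFi toolkit, we convert the template into a config.yml file and copy it to our device with SCP. You can see this in Part 1. This simple flow calls a shell script, which runs a Python script to collect our sensor data. The flow then sends the data to our NiFi server using Site-to-Site (S2S) over HTTP.

What I add in this part is the new Record and Schema paradigm, together with the ability to run SQL queries against incoming flow files with QueryRecord. This requires an Avro schema for our data, which is a very simple JSON definition.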
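For a flat record like ours, the schema is little more than the field names plus primitive types. As a minimal sketch, a first draft of such a schema can be derived from one sample record. The helper name infer_avro_schema is hypothetical (it is not part of NiFi), and mapping every number to "float" is an assumption that mirrors the schema used later in this article.

```python
import json

# Map Python value types to Avro primitive type names.
# Assumption: every numeric reading becomes "float", as in this article's schema.
# bool is deliberately not mapped, so booleans raise TypeError below.
_AVRO_TYPES = {float: "float", int: "float", str: "string"}


def infer_avro_schema(record, name, namespace):
    """Build a flat Avro record schema from one sample JSON record."""
    fields = []
    for key, value in record.items():
        avro_type = _AVRO_TYPES.get(type(value))
        if avro_type is None:
            raise TypeError("unsupported type for field %r: %s" % (key, type(value).__name__))
        fields.append({"name": key, "type": avro_type})
    return {"type": "record", "name": name, "namespace": namespace, "fields": fields}


sample = json.loads('{"host": "picroft", "temp": 35.08, "tempf": 75.14, "ts": "2017-06-16 17:39:08"}')
schema = infer_avro_schema(sample, "sensehat", "hortonworks.hdp.refapp.sensehat")
print(json.dumps(schema, indent=2))
```

The generated draft still needs a human review, for example to decide whether a field should be nullable.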

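[Figure: RPi Sense HAT MiniFi flow]

We set up a port to connect the MiniFi agent to our server.

[Figure: RPi MiniFi record ingest from the Sense HAT]

Data will soon start arriving.

NiFi receives the JSON messages over S2S.

[Figure: Sense HAT flow overview]

[Figure: Avro Schema Registry for the Sense HAT data]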

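Key steps for ingesting sensor data in a few hours

  • Connect to the port.
  • Set a schema to pull from the registry; also set a MIME type for JSON.
  • Query the flow file with SQL (processed by Apache Calcite) to select temperatures above 65 °F. The results are written as Avro files using AvroRecordSetWriter and the schema from AvroSchemaRegistry.
  • Store the resulting Avro files in HDFS.
  • Store the original JSON files that were sent in HDFS.
  • Convert the Avro files to ORC.
  • Store the ORC files in HDFS.
  • Grab the auto-generated hive.ddl to create an external table.
  • Query the sensor data in Zeppelin.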
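QueryRecord exposes the incoming flow file to SQL as a table named FLOWFILE, so the temperature filter in the list above would be a statement along the lines of SELECT * FROM FLOWFILE WHERE tempf > 65 (the exact query used in this flow is not shown). To make the semantics concrete, here is a plain-Python sketch of the same filter over newline-delimited JSON. filter_warm_records is a hypothetical helper, not part of NiFi, and it does not reproduce Calcite's type handling.

```python
import json

# Illustrative only: mimics what a QueryRecord statement such as
#   SELECT * FROM FLOWFILE WHERE tempf > 65
# does to a flow file containing one JSON record per line.
THRESHOLD_F = 65.0


def filter_warm_records(flowfile_text, threshold=THRESHOLD_F):
    """Return the records whose Fahrenheit reading is strictly above the threshold."""
    selected = []
    for line in flowfile_text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        record = json.loads(line)
        # Records without a tempf field are treated as not matching.
        if record.get("tempf", float("-inf")) > threshold:
            selected.append(record)
    return selected


content = '{"host": "picroft", "tempf": 75.14}\n{"host": "picroft", "tempf": 60.5}\n'
print(filter_warm_records(content))
```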

hive.ddl: the auto-generated Hive schema

CREATE EXTERNAL TABLE IF NOT EXISTS sensor (tempf FLOAT, cputemp FLOAT, ts STRING, pressure FLOAT, host STRING, pitch FLOAT, ipaddress STRING, temp FLOAT, diskfree STRING, yaw FLOAT, humidity FLOAT, memory FLOAT, y FLOAT, x FLOAT, z FLOAT, roll FLOAT) STORED AS ORC

I grab the HDFS location and add it to the DDL: LOCATION '/sensor'

For the Avro and JSON versions of the data, I created similar tables.

ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION '/jsonsensor';
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS AVRO LOCATION '/avrosensor';
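Since the three tables differ only in their storage clause and location, their DDL can be generated from one column list. This is a minimal sketch: build_ddl is hypothetical, the table names jsonsensor and avrosensor are assumptions (the text only gives the HDFS locations), and the column list is shortened from the full hive.ddl above.

```python
# Hypothetical helper: derive the three external-table DDLs from one column list.
# HDFS locations mirror the text; table names for JSON/Avro are assumed.
COLUMNS = [("tempf", "FLOAT"), ("cputemp", "FLOAT"), ("ts", "STRING"), ("host", "STRING")]

STORAGE = {
    "orc": "STORED AS ORC",
    "json": "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'",
    "avro": "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS AVRO",
}


def build_ddl(table, columns, storage, location):
    """Return a CREATE EXTERNAL TABLE statement with an explicit LOCATION clause."""
    cols = ", ".join("%s %s" % (name, hive_type) for name, hive_type in columns)
    return "CREATE EXTERNAL TABLE IF NOT EXISTS %s (%s) %s LOCATION '%s'" % (
        table, cols, STORAGE[storage], location)


for table, storage, location in [("sensor", "orc", "/sensor"),
                                 ("jsonsensor", "json", "/jsonsensor"),
                                 ("avrosensor", "avro", "/avrosensor")]:
    print(build_ddl(table, COLUMNS, storage, location) + ";")
```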

Install the libraries

pip install --upgrade sense-hat
pip install --upgrade pillow
pip install rtimulib    
pip install psutil    

sudo apt-get install oracle-java8-jdk

Shell script

python /opt/demo/rpi-sensehat-mqtt-nifi/sense2.py

Python script

from sense_hat import SenseHat
import json
import os
import socket
from time import gmtime, strftime

import psutil

# Timestamp in UTC: yyyy-mm-dd hh:mm:ss
currenttime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
host = os.uname()[1]
# Only collect readings on an ARM (Raspberry Pi) board
rasp = ('armv' in os.uname()[4])

# Any routable address works; a UDP connect() does not send any packets
external_IP_and_port = ('198.41.0.4', 53)  # a.root-servers.net
socket_family = socket.AF_INET


def IP_address():
  """Return this host's outbound IPv4 address, or None if it cannot be found."""
  try:
    s = socket.socket(socket_family, socket.SOCK_DGRAM)
    s.connect(external_IP_and_port)
    answer = s.getsockname()
    s.close()
    return answer[0] if answer else None
  except socket.error:
    return None


if rasp:
  # CPU temperature, reported by the kernel in millidegrees Celsius
  with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
    ctemp = float(f.readline()) / 1000

  usage = psutil.disk_usage("/")
  mem = psutil.virtual_memory()
  diskrootfree = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
  mempercent = mem.percent
  ipaddress = IP_address()

  # Read the Sense HAT sensors and round them
  sense = SenseHat()
  sense.clear()
  temp = round(sense.get_temperature(), 2)
  humidity = round(sense.get_humidity(), 1)
  pressure = round(sense.get_pressure(), 1)
  orientation = sense.get_orientation()
  pitch = round(orientation['pitch'], 0)
  roll = round(orientation['roll'], 0)
  yaw = round(orientation['yaw'], 0)
  acceleration = sense.get_accelerometer_raw()
  x = round(acceleration['x'], 0)
  y = round(acceleration['y'], 0)
  z = round(acceleration['z'], 0)

  # NOTE: a textbook Celsius-to-Fahrenheit conversion adds 32; this script adds 12,
  # which matches the sample record below
  tempf = round((temp * 1.8) + 12, 2)
  row = {'ts': currenttime, 'host': host, 'memory': mempercent, 'diskfree': diskrootfree, 'cputemp': round(ctemp, 2),
         'ipaddress': ipaddress, 'temp': temp, 'tempf': tempf, 'humidity': humidity,
         'pressure': pressure, 'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z}
  # ExecuteProcess captures stdout, so this JSON line becomes the flow file content
  print(json.dumps(row))
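Because the script only runs on a Pi with a Sense HAT attached, it can help to factor the pure rounding and conversion logic into a function that can be tested on any machine. This sketch is hypothetical (build_row is not in the author's repository); it copies the script's rounding rules and its temp * 1.8 + 12 formula as written. With the raw inputs below it reproduces the rounded sensor values of the sample record that follows.

```python
import json


def build_row(readings):
    """Apply the script's rounding rules to raw Sense HAT readings (hypothetical helper)."""
    temp = round(readings["temp"], 2)
    return {
        "temp": temp,
        # Same formula as the script above (adds 12, not 32)
        "tempf": round(temp * 1.8 + 12, 2),
        "humidity": round(readings["humidity"], 1),
        "pressure": round(readings["pressure"], 1),
        "pitch": round(readings["pitch"], 0),
        "roll": round(readings["roll"], 0),
        "yaw": round(readings["yaw"], 0),
        "x": round(readings["x"], 0),
        "y": round(readings["y"], 0),
        "z": round(readings["z"], 0),
    }


# Made-up raw readings chosen for illustration
raw = {"temp": 35.0812, "humidity": 41.53, "pressure": 0.0,
       "pitch": 0.9, "roll": 1.2, "yaw": 54.7, "x": -0.98, "y": 0.02, "z": 1.01}
print(json.dumps(build_row(raw)))
```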

A record (JSON)

{
  "cputemp": 52.08,
  "diskfree": "1211.8 MB",
  "host": "picroft",
  "humidity": 41.5,
  "ipaddress": "192.168.1.156",
  "memory": 23.0,
  "pitch": 1.0,
  "pressure": 0.0,
  "roll": 1.0,
  "temp": 35.08,
  "tempf": 75.14,
  "ts": "2017-06-16 17:39:08",
  "x": -1.0,
  "y": 0.0,
  "yaw": 55.0,
  "z": 1.0
}

The Avro schema

{
  "fields": [
    {
      "name": "tempf",
      "type": "float"
    },
    {
      "name": "cputemp",
      "type": "float"
    },
    {
      "name": "ts",
      "type": "string"
    },
    {
      "name": "pressure",
      "type": "float"
    },
    {
      "name": "host",
      "type": "string"
    },
    {
      "name": "pitch",
      "type": "float"
    },
    {
      "name": "ipaddress",
      "type": "string"
    },
    {
      "name": "temp",
      "type": "float"
    },
    {
      "name": "diskfree",
      "type": "string"
    },
    {
      "name": "yaw",
      "type": "float"
    },
    {
      "name": "humidity",
      "type": "float"
    },
    {
      "name": "memory",
      "type": "float"
    },
    {
      "name": "y",
      "type": "float"
    },
    {
      "name": "x",
      "type": "float"
    },
    {
      "name": "z",
      "type": "float"
    },
    {
      "name": "roll",
      "type": "float"
    }
  ],
  "name": "sensehat",
  "namespace": "hortonworks.hdp.refapp.sensehat",
  "type": "record"
}
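A quick local check that a JSON record matches this schema can save a round trip through NiFi when debugging. This stdlib-only sketch (schema_errors is a hypothetical name) handles only the float and string primitives used above; it is not a complete Avro validator, and flagging extra fields is a choice of this sketch, not a claim about how NiFi's record readers behave.

```python
# Minimal check of a JSON record against a flat Avro schema.
# Only the "float" and "string" primitives used in this article are supported.
_CHECKS = {
    "float": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "string": lambda v: isinstance(v, str),
}


def schema_errors(record, schema):
    """Return a list of human-readable problems; an empty list means the record fits."""
    errors = []
    expected = {f["name"]: f["type"] for f in schema["fields"]}
    for name, avro_type in expected.items():
        if name not in record:
            errors.append("missing field: %s" % name)
        elif not _CHECKS[avro_type](record[name]):
            errors.append("field %s is not %s" % (name, avro_type))
    for name in record:
        if name not in expected:
            errors.append("unexpected field: %s" % name)
    return errors


schema = {"type": "record", "name": "sensehat",
          "fields": [{"name": "tempf", "type": "float"}, {"name": "host", "type": "string"}]}
print(schema_errors({"tempf": 75.14, "host": "picroft"}, schema))  # []
print(schema_errors({"tempf": "hot"}, schema))
```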

config.yml

MiNiFi Config Version: 2
Flow Controller:
  name: sense hat
  comment: sense hat 2017
Core Properties:
  flow controller graceful shutdown period: 10 sec
  flow service write delay interval: 500 ms
  administrative yield duration: 30 sec
  bored yield duration: 10 millis
  max concurrent threads: 1
FlowFile Repository:
  partitions: 256
  checkpoint interval: 2 mins
  always sync: false
  Swap:
    threshold: 20000
    in period: 5 sec
    in threads: 1
    out period: 5 sec
    out threads: 4
Content Repository:
  content claim max appendable size: 10 MB
  content claim max flow files: 100
  always sync: false
Provenance Repository:
  provenance rollover time: 1 min
Component Status Repository:
  buffer size: 1440
  snapshot frequency: 1 min
Security Properties:
  keystore: ''
  keystore type: ''
  keystore password: ''
  key password: ''
  truststore: ''
  truststore type: ''
  truststore password: ''
  ssl protocol: ''
  Sensitive Props:
    key:
    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
    provider: BC
Processors:
- id: db6fbd3b-ddf4-3041-0000-000000000000
  name: ExecuteProcess
  class: org.apache.nifi.processors.standard.ExecuteProcess
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 60 sec
  penalization period: 30 sec
  yield period: 1 sec
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Argument Delimiter: ' '
    Batch Duration:
    Command: /opt/demo/rpi-sensehat-mqtt-nifi/sense2.sh
    Command Arguments:
    Redirect Error Stream: 'true'
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: 5635290a-4cb6-3da7-0000-000000000000
  name: minifiSenseHat
  source id: db6fbd3b-ddf4-3041-0000-000000000000
  source relationship names:
  - success
  destination id: 166616e3-1962-1660-2b7c-2f824584b23a
  max work queue size: 10000
  max work queue data size: 1 GB
  flowfile expiration: 0 sec
  queue prioritizer class: ''
Remote Process Groups:
- id: fdc45649-84be-374b-0000-000000000000
  name: ''
  url: http://hw13125.local:8080/nifi
  comment: ''
  timeout: 30 sec
  yield period: 10 sec
  transport protocol: HTTP
  Input Ports:
  - id: 166616e3-1962-1660-2b7c-2f824584b23a
    name: MiniFi SenseHat
    comment: ''
    max concurrent tasks: 1
    use compression: false
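In this config, the minifiSenseHat connection wires ExecuteProcess (its source id) to the remote input port (its destination id), and that destination should be one of the Input Ports listed under Remote Process Groups. A small sketch of that consistency check, assuming the YAML has already been parsed into a dict (for example with PyYAML's yaml.safe_load, not shown here). dangling_connections is a hypothetical helper, and the dict below is an abbreviated copy of the ids in the config above.

```python
def dangling_connections(config):
    """Return names of connections whose source or destination id is not declared."""
    # Known endpoints: local processors plus input ports on remote process groups
    known = {p["id"] for p in config.get("Processors", [])}
    for rpg in config.get("Remote Process Groups", []):
        known.update(port["id"] for port in rpg.get("Input Ports", []))
    bad = []
    for conn in config.get("Connections", []):
        if conn["source id"] not in known or conn["destination id"] not in known:
            bad.append(conn["name"])
    return bad


config = {
    "Processors": [{"id": "db6fbd3b-ddf4-3041-0000-000000000000", "name": "ExecuteProcess"}],
    "Remote Process Groups": [{"id": "fdc45649-84be-374b-0000-000000000000",
                               "Input Ports": [{"id": "166616e3-1962-1660-2b7c-2f824584b23a",
                                                "name": "MiniFi SenseHat"}]}],
    "Connections": [{"name": "minifiSenseHat",
                     "source id": "db6fbd3b-ddf4-3041-0000-000000000000",
                     "destination id": "166616e3-1962-1660-2b7c-2f824584b23a"}],
}
print(dangling_connections(config))  # []
```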

Build your MiniFi configuration file from sensorminifi.xml

minifi-toolkit-1.0.2.1.4.0-5/bin/config.sh transform sensorminifi.xml config.yml

Source code

https://github.com/tspannhw/rpi-sensehat-minifi-python

Sample MiniFi log

dResourceClaim[id=1497645887239-1, container=default, section=1], offset=2501, length=278],offset=0,name=13917785142443,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 116 milliseconds at a rate of 2.32 KB/sec
2017-06-16 20:54:41,827 INFO [Provenance Maintenance Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 3162
2017-06-16 20:54:41,844 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/3159.prov in 33 milliseconds
2017-06-16 20:54:41,846 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-16 20:54:43,288 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@87eb01 checkpointed with 0 Records and 0 Swap Files in 100 milliseconds (Stop-the-world time = 13 milliseconds, Clear Edit Logs time = 10 millis), max Transaction ID -1
2017-06-16 20:54:48,429 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-16 20:54:48,890 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@17461db checkpointed with 0 Records and 0 Swap Files in 460 milliseconds (Stop-the-world time = 190 milliseconds, Clear Edit Logs time = 77 millis), max Transaction ID 2107
2017-06-16 20:54:48,891 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 461 milliseconds
2017-06-16 20:54:51,482 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@f69f9d Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-16 20:55:07,621 INFO [Timer-Driven Process Thread-9] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-06-16 20:55:07,957 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi SenseHat,target=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=b3bcd211-7425-4750-9e4c-ba2d477b9cc1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497645887239-1, container=default, section=1], offset=2779, length=278],offset=0,name=13979556432846,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 114 milliseconds at a rate of 2.38 KB/sec
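To track payload sizes and transfer times from this log, the "Successfully sent" lines can be parsed. A minimal sketch, assuming the line layout of the sample above; parse_sent is a hypothetical helper, and other MiNiFi versions may format these lines differently.

```python
import re

# Matches the "(N bytes) to URL in M milliseconds" part of a Site-to-Site send line.
SENT_RE = re.compile(r"Successfully sent .*\((\d+) bytes\) to (\S+) in (\d+) milliseconds")


def parse_sent(line):
    """Return (bytes, url, millis) for a 'Successfully sent' line, else None."""
    m = SENT_RE.search(line)
    if m is None:
        return None
    return int(m.group(1)), m.group(2), int(m.group(3))


line = ("2017-06-16 20:55:07,957 INFO [Timer-Driven Process Thread-9] "
        "o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi SenseHat,"
        "target=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord["
        "uuid=b3bcd211-7425-4750-9e4c-ba2d477b9cc1,claim=StandardContentClaim [resourceClaim="
        "StandardResourceClaim[id=1497645887239-1, container=default, section=1], offset=2779, "
        "length=278],offset=0,name=13979556432846,size=278]] (278 bytes) to "
        "http://HW13125.local:8080/nifi-api in 114 milliseconds at a rate of 2.38 KB/sec")
print(parse_sent(line))  # (278, 'http://HW13125.local:8080/nifi-api', 114)
```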

Checking MiniFi status

root@picroft:/opt/demo/minifi-1.0.2.1.4.0-5# bin/minifi.sh flowStatus processor:db6fbd3b-ddf4-3041-0000-000000000000:health,stats,bulletins
minifi.sh: JAVA_HOME not set; results may vary
Bootstrap Classpath: /opt/demo/minifi-1.0.2.1.4.0-5/conf:/opt/demo/minifi-1.0.2.1.4.0-5/lib/bootstrap/*:/opt/demo/minifi-1.0.2.1.4.0-5/lib/*
Java home:
MiNiFi home: /opt/demo/minifi-1.0.2.1.4.0-5
Bootstrap Config File: /opt/demo/minifi-1.0.2.1.4.0-5/conf/bootstrap.conf

 FlowStatusReport{controllerServiceStatusList=null, processorStatusList=[{name='ExecuteProcess', processorHealth={runStatus='Running', hasBulletins=false, validationErrorList=[]}, processorStats={activeThreads=0, flowfilesReceived=0, bytesRead=0, bytesWritten=1390, flowfilesSent=0, invocations=5, processingNanos=9290051632}, bulletinList=[]}], connectionStatusList=null, remoteProcessGroupStatusList=null, instanceStatus=null, systemDiagnosticsStatus=null, reportingTaskStatusList=null, errorsGeneratingReport=[]}
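The processorStats counters in this report can be turned into numbers for monitoring. In the sample, bytesWritten=1390 over invocations=5 averages 278 bytes per ExecuteProcess run, the same size as the flow files in the sample log above. A sketch, assuming the key=value layout shown (processor_stats is a hypothetical name):

```python
import re

# Captures the body of "processorStats={...}" from a flowStatus report.
STATS_RE = re.compile(r"processorStats=\{([^}]*)\}")


def processor_stats(report):
    """Return the processorStats block as a dict of ints, or {} if absent."""
    m = STATS_RE.search(report)
    if m is None:
        return {}
    stats = {}
    for pair in m.group(1).split(","):
        key, _, value = pair.strip().partition("=")
        stats[key] = int(value)
    return stats


report = ("processorStats={activeThreads=0, flowfilesReceived=0, bytesRead=0, "
          "bytesWritten=1390, flowfilesSent=0, invocations=5, processingNanos=9290051632}")
stats = processor_stats(report)
# Average bytes written per ExecuteProcess run
print(stats["bytesWritten"] / stats["invocations"])  # 278.0
```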

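Output shown in an Apache Zeppelin notebook

Using the DDL generated by Apache NiFi, we can create external Hive tables for the raw JSON data, the cleaned-up ORC version of the data, and the Avro version of the data.

[Figure: creating the tables in Zeppelin]

We can query our dataset:

[Figure: the dataset]

Original English article: Using Apache MiniFi on Edge Devices: Part 1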


Copyright © 2017 玩点什么. All Rights Reserved.