MiNiFi lets you collect data in a secure, controlled way from small devices such as a Raspberry Pi, ASUS Tinker Board, or BeagleBone Black, while maintaining a full chain of custody for the data.
This is a nice update to my earlier approach of sending MQTT messages with Python.
We will build a flow in Apache NiFi and then export the template. Using the MiNiFi toolkit, we convert it into a config.yml file and send it to our device over SCP. You can see this in Part 1. This simple flow calls a shell script, which runs a Python script to collect our sensor data. The flow then sends the data to our NiFi server via Site-to-Site (S2S) over HTTP.
What I'm adding in this part is the new Record and Schema paradigm, along with the ability to run SQL queries against incoming flow files using QueryRecord. That requires building an Avro schema for our data, which is a very simple JSON definition.
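In QueryRecord, each query is added as a dynamic property whose name becomes an output relationship, and the incoming flow file is queried as the FLOWFILE table. A minimal sketch against our sensor fields (the temperature threshold is my own example, not from the original flow):

SELECT ts, host, temp, tempf, cputemp
FROM FLOWFILE
WHERE temp > 40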
We set up a port to connect the MiNiFi agent to our server.
Data will start arriving shortly.
CREATE EXTERNAL TABLE IF NOT EXISTS sensor (tempf FLOAT, cputemp FLOAT, ts STRING,
  pressure FLOAT, host STRING, pitch FLOAT, ipaddress STRING, temp FLOAT,
  diskfree STRING, yaw FLOAT, humidity FLOAT, memory FLOAT,
  y FLOAT, x FLOAT, z FLOAT, roll FLOAT)
STORED AS ORC
I grab the HDFS location and add it to the DDL: LOCATION '/sensor'.
I made similar tables for the Avro and JSON versions of the data.
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION '/jsonsensor';
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS AVRO LOCATION '/avrosensor';
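Pieced together from the fragments above, the full statement for the JSON table would look roughly like this (the table name jsonsensor is my own guess; the columns mirror the ORC table):

CREATE EXTERNAL TABLE IF NOT EXISTS jsonsensor (tempf FLOAT, cputemp FLOAT, ts STRING,
  pressure FLOAT, host STRING, pitch FLOAT, ipaddress STRING, temp FLOAT,
  diskfree STRING, yaw FLOAT, humidity FLOAT, memory FLOAT,
  y FLOAT, x FLOAT, z FLOAT, roll FLOAT)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/jsonsensor';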
pip install --upgrade sense-hat
pip install --upgrade pillow
pip install rtimulib
pip install psutil
sudo apt-get install oracle-java8-jdk
python /opt/demo/rpi-sensehat-mqtt-nifi/sense2.py
from sense_hat import SenseHat
import json
import sys, socket
import os
import psutil
import subprocess
import time
import datetime
from time import sleep
from time import gmtime, strftime

# get data
current_milli_time = lambda: int(round(time.time() * 1000))

# yyyy-mm-dd hh:mm:ss
currenttime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
host = os.uname()[1]
rasp = ('armv' in os.uname()[4])
cpu = psutil.cpu_percent(interval=1)

# CPU temperature is only exposed via sysfs on ARM boards;
# default it so the record is still valid on other hosts
ctemp = 0.0
if rasp:
    with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
        ctemp = 1.0 * float(f.readline()) / 1000

# disk and memory usage
usage = psutil.disk_usage("/")
mem = psutil.virtual_memory()
diskrootfree = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
mempercent = mem.percent

external_IP_and_port = ('198.41.0.4', 53)  # a.root-servers.net
socket_family = socket.AF_INET

# vcgencmd output is currently unused (communicate() is commented out)
p = subprocess.Popen(['/opt/vc/bin/vcgencmd', 'measure_temp'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# out, err = p.communicate()

# determine the local IP address by opening a UDP socket toward a root server
def IP_address():
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        s.connect(external_IP_and_port)
        answer = s.getsockname()
        s.close()
        return answer[0] if answer else None
    except socket.error:
        return None

ipaddress = IP_address()

# read the Sense HAT sensors
sense = SenseHat()
sense.clear()
temp = round(sense.get_temperature(), 2)
humidity = round(sense.get_humidity(), 1)
pressure = round(sense.get_pressure(), 1)
orientation = sense.get_orientation()
pitch = round(orientation['pitch'], 0)
roll = round(orientation['roll'], 0)
yaw = round(orientation['yaw'], 0)
acceleration = sense.get_accelerometer_raw()
x = round(acceleration['x'], 0)
y = round(acceleration['y'], 0)
z = round(acceleration['z'], 0)
# cputemp = out

# build one flat JSON record matching the Avro schema
# (tempf adds 12 rather than the standard 32, which looks like a deliberate calibration offset)
row = {'ts': currenttime, 'host': host, 'memory': mempercent, 'diskfree': diskrootfree, 'cputemp': round(ctemp, 2),
       'ipaddress': ipaddress, 'temp': temp, 'tempf': round(((temp * 1.8) + 12), 2), 'humidity': humidity,
       'pressure': pressure, 'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z}
json_string = json.dumps(row)
print(json_string)
{
"cputemp": 52.08,
"diskfree": "1211.8 MB",
"host": "picroft",
"humidity": 41.5,
"ipaddress": "192.168.1.156",
"memory": 23.0,
"pitch": 1.0,
"pressure": 0.0,
"roll": 1.0,
"temp": 35.08,
"tempf": 75.14,
"ts": "2017-06-16 17:39:08",
"x": -1.0,
"y": 0.0,
"yaw": 55.0,
"z": 1.0
}
{
"fields": [
{
"name": "tempf",
"type": "float"
},
{
"name": "cputemp",
"type": "float"
},
{
"name": "ts",
"type": "string"
},
{
"name": "pressure",
"type": "float"
},
{
"name": "host",
"type": "string"
},
{
"name": "pitch",
"type": "float"
},
{
"name": "ipaddress",
"type": "string"
},
{
"name": "temp",
"type": "float"
},
{
"name": "diskfree",
"type": "string"
},
{
"name": "yaw",
"type": "float"
},
{
"name": "humidity",
"type": "float"
},
{
"name": "memory",
"type": "float"
},
{
"name": "y",
"type": "float"
},
{
"name": "x",
"type": "float"
},
{
"name": "z",
"type": "float"
},
{
"name": "roll",
"type": "float"
}
],
"name": "sensehat",
"namespace": "hortonworks.hdp.refapp.sensehat",
"type": "record"
}
MiNiFi Config Version: 2
Flow Controller:
  name: sense hat
  comment: sense hat 2017
Core Properties:
  flow controller graceful shutdown period: 10 sec
  flow service write delay interval: 500 ms
  administrative yield duration: 30 sec
  bored yield duration: 10 millis
  max concurrent threads: 1
FlowFile Repository:
  partitions: 256
  checkpoint interval: 2 mins
  always sync: false
  Swap:
    threshold: 20000
    in period: 5 sec
    in threads: 1
    out period: 5 sec
    out threads: 4
Content Repository:
  content claim max appendable size: 10 MB
  content claim max flow files: 100
  always sync: false
Provenance Repository:
  provenance rollover time: 1 min
Component Status Repository:
  buffer size: 1440
  snapshot frequency: 1 min
Security Properties:
  keystore: ''
  keystore type: ''
  keystore password: ''
  key password: ''
  truststore: ''
  truststore type: ''
  truststore password: ''
  ssl protocol: ''
  Sensitive Props:
    key:
    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
    provider: BC
Processors:
- id: db6fbd3b-ddf4-3041-0000-000000000000
  name: ExecuteProcess
  class: org.apache.nifi.processors.standard.ExecuteProcess
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 60 sec
  penalization period: 30 sec
  yield period: 1 sec
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Argument Delimiter: ' '
    Batch Duration:
    Command: /opt/demo/rpi-sensehat-mqtt-nifi/sense2.sh
    Command Arguments:
    Redirect Error Stream: 'true'
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: 5635290a-4cb6-3da7-0000-000000000000
  name: minifiSenseHat
  source id: db6fbd3b-ddf4-3041-0000-000000000000
  source relationship names:
  - success
  destination id: 166616e3-1962-1660-2b7c-2f824584b23a
  max work queue size: 10000
  max work queue data size: 1 GB
  flowfile expiration: 0 sec
  queue prioritizer class: ''
Remote Process Groups:
- id: fdc45649-84be-374b-0000-000000000000
  name: ''
  url: http://hw13125.local:8080/nifi
  comment: ''
  timeout: 30 sec
  yield period: 10 sec
  transport protocol: HTTP
  Input Ports:
  - id: 166616e3-1962-1660-2b7c-2f824584b23a
    name: MiniFi SenseHat
    comment: ''
    max concurrent tasks: 1
    use compression: false
minifi-toolkit-1.0.2.1.4.0-5/bin/config.sh transform sensorminifi.xml config.yml
https://github.com/tspannhw/rpi-sensehat-minifi-python
2017-06-16 20:54:41,827 INFO [Provenance Maintenance Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 3162
2017-06-16 20:54:41,844 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/3159.prov in 33 milliseconds
2017-06-16 20:54:41,846 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-16 20:54:43,288 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@87eb01 checkpointed with 0 Records and 0 Swap Files in 100 milliseconds (Stop-the-world time = 13 milliseconds, Clear Edit Logs time = 10 millis), max Transaction ID -1
2017-06-16 20:54:48,429 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-16 20:54:48,890 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@17461db checkpointed with 0 Records and 0 Swap Files in 460 milliseconds (Stop-the-world time = 190 milliseconds, Clear Edit Logs time = 77 millis), max Transaction ID 2107
2017-06-16 20:54:48,891 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 461 milliseconds
2017-06-16 20:54:51,482 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@f69f9d Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-16 20:55:07,621 INFO [Timer-Driven Process Thread-9] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-06-16 20:55:07,957 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi SenseHat,target=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=b3bcd211-7425-4750-9e4c-ba2d477b9cc1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497645887239-1, container=default, section=1], offset=2779, length=278],offset=0,name=13979556432846,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 114 milliseconds at a rate of 2.38 KB/sec
root@picroft:/opt/demo/minifi-1.0.2.1.4.0-5# bin/minifi.sh flowStatus processor:db6fbd3b-ddf4-3041-0000-000000000000:health,stats,bulletins
minifi.sh: JAVA_HOME not set; results may vary
Bootstrap Classpath: /opt/demo/minifi-1.0.2.1.4.0-5/conf:/opt/demo/minifi-1.0.2.1.4.0-5/lib/bootstrap/*:/opt/demo/minifi-1.0.2.1.4.0-5/lib/*
Java home:
MiNiFi home: /opt/demo/minifi-1.0.2.1.4.0-5
Bootstrap Config File: /opt/demo/minifi-1.0.2.1.4.0-5/conf/bootstrap.conf
FlowStatusReport{controllerServiceStatusList=null, processorStatusList=[{name='ExecuteProcess', processorHealth={runStatus='Running', hasBulletins=false, validationErrorList=[]}, processorStats={activeThreads=0, flowfilesReceived=0, bytesRead=0, bytesWritten=1390, flowfilesSent=0, invocations=5, processingNanos=9290051632}, bulletinList=[]}], connectionStatusList=null, remoteProcessGroupStatusList=null, instanceStatus=null, systemDiagnosticsStatus=null, reportingTaskStatusList=null, errorsGeneratingReport=[]}
Using the DDL generated by Apache NiFi, we can create external Hive tables for the raw JSON data, a cleaned-up ORC version of the data, and the Avro version of the data.
We can query our dataset:
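For example, to pull the most recent readings from the ORC table defined above (a simple sketch, not from the original article):

SELECT ts, host, temp, tempf, humidity, pressure
FROM sensor
ORDER BY ts DESC
LIMIT 10;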