Distributed Control System - Conditions
1. Intro: Conditions database protocols built on MQ-powered services. Typical workflow: a sensor boots, requests its initial values from the Conditions database, and then starts publishing Conditions updates. Meanwhile, a Web UI user may request a change to the Conditions of a specific sensor and receive real-time updates on changes made by that sensor.
2. Participating entities: Sensor, Storage service, Web UI (+ MQ broker, MySQL database)
2.1 Physical deployment details:
- Sensor = code running in the online domain (daq network), TBD
- Storage Service = C++ daemon, storage adapter handling MySQL (+MongoDB later on). Runs at onl16.starp.bnl.gov under the 'dmitry' account
- Web UI = can be run from anywhere, provided it has access to the MQTT proxy service at https://dashboard1.star.bnl.gov/mqtt/ . E.g. https://dashboard1.star.bnl.gov/dcs/ . Allows reading and updating predefined Conditions structures (time-series versioned, so all updates are inserts).
- MQ Broker = multi-protocol broker (Apache Apollo), running at onl16.starp.bnl.gov
- MySQL service, holding the Conditions database. For now, the online conditions database is used. It runs at onldb.starp.bnl.gov:3501, Conditions_fps/<bla>
3. Protocols:
- transport protocol: MQTT (sensors, storage), WebSocket + MQTT (web ui)
- message serialization protocol: JSON
4. Data Request API:
- REGISTER, SET, GET, STORAGE, SENSOR
(see examples below the service layout image)
4.1 Example: to SET value (received by sensor and storage):
topic: dcs/set/Conditions/fps/fps_fee
message:
{
  dcs_id: "<client_unique_id>",
  dcs_uid: "<authenticated_login>",
  dcs_header: ["field1", "field2", ... "fieldN"],
  dcs_values: {
    <row_id1>: [ value1, value2, ... valueN ],
    ...
    <row_idN>: [ value1, value2, ... valueN ]
  }
}
NOTE: one can set any combination of fields vs row ids, including one specific field and one specific row id.
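A minimal sketch of how a client might assemble the SET request above, in Python with only the standard library. The helper name `build_set_request`, the client id, the login and the sample field names are hypothetical; publishing the resulting topic and payload is left to whatever MQTT client is in use.

```python
import json

def build_set_request(client_id, login, path, header, rows):
    """Return (topic, payload) for a dcs/set request.

    rows maps row_id -> list of values, one value per field in header.
    """
    for row_id, values in rows.items():
        if len(values) != len(header):
            raise ValueError(
                f"row {row_id}: expected {len(header)} values, got {len(values)}"
            )
    message = {
        "dcs_id": client_id,
        "dcs_uid": login,
        "dcs_header": list(header),
        # JSON object keys are always strings, so row ids are stringified here
        "dcs_values": {str(row_id): list(values) for row_id, values in rows.items()},
    }
    return f"dcs/set/{path}", json.dumps(message)

# Hypothetical usage: set two fields for rows 1 and 2
topic, payload = build_set_request(
    "client-42", "jdoe", "Conditions/fps/fps_fee",
    ["field1", "field2"], {1: [10, 20], 2: [30, 40]},
)
```

The length check reflects the layout of the template: every row carries exactly one value per entry in dcs_header.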
4.2 Example: to GET values:
topic: dcs/get/Conditions/fps/fps_fee
message:
{
  dcs_id: "<client_unique_id>",
  dcs_uid: "<authenticated_login>",
  dcs_header: ["field1", "field2", ... "fieldN"], // alternatively, ["*"] to get all available fields
  dcs_values: {
    "1": true, // <-- request row_id = 1; alternatively, use dcs_values = {} to get all available rows
    "2": true,
    ...
    "N": true
  }
}
NOTE: one can request any combination of fields vs row ids, including one specific field and one specific row id.
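The GET request can be sketched the same way. This is an illustration only: `build_get_request` and its arguments are hypothetical names, and the defaults simply encode the two shortcuts described above (["*"] for all fields, an empty dcs_values for all rows).

```python
import json

def build_get_request(client_id, login, path, fields=None, row_ids=None):
    """Return (topic, payload) for a dcs/get request.

    fields=None requests all fields (["*"]); row_ids=None requests all rows ({}).
    """
    message = {
        "dcs_id": client_id,
        "dcs_uid": login,
        "dcs_header": list(fields) if fields else ["*"],
        # Each requested row id maps to true, as in the template above
        "dcs_values": {str(row_id): True for row_id in (row_ids or [])},
    }
    return f"dcs/get/{path}", json.dumps(message)

# Hypothetical usage: everything, then one field of rows 1 and 2
all_topic, all_payload = build_get_request("client-42", "jdoe", "Conditions/fps/fps_fee")
one_topic, one_payload = build_get_request(
    "client-42", "jdoe", "Conditions/fps/fps_fee", fields=["field1"], row_ids=[1, 2]
)
```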
4.3 Example: subscribe to response from storage:
topic: dcs/storage/Conditions/fps/fps_fee
message:
{
  dcs_id: "<client_unique_id>",
  dcs_uid: "<authenticated_login>",
  dcs_header: ["field1", "field2", ... "fieldN"],
  dcs_values: {
    <row_id1>: [ value1, value2, ... valueN ],
    ...
    <row_idN>: [ value1, value2, ... valueN ]
  }
}
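On the receiving side, the storage response pairs dcs_header with per-row value lists. A small sketch of how a subscriber might turn that into one dictionary per row; the function name `rows_from_response` and the sample values are hypothetical.

```python
import json

def rows_from_response(payload):
    """Convert a storage response into {row_id: {field: value}}."""
    message = json.loads(payload)
    header = message["dcs_header"]
    return {
        row_id: dict(zip(header, values))
        for row_id, values in message["dcs_values"].items()
    }

# Hypothetical payload following the template above
sample = json.dumps({
    "dcs_id": "client-42",
    "dcs_uid": "jdoe",
    "dcs_header": ["field1", "field2"],
    "dcs_values": {"1": [10, 20], "2": [30, 40]},
})
rows = rows_from_response(sample)
```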
DCS Interface & Behaviors
DCS HW Interface and Operations
1. Hardware / PLC commands:
Format: COMMAND ( parameter1, parameter2, ... parameterN) = <description>
PV GET / SET:
- GET ( <list of PV names>, <reply-to address> )
  - Requests cached value information.
  - Replies instantly with name-value pairs for the requested PVs, taken from the in-memory copy.
  - If a reply-to address is provided, replies to that private address instead of the public channel.
- SET ( <list of PV name-value pairs> )
  - Sets values for the provided channels, if allowed by the internal configuration.
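A minimal sketch of the GET / SET behavior, assuming the in-memory copy is a dictionary and that "allowed by internal configuration" means the PV carries a WRITE or RW flag (the READ | WRITE | RW flags come from the CONFIGURATION structure of this page). The class name `PVCache` and the public topic name are hypothetical.

```python
class PVCache:
    """In-memory copy of PV values with per-PV access flags."""

    def __init__(self, pvs, public_topic="dcs/public"):
        # pvs maps pv_name -> (current_value, flag), flag in READ | WRITE | RW
        self._pvs = {name: {"value": v, "flag": f} for name, (v, f) in pvs.items()}
        self.public_topic = public_topic  # hypothetical public channel name

    def get(self, names, reply_to=None):
        """Return (destination, name-value pairs) without touching the hardware."""
        values = {n: self._pvs[n]["value"] for n in names if n in self._pvs}
        # A private reply-to address takes precedence over the public channel
        return (reply_to or self.public_topic), values

    def set(self, pairs):
        """Apply only the writable PVs; return what was actually changed."""
        applied = {}
        for name, value in pairs.items():
            pv = self._pvs.get(name)
            if pv is not None and pv["flag"] in ("WRITE", "RW"):
                pv["value"] = value
                applied[name] = value
        return applied

# Hypothetical PVs: a writable high-voltage setting and a read-only temperature
cache = PVCache({"hv": (1500.0, "RW"), "temp": (21.5, "READ")})
```

Unknown PV names are silently skipped in this sketch; a real controller might prefer to report them.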
PV SCANS:
- SCAN ( <list of PV names>, <group_reply = true|false>, <time_interval>, <reply-to address> )
  - Requests a one-time (if time_interval = 0) or periodic (if time_interval != 0) scan of live values.
  - Registers the SCAN in a scan list: live values are read out one by one.
  - If group_reply = true, replies once when all channels are scanned; otherwise sends out individual updates.
  - Replies instantly with the registered SCAN_ID. If a reply-to address is provided, replies to the private channel instead of the public one.
- SCAN_CANCEL ( <SCAN_ID> )
  - Cancels the existing scan for SCAN_ID, if it exists.
- SCAN_MODIFY ( <SCAN_ID>, <group_reply = true|false>, <time_interval = milliseconds>, <reply-to address> )
  - Modifies the parameters of the defined scan.
INFORMATIONAL:
- INFO_SYSTEM ()
  - return the following:
    - current state of the HW;
    - configuration information for this HW;
- INFO_PV ( < pv_name = undefined | list of pv names > )
  - if < pv_name = undefined >, return list of all configured PVs and their types;
- INFO_SCAN ( < scan_id = undefined | list of scan_ids > )
  - if < scan_id = undefined >, return list of all configured periodic scans from scan list, otherwise return specified scan details;
  - if < scan_id = undefined >, return list of all one-time scans remaining in a scan queue, otherwise return specified scan details;
OPERATIONAL:
- RESET ()
  - clear all internal state variables, re-read configuration, restart operations from clean state.
- REBOOT ()
  - request physical reboot of the device, if possible.
  - reply with "confirmed" / "not available"
- ON ()
  - request power up of the device, if possible
  - reply with "confirmed" / "not available"
- OFF ()
  - request power off of the device, if possible
  - reply with "confirmed" / "not available"
2. HW internal organization:
NOTE: global variables, related to controller operations
- CONFIGURATION = list of [ < pv_name = STR > , <pv_type = INT | DBL | STR > , <pv_initial_value> , <pv_current_value> , <pv_flag = READ | WRITE | RW > ]
  - i.e. list of controlled parameters;
- SCAN_LIST = list of < scan_item = [ ID, list of PV names, group = true | false, time_interval != 0, reply-to address, last_scan_ts ] >
  - List/vector of registered periodic scans;
- SCAN_QUEUE = queue of <scan_item = [ ID, list of PV names, group = true | false, time_interval = 0, reply-to address ] >
  - Queue of requested one-time scans;
- DEVICE_STATE = < int = ON | OFF >
  - current global state of the controlled device
- DEVICE_OPERATION = < IDLE | GETTING | SETTING | SCANNING | POWERING_UP | POWERING_DOWN | RESETTING | REBOOTING >
  - device current operation flag
- DEVICE_ID = < char_string >
  - unique id of the controlled device
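The SCAN_LIST / SCAN_QUEUE split can be sketched as follows. This is an illustration under assumptions, not the controller's code: the names `ScanItem` and `ScanRegistry` are hypothetical, ids are simple counters, and time_interval is taken to be in milliseconds as in SCAN_MODIFY.

```python
from collections import deque
from dataclasses import dataclass
from itertools import count
from typing import Optional

@dataclass
class ScanItem:
    scan_id: int
    pv_names: list
    group: bool
    time_interval: int               # milliseconds; 0 means one-time scan
    reply_to: Optional[str] = None
    last_scan_ts: float = 0.0        # milliseconds; used by periodic scans only

class ScanRegistry:
    def __init__(self):
        self.scan_list = []          # registered periodic scans
        self.scan_queue = deque()    # requested one-time scans
        self._ids = count(1)

    def register(self, pv_names, group, time_interval, reply_to=None):
        """Store the scan in the queue or the list and return its SCAN_ID."""
        item = ScanItem(next(self._ids), list(pv_names), group, time_interval, reply_to)
        (self.scan_queue if time_interval == 0 else self.scan_list).append(item)
        return item.scan_id

    def cancel(self, scan_id):
        """Remove the scan if present; return True when something was removed."""
        before = len(self.scan_list) + len(self.scan_queue)
        self.scan_list = [s for s in self.scan_list if s.scan_id != scan_id]
        self.scan_queue = deque(s for s in self.scan_queue if s.scan_id != scan_id)
        return len(self.scan_list) + len(self.scan_queue) < before

    def due(self, now_ms):
        """Drain the one-time queue and add periodic scans whose interval elapsed."""
        ready = list(self.scan_queue)
        self.scan_queue.clear()
        for item in self.scan_list:
            if now_ms - item.last_scan_ts >= item.time_interval:
                item.last_scan_ts = now_ms
                ready.append(item)
        return ready
```

One-time scans leave the queue as soon as they are handed out, while periodic scans stay registered and only have last_scan_ts refreshed.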
3. HW internal operations:
NOTE: implemented using FSM
I. BOOT sequence
- INITIALIZE core parameters (device info, scan list/queue, device state)
- POPULATE list of controlled variables
- SETUP persistent scans, if requested
- GO to OPERATING sequence
II. OPERATING sequence
1. LISTEN for incoming commands for <N> microseconds (aka IDLE)
  1.1 if NETWORK ERROR occurs = pause OPERATING sequence, try RECONNECT to MQ in <N> seconds
  1.2 if RECONNECT succeeded = resume OPERATING sequence
  1.3 else GO to (1.1)
2. IF external <command> received within time interval, call <command> processing code
3. ELSE proceed with <scan_queue> and <scan_list> processing
4. LOOP to (1)
III. SHUTDOWN sequence
- CLEANUP internal state
- DESTROY internal objects
- DISCONNECT from MQ service
- call POWER OFF routine
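One pass of the OPERATING sequence can be sketched as a single function driven by callbacks. Everything here is an assumption made for illustration: the function name `operating_step`, the callback signatures, the use of ConnectionError to signal a network error, and the `max_retries` bound (the sequence above retries without a stated limit).

```python
def operating_step(listen, reconnect, handle_command, process_scans, max_retries=3):
    """Run one pass of the OPERATING sequence and report what happened.

    listen() returns a command, returns None on timeout (IDLE),
    or raises ConnectionError on a network error.
    """
    try:
        command = listen()
    except ConnectionError:
        # Pause normal processing and try to reconnect to the MQ service
        for _ in range(max_retries):
            if reconnect():
                return "reconnected"
        return "disconnected"
    if command is not None:
        handle_command(command)      # external command received in time
        return "command"
    process_scans()                  # otherwise work on scan queue and scan list
    return "scans"

# Hypothetical stubs standing in for the MQ client and the scan processing
log = []
first = operating_step(lambda: "RESET", lambda: True, log.append, lambda: log.append("scan"))
second = operating_step(lambda: None, lambda: True, log.append, lambda: log.append("scan"))
```

The caller would invoke this in a loop, which corresponds to looping back to the LISTEN step.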
DCS protocol v1
modeled after RESTful principles and Open Smart Grid Protocol
courtesy of Yulia Zulkarneeva, Quinta Plus Technologies LLC
I. TOPIC: <site_uid> / <protocol> / <version> / <method> / <Process_Variable_URI>
topic example: BNLSTAR / DCS / 1 / GET / Conditions / fps / fee
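A short sketch of parsing and building such topics. It assumes the spaces around the slashes in the example are for readability only and that everything after the method is the Process_Variable_URI; the names `Topic`, `parse_topic` and `build_topic` are hypothetical.

```python
from typing import NamedTuple

class Topic(NamedTuple):
    site_uid: str
    protocol: str
    version: str
    method: str
    pv_uri: str

def parse_topic(topic):
    """Split '<site_uid>/<protocol>/<version>/<method>/<pv_uri...>' into its parts."""
    parts = [p.strip() for p in topic.split("/")]
    if len(parts) < 5 or not all(parts):
        raise ValueError(f"malformed topic: {topic!r}")
    # The Process_Variable_URI may itself contain several path segments
    return Topic(parts[0], parts[1], parts[2], parts[3].upper(), "/".join(parts[4:]))

def build_topic(topic):
    """Join the parts back into a compact topic string without spaces."""
    return "/".join(topic)

parsed = parse_topic("BNLSTAR / DCS / 1 / GET / Conditions / fps / fee")
```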
II. MESSAGE contents: request body, encoded as JSON/txt or MsgPack/bin formats
a) message example: {
  "uid": "<client-uid>",
  "header": [ <column_name_A>, <column_name_B> ],
  "values": {
    "<offset_0>" : [ <value_for_A>,<value_for_B> ],
    "<offset_N>" : [ <value_for_A>,<value_for_B> ]
  }
}
..or..
b) message: {
  "uid": "<client-uid>",
  "values": {
    "<column_name_A>.<offset_A>" : 3,
    "<column_name_B>.<offset_B>.<offset_B_at_bit_level>" : 1
  }
}
example: BNLSTAR / DCS / 1.0 / GET / Conditions / fps / fee
message: { "uid": "unique-identifier-of-the-client", "ts": 12345678 }
example: BNLSTAR / DCS / 1.0 / SET / Conditions / fps / fee
message: { "uid": "<client-uid>", "header": [A,B], "values": { "0" : [1,2] } }
message: { "uid": "<client-uid>", "values": { "A.0" : 1, "B.0" : 2 } }
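The two SET messages above carry the same data in the two encodings. A small sketch of converting the header/values form into the flat "column.offset" form; the function name `to_flat_values` is hypothetical, and bit-level offsets are not handled.

```python
def to_flat_values(message):
    """Convert {"header": [...], "values": {offset: [...]}} to {"values": {"col.offset": v}}."""
    header = message["header"]
    flat = {}
    for offset, row in message["values"].items():
        for column, value in zip(header, row):
            flat[f"{column}.{offset}"] = value
    return {"uid": message["uid"], "values": flat}

# Mirrors the SET example above, with A and B as column names
flat_message = to_flat_values(
    {"uid": "client-1", "header": ["A", "B"], "values": {"0": [1, 2]}}
)
```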
III. METHODS
METHOD | DESCRIPTION | NOTES
standard methods:
GET | get latest entry => either storage or sensor may reply via personal REPLY |
PUT | store new entry |
POST | sensor entry change update |
DELETE | delete [latest] entry |
HEAD | request schema descriptor only, without data |
PATCH | modify schema descriptor properties |
OPTIONS | get supported methods |
extra methods:
REPLY | personal reply address in REQUEST / RESPONSE pattern. Ex. topic: DCS / REPLY / <CLIENT_UID>. Example: COMMAND acknowledgements, GET replies |
COMMAND | commands from control system: ON / OFF / REBOOT / POWEROFF |
STATUS | retrieve status of the device: ON / OFF / REBOOT / POWEROFF / BUSY |
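As an illustration of the REQUEST / RESPONSE pattern, the sketch below derives the personal reply topic for a client and checks a method name against the lists above. The function names are hypothetical, and the compact topic form without spaces is an assumption.

```python
STANDARD_METHODS = ("GET", "PUT", "POST", "DELETE", "HEAD", "PATCH", "OPTIONS")
EXTRA_METHODS = ("REPLY", "COMMAND", "STATUS")

def reply_topic(client_uid):
    """Personal reply address, following the example DCS / REPLY / <CLIENT_UID>."""
    return f"DCS/REPLY/{client_uid}"

def classify_method(method):
    """Return 'standard', 'extra', or None for an unknown method name."""
    name = method.upper()
    if name in STANDARD_METHODS:
        return "standard"
    if name in EXTRA_METHODS:
        return "extra"
    return None
```

A client would subscribe to its own reply topic before sending a GET or COMMAND, so that the answer or acknowledgement reaches only that client.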
Search capabilities
Search (essentially, Filter) Capabilities and Use-Cases
To filter the result, add a special field, "dcs_filter", to the request body. The contents of "dcs_filter" define the filtering rules, see below.
-------------------------------------------------------------
[x] 1. Constraint: WHERE ( A = B )
dcs_filter: { "A" : B }
[x] 2. Constraint: WHERE ( A = B && C = "D" )
dcs_filter: { "A": B, "C": "D" }
[x] 3. Constraint: WHERE ( A = B || C = "D" )
dcs_filter: {
'_or': { "A": B, "C": "D" }
}
[x] 4. Constraint: WHERE ( A = B || A = C || A = D )
dcs_filter: {
"A": {
'_in': [ B, C, D ]
}
}
[x] 5. Constraint: WHERE ( A = B && ( C = D || E = F ) )
dcs_filter: {
"A": B,
"_or" : { C: D, E: F }
}
-------------------------------------------------------------
[x] 6.1 Constraint: WHERE ( A > B )
dcs_filter: {
A: { '_gt': B }
}
[x] 6.2 Constraint: WHERE ( A >= B )
dcs_filter: {
A: { '_ge': B }
}
[x] 7.1 Constraint: WHERE ( A < B )
dcs_filter: {
A: { '_lt': B }
}
[x] 7.2 Constraint: WHERE ( A <= B )
dcs_filter: {
A: { '_le': B }
}
[x] 8. Constraint: WHERE ( A > B && A < C )
dcs_filter: {
A: { '_gt': B, '_lt': C }
}
-------------------------------------------------------------
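The rules above map naturally onto a parameterized SQL WHERE clause. The sketch below is one possible translation, not the storage service's implementation: the function name `filter_to_where` is hypothetical, `_lt` / `_le` are assumed to be the less-than counterparts of `_gt` / `_ge`, and the `%s` placeholders follow the parameter style commonly used by Python MySQL drivers.

```python
COMPARISONS = {"_gt": ">", "_ge": ">=", "_lt": "<", "_le": "<="}

def filter_to_where(dcs_filter, joiner=" AND "):
    """Translate a dcs_filter dict into (sql_fragment, params)."""
    clauses, params = [], []
    for key, cond in dcs_filter.items():
        if key == "_or":
            # Nested alternatives are joined with OR and parenthesized
            sql, sub_params = filter_to_where(cond, " OR ")
            clauses.append(f"({sql})")
            params.extend(sub_params)
            continue
        if not key.isidentifier():
            raise ValueError(f"invalid column name: {key!r}")
        if not isinstance(cond, dict):
            clauses.append(f"`{key}` = %s")          # plain equality
            params.append(cond)
            continue
        parts = []
        for op, operand in cond.items():
            if op == "_in":
                marks = ", ".join(["%s"] * len(operand))
                parts.append(f"`{key}` IN ({marks})")
                params.extend(operand)
            else:
                parts.append(f"`{key}` {COMPARISONS[op]} %s")
                params.append(operand)
        if not parts:
            raise ValueError(f"empty condition for column {key!r}")
        # Several operators on one column must all hold, e.g. a range
        clauses.append(parts[0] if len(parts) == 1 else "(" + " AND ".join(parts) + ")")
    return joiner.join(clauses), params

# Use-case 5: WHERE ( A = B && ( C = D || E = F ) ), with sample values
where_sql, where_params = filter_to_where({"A": 1, "_or": {"C": 2, "E": 3}})
```

Values never enter the SQL text itself; they travel separately as parameters, and column names are restricted to plain identifiers.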
...To Be Continued