Skip to content

Commit 6c7db13

Browse files
committed
Add new tool: iedctrl.py to operate on IEDs
--- + empty strings will be shown as <EMPTY> using mms_utility.py
1 parent 3cef197 commit 6c7db13

3 files changed

Lines changed: 225 additions & 1 deletion

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ write_to = "src/icspacket/_version.py"
7575
"mms_utility.py" = "icspacket.examples.mms_utility:cli_main"
7676
"mmsclient.py" = "icspacket.examples.mmsclient:cli_main"
7777
"iedmap.py" = "icspacket.examples.iedmap:cli_main"
78+
"iedctrl.py" = "icspacket.examples.iedctrl:cli_main"
7879
"dnp3dump.py" = "icspacket.examples.dnp3dump:cli_main"
7980
"dnp3read.py" = "icspacket.examples.dnp3read:cli_main"
8081
"dnp3linkaddr.py" = "icspacket.examples.dnp3linkaddr:cli_main"

src/icspacket/examples/iedctrl.py

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
# Copyright (C) 2025-present MatrixEditor @ github
2+
#
3+
# This program is free software: you can redistribute it and/or modify
4+
# it under the terms of the GNU General Public License as published by
5+
# the Free Software Foundation, either version 3 of the License, or
6+
# (at your option) any later version.
7+
#
8+
# This program is distributed in the hope that it will be useful,
9+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
# GNU General Public License for more details.
12+
#
13+
# You should have received a copy of the GNU General Public License
14+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
15+
# pyright: reportUnusedCallResult=false, reportGeneralTypeIssues=false
16+
import argparse
17+
import json
18+
import logging
19+
import sys
20+
21+
from pathlib import Path
22+
from typing import Any
23+
24+
from rich.console import Console
25+
from rich.prompt import Prompt
26+
27+
from icspacket.core import logger
28+
from icspacket.examples.mms_utility import data_to_str
29+
from icspacket.examples.util import add_logging_options
30+
from icspacket.proto.iec61850.classes import FC, ControlModel
31+
from icspacket.proto.iec61850.client import IED_Client
32+
from icspacket.examples.util.mms import (
33+
add_mms_connection_options,
34+
init_mms_connection,
35+
)
36+
from icspacket.proto.iec61850.control import LastApplError
37+
from icspacket.proto.iec61850.path import DataObjectReference
38+
39+
40+
def cli2data(value: str) -> Any | None:
41+
path = Path(value)
42+
try:
43+
if path.exists() and path.is_file():
44+
with path.open() as fp:
45+
doc = json.load(fp)
46+
else:
47+
doc = json.loads(value)
48+
if "value" in doc:
49+
doc = doc["value"]
50+
except json.JSONDecodeError as e:
51+
logging.error(f"Invalid JSON value format: {value!r} - {e}")
52+
return None
53+
return doc
54+
55+
56+
def cli_main():
57+
from icspacket import __version__
58+
59+
parser = argparse.ArgumentParser(
60+
description="IED operate tool",
61+
formatter_class=argparse.RawDescriptionHelpFormatter,
62+
)
63+
# fmt: off
64+
group = parser.add_argument_group("Target Options")
65+
group.add_argument("-t", "--target", metavar="[LDName/]LNName.[FC].DataName", default=None, help=(
66+
"Target data node to control. If missing the logical device (LDName), the service will first be \n"
67+
"queried for the default logical node (LNName) and then the data node (DataName) will be used. \n"
68+
"Functional Constrains (FC) is optional as this will always be the CO (Control)."
69+
), required=True)
70+
group.add_argument("--check", action="store_true", help="Queries the value after successful operation.")
71+
72+
group = parser.add_argument_group("Value Options")
73+
group = group.add_mutually_exclusive_group(required=True)
74+
group.add_argument("--value", metavar="VALUE", default=None, dest="var_value", help=(
75+
"Value to write to variable(s). JSON format is used for structured data. "
76+
"Alternatively, a file path containing the JSON can be specified.")
77+
)
78+
group.add_argument("--toggle", action="store_true", help="Toggle the value of the specified variable (queries first and assumes boolean).")
79+
80+
# fmt: on
81+
add_mms_connection_options(parser)
82+
add_logging_options(parser)
83+
84+
args = parser.parse_args()
85+
args.console = Console()
86+
87+
logger.init_from_args(args.verbosity, args.quiet, args.ts)
88+
if args.verbosity > 0:
89+
print(f"icspacket v{__version__}\n")
90+
91+
if args.var_value is None:
92+
if not args.toggle:
93+
logging.error("Must specify --value or --toggle")
94+
parser.print_usage()
95+
else:
96+
args.var_value = cli2data(args.var_value)
97+
if args.var_value is None:
98+
sys.exit(1)
99+
100+
conn = init_mms_connection(
101+
args.host,
102+
args.port,
103+
args.auth,
104+
args.auth_stdin,
105+
)
106+
if conn is None:
107+
sys.exit(1)
108+
109+
client = IED_Client(conn=conn)
110+
if "/" not in args.target:
111+
# search for logical devices
112+
devices = client.get_server_directory()
113+
if len(devices) == 0:
114+
logging.error("No logical devices found")
115+
sys.exit(1)
116+
117+
if len(devices) == 1:
118+
logging.info(f"Automatically selecting {devices[0]} as logical device")
119+
args.target = f"{devices[0]}/{args.target}"
120+
else:
121+
ldname = Prompt.ask(
122+
"Multiple logical devices found, please select one",
123+
choices=devices,
124+
default=devices[0],
125+
)
126+
args.target = f"{ldname}/{args.target}"
127+
128+
ref = DataObjectReference.from_mmsref(args.target)
129+
parts = ref.parts
130+
# Always change constraint to CO, even if not present
131+
ref = DataObjectReference(ref.ldname, ref.lnname, FC.CO.name, parts[-1])
132+
logging.debug(f"Requesting control model information about {parts[-1]}...")
133+
try:
134+
co = client.control(ref)
135+
logging.info(f"Control model for node: {co.model.name}")
136+
except ConnectionError:
137+
logging.error(
138+
"Failed to request control model information - is node really present?"
139+
)
140+
sys.exit(1)
141+
142+
if args.toggle:
143+
logging.debug("Toggle: Requesting current value for node...")
144+
value = client.get_data_values(ref / "Oper" / "ctlVal")
145+
if value.failure:
146+
logging.error("Failed to get value: %s", value.failure.value)
147+
sys.exit(1)
148+
149+
current = value.success.boolean
150+
args.var_value = not current
151+
logging.info(
152+
f"Value will be changed from {str(current).upper()} -> [b]{str(not current).upper()}[/]"
153+
)
154+
else:
155+
logging.info(f"Value will be changed to: {args.var_value!r}")
156+
157+
try:
158+
match co.model:
159+
case ControlModel.DIRECT_NORMAL:
160+
logging.debug("Direct Control Nodel: Writing value...")
161+
error = client.operate(co, ctl_val=args.var_value)
162+
if error:
163+
logging.error("Failed to start opertation: %s", error.value)
164+
sys.exit(1)
165+
166+
case ControlModel.DIRECT_ENHANCED:
167+
if error := client.operate(co, ctl_val=args.var_value):
168+
logging.error("Failed to start opertation: %s", error.value)
169+
sys.exit(1)
170+
171+
with args.console.status("Waiting for operation to complete..."):
172+
_ = client.await_command_termination()
173+
174+
case ControlModel.SBO_NORMAL:
175+
logging.debug("Selecting node before operation (SBO)...")
176+
if error := client.select(co):
177+
logging.error("Failed to select node: %s", error.value)
178+
sys.exit(1)
179+
180+
if error := client.operate(co, ctl_val=args.var_value):
181+
logging.error("Failed to start opertation: %s", error.value)
182+
sys.exit(1)
183+
184+
case ControlModel.SBO_ENHANCED:
185+
logging.debug("Selecting node with value before operation (SBOw)...")
186+
if error := client.select_with_value(co, ctl_val=args.var_value):
187+
logging.error("Failed to select node: %s", error.value)
188+
sys.exit(1)
189+
190+
if error := client.operate(co, ctl_val=args.var_value):
191+
logging.error("Failed to start opertation: %s", error.value)
192+
sys.exit(1)
193+
194+
with args.console.status("Waiting for operation to complete..."):
195+
_ = client.await_command_termination()
196+
except LastApplError as e:
197+
logging.error(
198+
"Failed to operate on node %s: error: %s, cause: %s",
199+
parts[-1],
200+
repr(e.error),
201+
repr(e.cause),
202+
)
203+
except ConnectionError as e:
204+
logging.error(
205+
"Failed to request control model information - is node really present?"
206+
)
207+
sys.exit(1)
208+
except KeyboardInterrupt:
209+
logging.warning("Operation cancelled by user")
210+
else:
211+
logging.info(f"Successfully completed operation on {parts[-1]}!")
212+
213+
if args.check:
214+
value = client.get_data_values(ref / "Oper" / "ctlVal")
215+
success = value.success
216+
if success:
217+
args.console.print(f"Current value: {data_to_str(success)}")
218+
else:
219+
logging.error("Failed to get value: %s", value.failure.value)
220+
221+
222+
if __name__ == "__main__":
223+
cli_main()

src/icspacket/examples/mms_utility.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def data_to_str(data: Data) -> str | dict | list:
8686
for item in data.structure
8787
}
8888
case Data.PRESENT.PR_visible_string:
89-
return escape(data.visible_string or "")
89+
return escape(data.visible_string or "<EMPTY>")
9090
case _:
9191
return escape(data.to_text().decode().strip())
9292

0 commit comments

Comments
 (0)