Skip to content

Commit 6ec969f

Browse files
authored
Merge pull request #161 from saksham-gera/dev
Integrated ZeroMQ in ControlCore.
2 parents 69bf02b + b7c21e8 commit 6ec969f

30 files changed

Lines changed: 5204 additions & 599 deletions
Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ def init_zmq_port(port_name, port_type, address, socket_type_str):
5454
except Exception as e:
5555
print(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
5656

57+
def terminate_zmq():
58+
for port in zmq_ports.values():
59+
try:
60+
port.socket.close()
61+
port.context.term()
62+
except Exception as e:
63+
print(f"Error while terminating ZMQ port {port.address}: {e}")
5764
# --- ZeroMQ Integration End ---
5865

5966
def safe_literal_eval(filename, defaultValue):
@@ -207,16 +214,17 @@ def write(port_identifier, name, val, delta=0):
207214
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
208215
except Exception as e:
209216
print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
210-
return
211-
217+
return
212218
try:
213-
file_port_num = int(port_identifier)
219+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
220+
file_path = os.path.join("../"+port_identifier, name)
221+
else:
222+
file_port_num = int(port_identifier)
223+
file_path = os.path.join(outpath+str(file_port_num), name)
214224
except ValueError:
215225
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
216226
return
217227

218-
file_path = os.path.join(outpath+str(file_port_num), name)
219-
220228
if isinstance(val, str):
221229
time.sleep(2 * delay)
222230
elif not isinstance(val, list):

0mq/comm_node.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import concore
2+
import concore2
3+
4+
concore.delay = 0.07
5+
concore2.delay = 0.07
6+
concore2.inpath = concore.inpath
7+
concore2.outpath = concore.outpath
8+
concore2.simtime = 0
9+
concore.default_maxtime(100)
10+
init_simtime_u = "[0.0, 0.0, 0.0]"
11+
init_simtime_ym = "[0.0, 0.0, 0.0]"
12+
13+
u = concore.initval(init_simtime_u)
14+
ym = concore2.initval(init_simtime_ym)
15+
while(concore2.simtime<concore.maxtime):
16+
while concore.unchanged():
17+
u = concore.read(concore.iport['U'],"u",init_simtime_u)
18+
concore.write(concore.oport['U1'],"u",u)
19+
print(u)
20+
old2 = concore2.simtime
21+
while concore2.unchanged() or concore2.simtime <= old2:
22+
ym = concore2.read(concore.iport['Y1'],"ym",init_simtime_ym)
23+
concore2.write(concore.oport['Y'],"ym",ym)
24+
print("funbody u="+str(u)+" ym="+str(ym)+" time="+str(concore2.simtime))
25+
print("retry="+str(concore.retrycount))

0mq/distributed_client.graphml

Lines changed: 592 additions & 0 deletions
Large diffs are not rendered by default.

0mq/distributed_server.graphml

Lines changed: 520 additions & 0 deletions
Large diffs are not rendered by default.

0mq/fileOnlyCommunication.graphml

Lines changed: 460 additions & 0 deletions
Large diffs are not rendered by default.

0mq/firstNode.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# firstNode.py (Client/Orchestrator)
2+
import concore
3+
import time
4+
5+
# --- ZMQ Initialization ---
6+
# This REQ socket connects to Node B (F2)
7+
concore.init_zmq_port(
8+
port_name=f"0x{PORT_F1_F2}_{PORT_NAME_F1_F2}",
9+
port_type="connect",
10+
address="tcp://localhost:" + PORT_F1_F2,
11+
socket_type_str="REQ"
12+
)
13+
# This REQ socket connects to Node C (F3)
14+
concore.init_zmq_port(
15+
port_name=f"0x{PORT_F1_F3}_{PORT_NAME_F1_F3}",
16+
port_type="connect",
17+
address="tcp://localhost:" + PORT_F1_F3,
18+
socket_type_str="REQ"
19+
)
20+
21+
current_value = 0.0
22+
23+
while current_value <= 100:
24+
# --- Step 1: Communicate with Node B ---
25+
print(f"Node A: Sending value {current_value:.2f} to Node B.")
26+
concore.write(f"0x{PORT_F1_F2}_{PORT_NAME_F1_F2}", "value", [current_value])
27+
28+
# Wait for the reply from Node B
29+
value_from_b = concore.read(f"0x{PORT_F1_F2}_{PORT_NAME_F1_F2}", "value", [current_value])
30+
processed_by_b = value_from_b[0]
31+
print(f"Node A: Received processed value {processed_by_b:.2f} from Node B.")
32+
33+
# --- Step 2: Communicate with Node C ---
34+
print(f"Node A: Sending value {processed_by_b:.2f} to Node C.")
35+
concore.write(f"0x{PORT_F1_F3}_{PORT_NAME_F1_F3}", "value", [processed_by_b])
36+
37+
# Wait for the reply from Node C
38+
value_from_c = concore.read(f"0x{PORT_F1_F3}_{PORT_NAME_F1_F3}", "value", [processed_by_b])
39+
current_value = value_from_c[0]
40+
print(f"Node A: Received final value {current_value:.2f} from Node C.")
41+
print("-" * 20)
42+
time.sleep(1) # Slow down the loop for readability
43+
44+
print("\nNode A: Value exceeded 100. Terminating.")
45+
concore.terminate_zmq()

0 commit comments

Comments
 (0)