Skip to content

Commit efd7896

Browse files
committed
Implemented ZeroMQ socket timeouts & retry logic fordropped connections.
1 parent c87cf2f commit efd7896

1 file changed

Lines changed: 28 additions & 2 deletions

File tree

concore.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,38 @@ def __init__(self, port_type, address, zmq_socket_type):
2020
self.socket = self.context.socket(zmq_socket_type)
2121
self.port_type = port_type # "bind" or "connect"
2222
self.address = address
23+
24+
self.socket.setsockopt(zmq.RCVTIMEO, 2000)
25+
self.socket.setsockopt(zmq.SNDTIMEO, 2000)
26+
self.socket.setsockopt(zmq.LINGER, 0)
27+
2328
if self.port_type == "bind":
2429
self.socket.bind(address)
2530
print(f"ZMQ Port bound to {address}")
2631
else:
2732
self.socket.connect(address)
2833
print(f"ZMQ Port connected to {address}")
34+
35+
def send_json_with_retry(self, message):
36+
for attempt in range(5):
37+
try:
38+
self.socket.send_json(message)
39+
return
40+
except zmq.Again:
41+
print(f"Send timeout (attempt {attempt + 1}/5)")
42+
time.sleep(0.5)
43+
print("Failed to send after retries.")
44+
return
45+
46+
def recv_json_with_retry(self):
47+
for attempt in range(5):
48+
try:
49+
return self.socket.recv_json()
50+
except zmq.Again:
51+
print(f"Receive timeout (attempt {attempt + 1}/5)")
52+
time.sleep(0.5)
53+
print("Failed to receive after retries.")
54+
return None
2955

3056
# Global ZeroMQ ports registry
3157
zmq_ports = {}
@@ -143,7 +169,7 @@ def read(port_identifier, name, initstr_val):
143169
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
144170
zmq_p = zmq_ports[port_identifier]
145171
try:
146-
message = zmq_p.socket.recv_json()
172+
message = zmq_p.recv_json_with_retry()
147173
return message
148174
except zmq.error.ZMQError as e:
149175
print(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
@@ -209,7 +235,7 @@ def write(port_identifier, name, val, delta=0):
209235
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
210236
zmq_p = zmq_ports[port_identifier]
211237
try:
212-
zmq_p.socket.send_json(val)
238+
zmq_p.send_json_with_retry(val)
213239
except zmq.error.ZMQError as e:
214240
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
215241
except Exception as e:

0 commit comments

Comments
 (0)