from __future__ import print_function
import zmq
import time
import json
import traceback
[docs]class HightQuarter(object):
"""The main hight quarter that will receive informations from the turrets
and send the start message
:param int publish_port: the port for publishing information to turrets
:param int rc_port: the result collector port for collecting results from the turrets
:param StatsHandler stats_handler: the stats handler writer
:param dict config: the configuration of the test
"""
def __init__(self, publish_port, rc_port, stats_handler, config):
self.context = zmq.Context()
self.poller = zmq.Poller()
self.result_collector = self.context.socket(zmq.PULL)
self.result_collector.bind("tcp://*:{}".format(rc_port))
self.publisher = self.context.socket(zmq.PUB)
self.publisher.bind("tcp://*:{}".format(publish_port))
self.poller.register(self.result_collector, zmq.POLLIN)
self.stats_handler = stats_handler
self.config = config
self.turrets = []
self.started = False
# waiting for init sockets
print("Warmup")
time.sleep(1)
def _turret_already_exists(self, turret_data):
for t in self.turrets:
if turret_data['uuid'] == t.uuid:
return True
return False
def _add_turret(self, turret_data):
self.turrets.append(self.stats_handler.write_turret(turret_data))
if self.started:
self._publish({'command': 'start', 'msg': 'open fire'})
def _update_turret(self, turret_data):
for t in self.turrets:
if turret_data['uuid'] == t.uuid:
t.status = turret_data['status']
t.save()
break
def _publish(self, message, channel=''):
data = json.dumps(message)
self.publisher.send_string("%s %s" % (channel, data))
def _process_turret_status(self, data):
if 'status' in data:
if self._turret_already_exists(data):
self._update_turret(data)
else:
self._add_turret(data)
return True
else:
return False
def _process_socks(self, socks):
if self.result_collector in socks:
data = self.result_collector.recv_json()
if 'status' not in data:
self.stats_handler.write_result(data)
else:
self._process_turret_status(data)
def _print_status(self, elapsed):
display = 'turrets: {}, elapsed: {} transactions: {} timers: {} errors: {}\r'
print(display.format(len(self.turrets), round(elapsed), self.stats_handler.trans_count,
self.stats_handler.timer_count,
self.stats_handler.error_count), end='')
def _clean_queue(self):
try:
data = self.result_collector.recv_json(zmq.NOBLOCK)
while data:
data = self.result_collector.recv_json(zmq.NOBLOCK)
if 'status' not in data:
self.stats_handler.write_result(data)
except zmq.Again:
self.result_collector.close()
self.publisher.close()
self.stats_handler.write_remaining()
def _run_loop_action(self):
socks = dict(self.poller.poll(1000))
self._process_socks(socks)
[docs] def wait_turrets(self, wait_for):
"""Wait until wait_for turrets are connected and ready
"""
print("waiting for {} turrets to connect".format(wait_for - len(self.turrets)))
while len(self.turrets) < wait_for:
self._publish({'command': 'status_request', 'msg': None})
socks = dict(self.poller.poll(2000))
if self.result_collector in socks:
data = self.result_collector.recv_json()
self._process_turret_status(data)
print("waiting for {} turrets to connect".format(wait_for - len(self.turrets)))
[docs] def run(self):
"""Run the hight quarter, lunch the turrets and wait for results
"""
elapsed = 0
start_time = time.time()
self._publish({'command': 'start', 'msg': 'open fire'})
self.started = True
while elapsed <= (self.config['run_time']):
try:
self._run_loop_action()
self._print_status(elapsed)
elapsed = time.time() - start_time
except (Exception, KeyboardInterrupt):
print("\nStopping test, sending stop command to turrets")
self._publish({'command': 'stop', 'msg': 'premature stop'})
traceback.print_exc()
break
self._publish({'command': 'stop', 'msg': 'stopping fire'})
print("\n\nProcessing all remaining messages...")
self.result_collector.unbind(self.result_collector.LAST_ENDPOINT)
self._clean_queue()