from __future__ import print_function
import zmq
import time
import ujson
import traceback
import importlib
from zmq.utils.strtypes import asbytes
from oct.core.turrets_manager import TurretsManager
from oct.results.stats_handler import StatsHandler
DEFAULT_HIGHTQUATER_CLASS = 'oct.core.hq.HightQuarter'
[docs]def get_hq_class(path=None):
path = path or DEFAULT_HIGHTQUATER_CLASS
module_path = '.'.join(path.split('.')[:-1])
object_name = path.split('.')[-1]
module = importlib.import_module(module_path)
hq_class = getattr(module, object_name)
return hq_class
[docs]class HightQuarter(object):
"""The main hight quarter that will receive informations from the turrets
and send the start message
:param str output_dir: output directory for results
:param dict config: the configuration of the test
:param str topic: topic for external publishing socket
:param bool with_forwarder: tell HQ if it should connects to forwarder, default False
:param bool with_streamer: tell HQ if ti should connects to streamer, default False
:param str streamer_address: streamer address to connect with form : <ip>:<port>
"""
def __init__(self, output_dir, config, topic, master=True, *args, **kwargs):
self.context = zmq.Context()
self.poller = zmq.Poller()
self.topic = topic
self.master = master
self.result_collector = self.context.socket(zmq.PULL)
self.external_publisher = self.context.socket(zmq.PUB)
self.stats_handler = StatsHandler(config.get('results_database', {}).get('insert_limit', 150))
self._configure_sockets(config)
self.transaction_context = {}
self.turrets_manager = TurretsManager(config.get('publish_port', 5000), master)
self.config = config
self.started = False
self.messages = 0
with_forwarder = kwargs.get('with_forwarder', False)
forwarder_address = None
if with_forwarder is True:
forwarder_address = kwargs.get('forwarder_address', None)
if forwarder_address is None:
forwarder_address = "127.0.0.1:{}".format(config.get('external_publisher', 5002))
self._configure_external_publisher(config, with_forwarder, forwarder_address)
# waiting for init sockets
print("Warmup")
time.sleep(1)
def _configure_external_publisher(self, config, with_forwarder=False, forwarder_address=None):
external_publisher = config.get('external_publisher', 5002) if not forwarder_address else forwarder_address
print(external_publisher)
if with_forwarder:
self.external_publisher.connect("tcp://{}".format(external_publisher))
else:
self.external_publisher.bind("tcp://*:{}".format(external_publisher))
def _configure_sockets(self, config, with_streamer=False, with_forwarder=False):
"""Configure sockets for HQ
:param dict config: test configuration
:param bool with_streamer: tell if we need to connect to streamer or simply bind
:param bool with_forwarder: tell if we need to connect to forwarder or simply bind
"""
rc_port = config.get('rc_port', 5001)
self.result_collector.set_hwm(0)
self.result_collector.bind("tcp://*:{}".format(rc_port))
self.poller.register(self.result_collector, zmq.POLLIN)
def _process_socks(self, socks):
if self.result_collector in socks:
data = self.result_collector.recv_string()
if 'status' not in data:
self.stats_handler.write_result(ujson.loads(data))
self.external_publisher.send_multipart([asbytes(self.topic), asbytes(data)])
self.messages += 1
else:
self.turrets_manager.process_message(ujson.loads(data))
def _print_status(self, elapsed):
display = 'turrets: {}, elapsed: {} messages received: {}\r'
print(display.format(len(self.turrets_manager.turrets), round(elapsed), self.messages,), end='')
def _clean_queue(self):
try:
data = self.result_collector.recv(zmq.NOBLOCK)
while data:
data = self.result_collector.recv(zmq.NOBLOCK)
if b'status' not in data:
self.stats_handler.write_result(ujson.loads(data))
except zmq.Again:
self.result_collector.close()
self.turrets_manager.clean()
self.stats_handler.write_remaining()
def _run_loop_action(self):
socks = dict(self.poller.poll(1000))
self._process_socks(socks)
[docs] def wait_turrets(self, wait_for):
"""Wait until wait_for turrets are connected and ready
"""
print("Waiting for %d turrets" % (wait_for - len(self.turrets_manager.turrets)))
while len(self.turrets_manager.turrets) < wait_for:
self.turrets_manager.status_request()
socks = dict(self.poller.poll(2000))
if self.result_collector in socks:
data = self.result_collector.recv_json()
self.turrets_manager.process_message(data)
print("Waiting for %d turrets" % (wait_for - len(self.turrets_manager.turrets)))
[docs] def run(self):
"""Run the hight quarter, lunch the turrets and wait for results
"""
elapsed = 0
run_time = self.config['run_time']
start_time = time.time()
t = time.time
self.turrets_manager.start(self.transaction_context)
self.started = True
while elapsed <= run_time:
try:
self._run_loop_action()
self._print_status(elapsed)
elapsed = t() - start_time
except (Exception, KeyboardInterrupt):
print("\nStopping test, sending stop command to turrets")
self.turrets_manager.stop()
self.stats_handler.write_remaining()
traceback.print_exc()
break
self.turrets_manager.stop()
print("\n\nProcessing all remaining messages... This could take time depending on message volume")
t = time.time()
self.result_collector.unbind(self.result_collector.LAST_ENDPOINT)
self._clean_queue()
print("took %s" % (time.time() - t))
[docs] def setup(self):
"""This method will be called before to start turrets.
"""
pass
[docs] def tear_down(self):
"""This method will be called after to stop turrets.
"""
pass