Source code for try_scenarios

import json
import os
import sys
import time
from pathlib import Path

import click
import inquirer
from snews.models.messages import CoincidenceTierMessage

from snews_pt.messages import Publisher
from snews_pt.remote_commands import reset_cache


[docs] def try_scenarios(fd_mode: bool = False, is_test: bool = False): topic = os.getenv("OBSERVATION_TOPIC") if fd_mode: topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC") if not is_test: click.secho( "This script is only for testing purposes, and uses past neutrino times.\n" "We are running the scenarios in test mode", fg="red", bold=True, ) is_test = True with open(Path(__file__).parent / "scenarios.json") as json_file: coincidence_scenarios = json.load(json_file) scenarios_labels = list(coincidence_scenarios.keys()) try: questions = [ inquirer.Checkbox( "scenarios", message=click.style( " Which scenario(s) would you like to run next?", bg="yellow", bold=True, ), choices=scenarios_labels + ["finish & exit", "restart cache"], ) ] while True: try: answers = inquirer.prompt(questions) for scenario in answers["scenarios"]: if scenario == "finish & exit": click.secho("Terminating.") sys.exit() elif scenario == "restart cache": reset_cache(firedrill=fd_mode, is_test=is_test) print("> Cache cleaned\n") else: click.secho(f"\n>>> Testing {scenario}", fg="yellow", bold=True) pub = Publisher(kafka_topic=topic, auth=True) for evt in coincidence_scenarios[ scenario ]: # send one by one and sleep in between print(evt, "\n\n") msg = CoincidenceTierMessage( detector_name=evt["detector_name"], neutrino_time_utc=evt["neutrino_time_utc"], p_val=evt["p_val"], is_test=True, ) # allow future timestamps pub.add_message(msg) time.sleep(1) pub.send() print( f"> {len(coincidence_scenarios[scenario])} messages sent." ) # clear cache after each scenario reset_cache(firedrill=fd_mode, is_test=is_test) print("> Cache cleaned\n") except KeyboardInterrupt: break except Exception as e: print("Something went wrong\n", e, "\nTry manually submitting messages :/")