Source code for snews_pt.__main__

import json
import os
import warnings

import click
from dotenv import load_dotenv
from snews import messages

from . import snews_pt_utils
from .auxiliary.try_scenarios import try_scenarios
from .messages import Publisher
from .snews_sub import Subscriber

[docs] envpath = os.path.join(os.path.dirname(__file__), "auxiliary/test-config.env")
load_dotenv(envpath) if int(os.getenv("HAS_NAME_CHANGED")) == 0:
[docs] warning_text = click.style( 'You are using default detector name "TEST"\n' "Please change this by snews_pt.snews_pt_utils.set_name()", fg="red", )
warnings.warn(warning_text, UserWarning) @click.group(invoke_without_command=True, context_settings={'max_content_width': 120}, epilog='See https://snews-publishing-tools.readthedocs.io/en/latest/ for more details') @click.version_option() @click.option( "--env", type=str, default="/auxiliary/test-config.env", show_default="auxiliary/test-config.env", help="environment file containing the configurations", ) @click.pass_context
[docs] def main(ctx, env): """User interface for snews_pt tools""" base = os.path.dirname(os.path.realpath(__file__)) env_path = base + env ctx.ensure_object(dict) snews_pt_utils.set_env(env_path) ctx.obj["env"] = env ctx.obj["DETECTOR_NAME"] = os.getenv("DETECTOR_NAME") ctx.obj["USER_PASS"] = os.getenv("ADMIN_PASS", "NO_AUTH")
@main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.option( "--force", is_flag=True, default=False, show_default="False", help="Force json file to overwrite env variables", ) @click.option( "--verbose", "-v", default=0, show_default="0", help="Verbosity level. 0: no output, 1: print simple feedback, 2: print message details.", ) @click.argument("file", nargs=-1) @click.pass_context
[docs] def publish(ctx, file, firedrill, force, verbose): """Publish a message using snews_pub, multiple files are allowed $: snews_pt publish my_json_message.json Notes The topics are read from the defaults i.e. from auxiliary/test-config.env If no file is given it can still submit dummy messages with default values """ def _check_detector_name(json_detector_name, env_detector_name, force): """Check if detector name in json file matches the environment detector name. Defaults to env name. The json file name can be used with the --force flag """ if force: click.secho(f"Forcing detector name from json file: " f"{click.style(json_detector_name, bold=True)}") return json_detector_name if json_detector_name != env_detector_name: click.secho( f"{click.style('Warning:', bg='red')} " f"Detector name in JSON file ({click.style(json_detector_name, bold=True)}) " f"does not match the environment detector name " f"{click.style(env_detector_name, bold=True)}", fg="red", color="black" ) click.secho( f"{click.style('Warning:', bg='red')} " f"Using environment detector name " f"{click.style(env_detector_name, bold=True)} " f"for this message. " f"To change the detector name, use the `snews_pt set-name` command.", ) return env_detector_name if firedrill: publisher = Publisher(kafka_topic=os.getenv("FIREDRILL_OBSERVATION_TOPIC")) else: publisher = Publisher(kafka_topic=os.getenv("OBSERVATION_TOPIC")) env_detector_name = ctx.obj["DETECTOR_NAME"] for filename in file: if filename.endswith(".json"): try: with open(filename, "r", encoding="utf-8") as json_file: json_data = json.load( json_file ) snews_messages = messages.create_messages(**json_data) for message in snews_messages: _detector_name = _check_detector_name( message.detector_name, env_detector_name, force) message.detector_name = _detector_name publisher.add_message(message) publisher.send(verbose=verbose) except FileNotFoundError: click.echo(f"Error: File not found: {filename}") except json.JSONDecodeError as e: click.echo(f"Error: Invalid JSON in {filename}: {e}") except Exception as e: click.echo(f"An unexpected error occurred: {e}") else: raise TypeError(f"Expected json file with .json format! Got {filename}")
@main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.option( "--status", "-s", type=str, default="OFF", show_default="OFF", help="Heartbeat at the time of execution", ) @click.option( "--time", "-t", type=str, default=None, show_default="None", help="Machine time, format: %Y-%m-%dT%H:%M:%S.%f", ) @click.option( "--verbose", "-v", default=0, show_default="0", help="Verbosity level. 0: no output, 1: print simple feedback, 2: print message details.", ) @click.pass_context
[docs] def heartbeat(ctx, status, time, firedrill, verbose): """Publish heartbeat message :param status: Status of the experiment ON/OFF. :param time: (optional) Machine time is appended as the time of execution different time can be passed following the iso-format """ if firedrill: publisher = Publisher(kafka_topic=os.getenv("FIREDRILL_OBSERVATION_TOPIC")) else: publisher = Publisher(kafka_topic=os.getenv("OBSERVATION_TOPIC")) message = messages.HeartbeatMessage( detector_name=ctx.obj["DETECTOR_NAME"], machine_time_utc=time, detector_status=status, is_firedrill=firedrill, ) publisher.add_message(message) publisher.send(verbose=verbose)
@main.command() @click.option("--plugin", "-p", type=str, default="None") @click.option("--outputfolder", "-o", type=str, default="None") @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.option( "--test/--no-test", default=False, show_default="False", help="If True subscribe to test topic", ) @click.pass_context
[docs] def subscribe(ctx, plugin, outputfolder, firedrill, test): """Subscribe to Alert topic Optionally, `plugin` script can be passed The message content as a single dictionary will be passed to this script as a positional argument. dictionary follows the snews_alert message schema """ sub = Subscriber(ctx.obj["env"], firedrill_mode=firedrill) try: if plugin != "None": print(f"Redirecting output to {plugin}") for saved_json in sub.subscribe_and_redirect_alert( outputfolder=outputfolder, is_test=test ): os.system(f"python {plugin} {saved_json}") else: sub.subscribe(outputfolder=outputfolder, is_test=test) except KeyboardInterrupt: pass
@main.command() @click.argument("requested_tier", nargs=-1) @click.pass_context
[docs] def message_schema(ctx, requested_tier): """Display the message format for each `tier` If 'all' is passed, displays everything. """ valid_tiers = [ m.replace("Message", "") for m in messages.__all__ if m.endswith("Message") ] if len(requested_tier) == 0: requested_tier = ["all"] get_all_tiers = requested_tier[0] == "all" # it's a bool flag to print all tiers # Check for invalid tiers and echo if any requested tier is not valid if not get_all_tiers: invalid_tiers = [t for t in requested_tier if t not in valid_tiers] if invalid_tiers: click.echo(f"Warning: The following requested tiers are not valid: {', '.join(invalid_tiers)}") click.echo(f"Valid tiers are: {', '.join(valid_tiers)}") tiers = ( valid_tiers if get_all_tiers # if True, print all tiers else [t for t in requested_tier if t in valid_tiers] # if False, print only the requested tiers ) for t in tiers: tier_message = getattr(messages, t + "Message") fields = messages.get_fields(tier_message) reqfields = messages.get_fields(tier_message, required=True) click.secho(f"Message schema for {t}", bg="white", fg="blue") for f in fields: if f in reqfields: click.secho(f"{f:<20s} : (REQUIRED USER INPUT)", fg="bright_blue") else: click.secho(f"{f:<20s} : (USER INPUT)", fg="bright_cyan") click.secho(f'{"**kwargs":<20s} : (GROUPED AS META)\n', fg="bright_green")
@main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.option( "--test/--no-test", default=False, show_default="False", help="If True subscribe to test topic", )
[docs] def run_scenarios(firedrill, test): """Test different coincidence scenarios""" # base = os.path.dirname(os.path.realpath(__file__)) # path = os.path.join(base, 'auxiliary/try_scenarios.py') # os.system(f'python3 {path} {firedrill} {test}') try_scenarios(fd_mode=firedrill, is_test=test)
@main.command() @click.option( "--name", "-n", default="TEST", show_default="TEST", help="Set the detectors name" )
[docs] def set_name(name): """Set your detectors name""" from .snews_pt_utils import set_name as _set_name _set_name(name) click.secho( f"Your detector name is set to be: {os.environ['DETECTOR_NAME']}", fg="green", bold=True, )
# Remote Commands @main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.option( "--start_at", "-s", type=str, default="LATEST", help="either LATEST or EARLIEST" ) @click.option("--patience", "-p", type=int, default=8) @click.pass_context
[docs] def test_connection(ctx, firedrill, start_at, patience): """Test the connection to the server It should prompt your whether the coincidence script is running in the server :param start_at: `str` Where to start looking for the confirmation LATEST or EARLIEST :param patience: `int` seconds to wait before the check Sometime, it takes time for server to respond, increase patience """ from .remote_commands import test_connection test_connection( detector_name=ctx.obj["DETECTOR_NAME"], firedrill=firedrill, start_at=start_at, patience=patience, )
@main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.pass_context
[docs] def write_hb_logs(ctx, firedrill): """REQUIRES AUTHORIZATION | Print the HB logs on the server standard output later admins can see them remotely """ from .remote_commands import write_hb_logs write_hb_logs( detector_name=ctx.obj["DETECTOR_NAME"], admin_pass=ctx.obj["USER_PASS"], firedrill=firedrill, )
@main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.option( "--test/--no-test", default=False, show_default="False", help="If True subscribe to test topic", ) @click.pass_context
[docs] def reset_cache(ctx, firedrill, test): """REQUIRES AUTHORIZATION | Drop the current cache at the server """ from .remote_commands import reset_cache reset_cache( detector_name=ctx.obj["DETECTOR_NAME"], admin_pass=ctx.obj["USER_PASS"], firedrill=firedrill, is_test=test, )
@main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.option("--brokername", "-bn", help="Change the broker") @click.pass_context
[docs] def change_broker(ctx, firedrill, brokername): """REQUIRES AUTHORIZATION | If authorized, server changes the broker """ from .remote_commands import change_broker change_broker( brokername=brokername, detector_name=ctx.obj["DETECTOR_NAME"], admin_pass=ctx.obj["USER_PASS"], firedrill=firedrill, )
@main.command() @click.option( "--firedrill/--no-firedrill", default=True, show_default="True", help="Whether to use firedrill brokers or default ones", ) @click.pass_context
[docs] def get_feedback(ctx, firedrill): """REQUIRES AUTHORIZATION | Get heartbeat feedback by email """ from .remote_commands import get_feedback get_feedback(detector_name=ctx.obj["DETECTOR_NAME"], firedrill=firedrill)
if __name__ == "__main__": main()