Source code for snews_pt.remote_commands

"""
Easy handle remote commands


Melih Kara, kara@kit.edu
"""

import os
import time
from datetime import UTC, datetime

import click
from hop import Stream
from hop.io import StartPosition
from hop.models import JSONBlob
from snews.models import messages


[docs] def test_connection(detector_name=None, firedrill=True, start_at="LATEST", patience=8): """test the server connection It should prompt your whether the coincidence script is running in the server Parameters ---------- start_at : str Where hop starts looking for messages can either be "LATEST" or "EARLIEST" patience : int Seconds to wait before the check Returns ------- None """ detector_name = detector_name or os.getenv("DETECTOR_NAME") if detector_name == "": detector_name = "Test-Detector" default_connection_topic = "kafka://kafka.scimma.org/snews.connection-testing" connection_broker = os.getenv("CONNECTION_TEST_TOPIC", default_connection_topic) stamp_time = datetime.now(UTC).isoformat() message = { "_id": "0_test-connection", "detector_name": detector_name, "sent_time_utc": stamp_time, "status": "sending", "meta": {}, } message = messages.DetectorMessageBase( id="0_test-connection", detector_name=detector_name, tier=messages.Tier.HEART_BEAT, sent_time_utc=stamp_time, meta={"status": "sending"}, ) if firedrill: topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC") else: topic = os.getenv("OBSERVATION_TOPIC") _start_at = StartPosition.LATEST if start_at == "LATEST" else StartPosition.EARLIEST substream = Stream(until_eos=True, auth=True, start_at=_start_at) pubstream = Stream(until_eos=True, auth=True) click.secho( f"\n> Testing your connection.\n> Sending to {topic}\n" f"> Expecting from {connection_broker}. \n" f"> Going to wait {patience} seconds before checking for confirmation...\n" ) # start_time = datetime.utcnow() confirmed = False message_expected = message.model_copy() message_expected.meta["status"] = "received" message_expected = message_expected.model_dump() with pubstream.open(topic, "w") as ps, substream.open(connection_broker, "r") as ss: ps.write(JSONBlob(message.model_dump_json())) # while (datetime.utcnow() - start_time) < timedelta(seconds=wait): time.sleep(patience) for read in ss: read = read.content if read == message_expected: read_name = click.style(read["detector_name"], fg="green", bold=True) read_time = click.style(read["sent_time_utc"], fg="green", bold=True) click.echo( f"You ({read_name}) have a connection to the server at {read_time}" ) confirmed = True break else: # if there is no else: continue statement, it does not work continue if not confirmed: click.secho( f"\tWaited for {patience} sec and checked from {start_at}," f" couldn't get a confirmation" f"\n\tMaybe increase timeout and try again.", fg="red", bold=True, ) return False return True
[docs] def write_hb_logs(detector_name=None, admin_pass=None, firedrill=True): """Ask server to print the heartbeat logs on the server as standard output later admins can see them remotely. Requires admin password Parameters ---------- detector_name : str Name of the detector requesting the write operation admin_pass : str Simple password as a string firedrill : bool Whether to use the firedrill broker """ passw = admin_pass or os.getenv("ADMIN_PASS", "NO_AUTH") detector_name = detector_name or os.getenv("DETECTOR_NAME") message = { "_id": "0_display-heartbeats", "pass": passw, "detector_name": detector_name, "meta": {}, } topic = ( os.getenv("FIREDRILL_OBSERVATION_TOPIC") if firedrill else os.getenv("OBSERVATION_TOPIC") ) pubstream = Stream(until_eos=True, auth=True) with pubstream.open(topic, "w") as ps: ps.write(message) logslink = "> https://www.physics.purdue.edu/snews/logs/" click.secho( f"> Requested logs. If you have rights, go to remote Purdue server logs\n{logslink}\n", fg="blue", bold=True, )
[docs] def reset_cache(detector_name=None, admin_pass=None, firedrill=True, is_test=True): """If authorized, drop the current cache at the server Parameters ---------- detector_name : str Name of the detector admin_pass : str Simple password as a string firedrill : bool Whether to use the firedrill broker """ passw = admin_pass or os.getenv("ADMIN_PASS", "NO_AUTH") detector_name = detector_name or os.getenv("DETECTOR_NAME") message = { "_id": "0_hard-reset", "pass": passw, "detector_name": detector_name, "is_test": is_test, "meta": {}, } topic = ( os.getenv("FIREDRILL_OBSERVATION_TOPIC") if firedrill else os.getenv("OBSERVATION_TOPIC") ) pubstream = Stream(until_eos=True, auth=True) with pubstream.open(topic, "w") as ps: ps.write(message) click.secho( "> Requesting to Reset the cache. If you have rights, cache will be reset", fg="blue", bold=True, )
[docs] def change_broker(brokername, detector_name=None, admin_pass=None, firedrill=True): """If authorized, server changes the broker === Not implemented yet === brokername : str name of the broker to replace detector_name : str Name of the detector admin_pass : str Simple password as a string firedrill : bool Whether to use the firedrill broker """ passw = admin_pass or os.getenv("ADMIN_PASS", "NO_AUTH") detector_name = detector_name or os.getenv("DETECTOR_NAME") message = { "_id": "0_broker-change", "pass": passw, "detector_name": detector_name, "new_broker": brokername, "meta": {}, } current_topic = ( os.getenv("FIREDRILL_OBSERVATION_TOPIC") if firedrill else os.getenv("OBSERVATION_TOPIC") ) pubstream = Stream(until_eos=True, auth=True) with pubstream.open(current_topic, "w") as ps: ps.write(message) click.secho( "> Requesting to change the broker. If you have rights, broker will be changed", fg="blue", bold=True, )
[docs] def get_feedback(detector_name=None, email_address=None, firedrill=True): """Get heartbeat feedback by email For a given detector, if your email is registered We are going to send that email address(es) an email with feedback from last 24hours. multiple email addresses are allowed with a semicolon delimiter (;) Parameters ---------- detector_name : str Name of your detector email_address : str or list Registered e-mail adress(es) firedrill : bool Whether to use firedrill broker or not """ detector_name = detector_name or os.getenv("DETECTOR_NAME") email_address = email_address or input("\t> Your registered email address: ") message = { "_id": "0_Get-Feedback", "email": email_address, "detector_name": detector_name, "meta": {}, } topic = ( os.getenv("FIREDRILL_OBSERVATION_TOPIC") if firedrill else os.getenv("OBSERVATION_TOPIC") ) pubstream = Stream(until_eos=True, auth=True) with pubstream.open(topic, "w") as ps: ps.write(message) click.secho("Heartbeat Feedback is requested! Expect an email from us!")