import click
import json
import os
from datetime import datetime
from hop import Stream
from . import snews_pt_utils
[docs]
def make_file(outputfolder):
"""Get a proper json file name at a given folder
It applies an increment to the file name at a given folder to avoid overwrite
"""
os.makedirs(outputfolder, exist_ok=True)
date = datetime.utcnow().isoformat().split("T")[0]
file = os.path.join(outputfolder, f"0-SNEWS_ALERT_{date}.json")
while os.path.isfile(file):
i = int(file.split("/")[-1].split("-")[0])
file = os.path.join(outputfolder, f"{i + 1}-SNEWS_ALERT_{date}.json")
return file
[docs]
def save_message(message, outputfolder, return_file=False):
"""Save messages to a json file.
Parameters
----------
message : dict
The incoming alert message
outputfolder : str
The path where to save the incoming alerts
return_file : bool
Whether to return file name as a string
"""
file = make_file(outputfolder)
with open(file, 'w') as outfile:
json.dump(message, outfile, indent=4, sort_keys=True)
if return_file:
return file
[docs]
def display(message):
"""Display the incoming alert message on screen"""
click.echo(click.style("ALERT MESSAGE".center(65, "_"), bg="red", bold=True))
for k, v in message.items():
key_type = type(v)
if key_type == type(None):
v = "None"
if key_type in [int, float, str]:
if k == "alert_type":
if v == "RETRACTION":
click.echo(f"{k:<20s}" + click.style(f":{v:<45}", bg="red"))
elif v == "UPDATE":
click.echo(f"{k:<20s}" + click.style(f":{v:<45}", bg="blue"))
else:
click.echo(f"{k:<20s}:{v:<45}")
else:
click.echo(f"{k:<20s}:{v:<45}")
elif key_type == list:
v = [str(item) for item in v]
items = "\t".join(v)
if k == "detector_names":
click.echo(f"{k:<20s}" + click.style(f":{items:<45}", bg="blue"))
else:
click.echo(f"{k:<20s}:{items:<45}")
click.secho("_".center(65, "_"), bg="bright_red")
[docs]
class Subscriber:
"""Class to subscribe ALERT message stream
Parameters
----------
env_path : str
path for the environment file. Use default settings if not given
firedrill_mode : bool
tell Subscriber to get messages from the firedrill hop broker, defaults to False
"""
def __init__(self, env_path=None, firedrill_mode=True):
snews_pt_utils.set_env(env_path)
[docs]
self.alert_topic = os.getenv("ALERT_TOPIC")
[docs]
self.connection_test_topic = os.getenv("CONNECTION_TEST_TOPIC")
if firedrill_mode:
self.alert_topic = os.getenv("FIREDRILL_ALERT_TOPIC")
[docs]
self.snews_time = datetime.utcnow().isoformat()
[docs]
self.default_output = os.path.join(os.getcwd(), os.getenv("ALERT_OUTPUT"))
[docs]
def subscribe(self, outputfolder=None, auth=True, is_test=False):
"""Subscribe and listen to a given topic
Parameters
----------
outputfolder: str
where to save the alert messages, if None, creates a file based on env file
auth: A `bool` or :class:`Auth <hop.auth.Auth>` instance. Defaults to
loading from :meth:`auth.load_auth <hop.auth.load_auth>` if set to
True. To disable authentication, set to False.
is_test: bool if True overwrites the subscribed topic with CONNECTION_TEST_TOPIC
"""
outputfolder = outputfolder or self.default_output
TOPIC = self.connection_test_topic if is_test else self.alert_topic
click.echo(
"You are subscribing to "
+ click.style("ALERT", bg="red", bold=True)
+ "\nBroker:"
+ click.style(f"{TOPIC}", bg="green")
)
# Initiate hop_stream
stream = Stream(until_eos=False, auth=auth)
try:
with stream.open(TOPIC, "r") as s:
for message in s:
# Access message dictionary from JSOBlob
message = message.content
try:
if message["_id"] == "0_test-connection":
continue
except KeyError:
pass
# Save and display
save_message(message, outputfolder)
snews_pt_utils.display_gif()
display(message)
except KeyboardInterrupt:
click.secho("Done", fg="green")
[docs]
def subscribe_and_redirect_alert(
self, outputfolder=None, auth=True, _display=True, _return="file", is_test=False
):
"""subscribe generator"""
outputfolder = outputfolder or self.default_output
TOPIC = self.connection_test_topic if is_test else self.alert_topic
click.echo(
"You are subscribing to "
+ click.style("ALERT", bg="red", bold=True)
+ "\nBroker:"
+ click.style(f"{TOPIC}", bg="green")
)
# Initiate hop_stream
stream = Stream(until_eos=False, auth=auth)
try:
with stream.open(TOPIC, "r") as s:
for message in s:
# Access message dictionary from JSONBlobg
message = message.content
try:
if message["_id"] == "0_test-connection":
continue
except KeyError:
pass
# Save and display
file = save_message(message, outputfolder, return_file=True)
if _display:
snews_pt_utils.display_gif()
display(message)
if _return == "message":
yield message
else:
yield file
except KeyboardInterrupt:
click.secho("Done", fg="green")