Source code for test_plugin

import os
import signal
import subprocess


def test_subscribe_and_direct():
    """Check the first lines echoed by ``snews_pt subscribe``.

    Launches ``snews_pt subscribe`` with a custom script passed via ``-p``
    and ``--no-firedrill``, reads the first three lines it prints to stdout,
    terminates the process group, and asserts that the second and third
    lines announce the ALERT subscription and the test broker URL.
    """
    # Subscribe to SNEWS alerts in a subprocess via the CLI.
    # With shell=False every argv element is passed verbatim, so the option
    # flag and its value must be separate list items.
    p = subprocess.Popen(
        [
            "snews_pt",
            "subscribe",
            "-p",
            "../auxiliary/custom_script.py",
            "--no-firedrill",
        ],
        stdout=subprocess.PIPE,
        shell=False,
        # Run the child in its own session/process group so the whole group
        # can be signalled below; equivalent to preexec_fn=os.setsid without
        # running Python code between fork and exec.
        start_new_session=True,
    )
    try:
        # Collect the first three lines echoed by snews_pt.
        echos = [p.stdout.readline() for _ in range(3)]
    finally:
        # Always stop the subscriber, even if reading its output failed.
        # NOTE: this sends SIGTERM to the process group (not SIGINT / Ctrl+C).
        try:
            os.killpg(os.getpgid(p.pid), signal.SIGTERM)
        except ProcessLookupError:
            # The process group is already gone; nothing left to signal.
            pass
        # Reap the child so it does not linger as a zombie; escalate to
        # SIGKILL if it does not exit after SIGTERM.
        try:
            p.wait(timeout=10)
        except subprocess.TimeoutExpired:
            os.killpg(os.getpgid(p.pid), signal.SIGKILL)
            p.wait()
        p.stdout.close()

    assert echos[1] == b"You are subscribing to ALERT\n"
    assert echos[2] == b"Broker:kafka://kafka.scimma.org/snews.alert-test\n"