Source code for test_messages

"""Test publishing coincidence tier messages."""

from snews import messages

from snews_pt.messages import Publisher


[docs] def test_publisher_add_message(): publisher = Publisher("kafka://kafka.scimma.org/snews.experiments-test") publisher.add_message( messages.HeartbeatMessage( detector_name="XENONnT", machine_time="2012-06-09T15:30:00.000501", detector_status="ON", firedrill_mode=False, is_test=True, ) ) assert len(publisher.message_queue) == 1
[docs] def test_publisher_send_message(): publisher = Publisher("kafka://kafka.scimma.org/snews.experiments-test") publisher.add_message( messages.HeartbeatMessage( detector_name="XENONnT", machine_time_utc="2012-06-09T15:30:00.000501", detector_status="ON", firedrill_mode=False, is_test=True, ) ) publisher.send() assert True
[docs] def test_messages_main_function(): from snews_pt.messages import test test() assert True
[docs] def test_heartbeat_message(): messages.HeartbeatMessage( detector_name="XENONnT", machine_time_utc="2012-06-09T15:30:00.000501", detector_status="ON", firedrill_mode=False, is_test=True, ) assert True
[docs] def test_retraction_message(): assert True
[docs] def test_significance_tier_message(): messages.SignificanceTierMessage( detector_name="DS-20K", machine_time_utc="2012-06-09T15:30:00.000501", neutrino_time_utc="2012-06-09T15:31:08.109876", p_values=[0.4, 0.5], t_bin_width_sec=0.8, is_firedrill=False, is_test=True, ) assert True
[docs] def test_timing_tier_message(): messages.TimingTierMessage( detector_name="XENONnT", neutrino_time_utc="2012-06-09T15:31:08.109876", timing_series=[ 0, 6, 15, 4982 ], start_time_utc="2012-06-09T15:30:00.009876", machine_time_utc="2012-06-09T15:30:00.009876", is_firedrill=False, is_test=True, ) assert True
[docs] def test_coincidence_tier_message(): """Test with example of expected message type.""" messages.CoincidenceTierMessage( detector_name="KamLAND", machine_time_utc="2012-06-09T15:30:00.000501", neutrino_time_utc="2012-06-09T15:31:08.891011", is_firedrill=False, is_test=True, ) assert True