Admin interfaces

This module defines the admin interfaces of dasf-broker-django.

Classes:

AvailabilityFilter(request, params, model, ...)

Filter topics based on their status.

BrokerMessageAdmin(model, admin_site)

An admin for a broker message

BrokerTopicAdmin(model, admin_site)

An admin for :model:`dasf_broker.BrokerTopic`.

HasBeenDeliveredFilter(request, params, ...)

Filter broker messages by their delivery state

IsResponseMessageFilter(request, params, ...)

Filter broker messages to a response topic

ResponseTopicAdmin(model, admin_site)

An admin for response topics.

ResponseTopicExistsFilter(request, params, ...)

Filter topics for response topics

Functions:

garbage_collect(modeladmin, request, queryset)

ping_message_topics(modeladmin, request, ...)

ping_topics(modeladmin, request, queryset)

Ping the broker topics.

send_messages(modeladmin, request, queryset)

class dasf_broker.admin.AvailabilityFilter(request, params, model, model_admin)

Bases: SimpleListFilter

Filter topics based on their status.

Methods:

lookups(request, model_admin)

Must be overridden to return a list of tuples (value, verbose value)

queryset(request, queryset)

Return the filtered queryset.

Attributes:

parameter_name

title

lookups(request: Any, model_admin: Any) → List[Tuple[Any, str]]

Must be overridden to return a list of tuples (value, verbose value)

parameter_name = 'last_pong'
queryset(request: Any, queryset: models.BrokerTopicQuerySet) → QuerySet[Any] | None

Return the filtered queryset.

title = 'Availability'
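
The `lookups`/`queryset` pair follows Django's `SimpleListFilter` contract: `lookups` returns the `(value, label)` pairs offered in the sidebar, and `queryset` narrows the result set for the selected value or returns `None` to leave it unfiltered. The following is a minimal stand-in sketch without Django. It assumes that a topic counts as available when its last pong is recent; the lookup values, the five-minute threshold, and the `Topic` dataclass are hypothetical and not taken from dasf-broker-django.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


@dataclass
class Topic:
    """Hypothetical stand-in for a broker topic row."""

    slug: str
    last_pong: Optional[datetime]  # None: the topic never answered a ping


class AvailabilityFilterSketch:
    """Mimics the lookups()/queryset() contract on a plain list."""

    title = "Availability"
    parameter_name = "last_pong"
    max_age = timedelta(minutes=5)  # assumed threshold, not from the package

    def __init__(self, selected: Optional[str], now: datetime):
        self.selected = selected  # value taken from ?last_pong=<value>
        self.now = now

    def lookups(self) -> List[Tuple[str, str]]:
        # (value used in the URL, label shown in the sidebar); values assumed
        return [("online", "Online"), ("offline", "Offline"), ("unknown", "Unknown")]

    def queryset(self, topics: List[Topic]) -> Optional[List[Topic]]:
        if self.selected == "unknown":
            return [t for t in topics if t.last_pong is None]
        answered = [t for t in topics if t.last_pong is not None]
        if self.selected == "online":
            return [t for t in answered if self.now - t.last_pong <= self.max_age]
        if self.selected == "offline":
            return [t for t in answered if self.now - t.last_pong > self.max_age]
        return None  # nothing selected: leave the list unfiltered
```

The three lookup values mirror the `bool | None` return type of `BrokerTopicAdmin.get_availability`, which suggests an available, unavailable, and unknown state.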
class dasf_broker.admin.BrokerMessageAdmin(model, admin_site)

Bases: GuardedModelAdmin

An admin for a broker message

Attributes:

actions

list_display

list_filter

media

readonly_fields

search_fields

Methods:

delivered(obj)

is_response(obj)

topic_availability(obj)

actions = [<function send_messages>, <function ping_message_topics>]
delivered(obj: BrokerMessage) → bool
is_response(obj: BrokerMessage) → bool
list_display = ['message_id', 'topic', 'topic_availability', 'is_response', 'user', 'delivered', 'date_created']
list_filter = ['date_created', <class 'dasf_broker.admin.IsResponseMessageFilter'>, <class 'dasf_broker.admin.HasBeenDeliveredFilter'>]
property media
readonly_fields = ['content']
search_fields = ['topic__slug__icontains', 'topic__responsetopic__is_response_for__slug__icontains', 'user__username__icontains', 'user__email__icontains', 'user__first_name__icontains', 'user__last_name__icontains']
topic_availability(obj: BrokerMessage) → bool | None
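
Every `search_fields` entry ends in `__icontains`, which is a case-insensitive substring match. Django's admin splits the search input into words and keeps a row only if each word matches at least one of the listed fields. The sketch below reproduces that rule on plain dictionaries; the flattened keys and the sample rows are hypothetical, and quoted phrases are not handled.

```python
from typing import Dict, Iterable, List

# Field paths from search_fields with the __icontains suffix removed.
SEARCH_FIELDS = [
    "topic__slug",
    "topic__responsetopic__is_response_for__slug",
    "user__username",
    "user__email",
    "user__first_name",
    "user__last_name",
]


def admin_search(rows: Iterable[Dict[str, str]], term: str) -> List[Dict[str, str]]:
    """Keep rows where every word of `term` occurs in at least one field."""
    words = term.lower().split()
    matches = []
    for row in rows:
        # Missing fields behave like empty strings.
        values = [str(row.get(name) or "").lower() for name in SEARCH_FIELDS]
        if all(any(word in value for value in values) for word in words):
            matches.append(row)
    return matches
```

With this rule, a search for `weather alice` would find messages on a topic whose slug contains "weather" that were sent by a user whose name or email contains "alice".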
class dasf_broker.admin.BrokerTopicAdmin(model, admin_site)

Bases: GuardedModelAdmin

An admin for :model:`dasf_broker.BrokerTopic`.

Attributes:

actions

list_display

list_filter

media

search_fields

Methods:

get_availability(obj)

get_consumers(obj)

get_producers(obj)

is_response_topic(obj)

pending_messages(obj)

Get the number of messages that have not yet been delivered

stored_messages(obj)

Get the number of messages in the database.

stored_responses(obj)

Get the number of messages in the database.

actions = [<function ping_topics>, <function garbage_collect>]
get_availability(obj: BrokerTopic) → bool | None
get_consumers(obj: BrokerTopic) → str
get_producers(obj: BrokerTopic) → str
is_response_topic(obj: BrokerTopic) → bool
list_display = ['slug', 'get_consumers', 'get_producers', 'is_public', 'is_response_topic', 'get_availability', 'pending_messages', 'stored_messages', 'stored_responses']
list_filter = ['is_public', 'date_created', <class 'dasf_broker.admin.ResponseTopicExistsFilter'>, <class 'dasf_broker.admin.AvailabilityFilter'>]
property media
pending_messages(obj: BrokerTopic) → str

Get the number of messages that have not yet been delivered

search_fields = ['slug__icontains']
stored_messages(obj: BrokerTopic) → str

Get the number of messages in the database.

stored_responses(obj: BrokerTopic) → str

Get the number of response messages in the database.
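
`list_display` mixes plain model attributes such as `slug` with admin methods such as `pending_messages`, each of which receives the row object and returns the cell value. The sketch below shows a simplified version of that resolution with stand-in classes. The `Message` and `Topic` dataclasses and the counting logic are hypothetical, and Django's real lookup also consults the model's field definitions first.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class Message:
    delivered: bool


@dataclass
class Topic:
    slug: str
    is_public: bool
    messages: List[Message] = field(default_factory=list)


class TopicAdminSketch:
    list_display = ["slug", "is_public", "pending_messages", "stored_messages"]

    def pending_messages(self, obj: Topic) -> str:
        # Count the messages that have not been delivered yet.
        return str(sum(1 for m in obj.messages if not m.delivered))

    def stored_messages(self, obj: Topic) -> str:
        # Count all messages kept for this topic.
        return str(len(obj.messages))

    def row(self, obj: Topic) -> List[Any]:
        """Build one change-list row: admin method if present, else attribute."""
        cells = []
        for name in self.list_display:
            if hasattr(self, name):
                cells.append(getattr(self, name)(obj))
            else:
                cells.append(getattr(obj, name))
        return cells
```

Returning the counts as `str` matches the documented return types of `pending_messages` and `stored_messages`.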

class dasf_broker.admin.HasBeenDeliveredFilter(request, params, model, model_admin)

Bases: SimpleListFilter

Filter broker messages by their delivery state

Methods:

lookups(request, model_admin)

Must be overridden to return a list of tuples (value, verbose value)

queryset(request, queryset)

Return the filtered queryset.

Attributes:

parameter_name

title

lookups(request: Any, model_admin: Any) → List[Tuple[Any, str]]

Must be overridden to return a list of tuples (value, verbose value)

parameter_name = 'delivered_to'
queryset(request: Any, queryset: QuerySet[Any]) → QuerySet[Any] | None

Return the filtered queryset.

title = 'has been delivered'
class dasf_broker.admin.IsResponseMessageFilter(request, params, model, model_admin)

Bases: SimpleListFilter

Filter broker messages to a response topic

Methods:

lookups(request, model_admin)

Must be overridden to return a list of tuples (value, verbose value)

queryset(request, queryset)

Return the filtered queryset.

Attributes:

parameter_name

title

lookups(request: Any, model_admin: Any) → List[Tuple[Any, str]]

Must be overridden to return a list of tuples (value, verbose value)

parameter_name = 'topic'
queryset(request: Any, queryset: QuerySet[Any]) → QuerySet[Any] | None

Return the filtered queryset.

title = 'is response message'
class dasf_broker.admin.ResponseTopicAdmin(model, admin_site)

Bases: BrokerTopicAdmin

An admin for response topics.

Attributes:

list_display

list_filter

media

search_fields

list_display = ['slug', 'get_consumers', 'get_producers', 'is_public', 'is_response_topic', 'get_availability', 'pending_messages', 'stored_messages']
list_filter = ['is_public', 'date_created']
property media
search_fields = ['slug__icontains', 'is_response_for__slug__icontains']
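
Compared with `BrokerTopicAdmin`, the values above drop the `stored_responses` column and the two custom filter classes, and add one search field. The sketch below shows one way a subclass could derive these attributes from its base instead of repeating them. It uses plain classes with the filter classes replaced by string placeholders, and it is not necessarily how the package defines them.

```python
class BrokerTopicAdminSketch:
    list_display = [
        "slug", "get_consumers", "get_producers", "is_public",
        "is_response_topic", "get_availability", "pending_messages",
        "stored_messages", "stored_responses",
    ]
    # The last two entries stand in for the filter classes.
    list_filter = [
        "is_public", "date_created",
        "ResponseTopicExistsFilter", "AvailabilityFilter",
    ]
    search_fields = ["slug__icontains"]


class ResponseTopicAdminSketch(BrokerTopicAdminSketch):
    # Same columns as the base admin, minus the response counter.
    list_display = [
        name for name in BrokerTopicAdminSketch.list_display
        if name != "stored_responses"
    ]
    # Keep only the two plain field filters.
    list_filter = BrokerTopicAdminSketch.list_filter[:2]
    # Additionally search by the slug of the topic being responded to.
    search_fields = BrokerTopicAdminSketch.search_fields + [
        "is_response_for__slug__icontains"
    ]
```

Since `actions` is not listed among the attributes of `ResponseTopicAdmin`, it is presumably inherited unchanged from `BrokerTopicAdmin`.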
class dasf_broker.admin.ResponseTopicExistsFilter(request, params, model, model_admin)

Bases: SimpleListFilter

Filter topics for response topics

Methods:

lookups(request, model_admin)

Must be overridden to return a list of tuples (value, verbose value)

queryset(request, queryset)

Return the filtered queryset.

Attributes:

parameter_name

title

lookups(request: Any, model_admin: Any) → List[Tuple[Any, str]]

Must be overridden to return a list of tuples (value, verbose value)

parameter_name = 'responsetopic'
queryset(request: Any, queryset: QuerySet[Any]) → QuerySet[Any] | None

Return the filtered queryset.

title = 'is response topic'
dasf_broker.admin.garbage_collect(modeladmin, request, queryset)
dasf_broker.admin.ping_message_topics(modeladmin, request, queryset)
dasf_broker.admin.ping_topics(modeladmin, request, queryset)

Ping the broker topics.

dasf_broker.admin.send_messages(modeladmin, request, queryset)
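
All four functions share the `(modeladmin, request, queryset)` signature of Django admin actions: the admin site calls them with the rows selected in the change list. The sketch below illustrates that calling convention with stand-in objects. The `ping()` method on the topic and the wording of the feedback message are hypothetical; `message_user` mirrors the `ModelAdmin` method of the same name.

```python
from typing import Any, Iterable, List


class FakeTopic:
    """Stand-in for a topic row; ping() is a hypothetical method."""

    def __init__(self, slug: str) -> None:
        self.slug = slug
        self.pinged = False

    def ping(self) -> None:
        self.pinged = True


class FakeModelAdmin:
    """Records feedback instead of showing it in the admin UI."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    def message_user(self, request: Any, message: str) -> None:
        self.messages.append(message)


def ping_topics_sketch(
    modeladmin: FakeModelAdmin, request: Any, queryset: Iterable[FakeTopic]
) -> None:
    """Ping every selected topic and report how many were pinged."""
    count = 0
    for topic in queryset:
        topic.ping()
        count += 1
    modeladmin.message_user(request, f"Pinged {count} topic(s).")
```

An action written this way becomes selectable once it is listed in an admin's `actions` attribute, as `ping_topics` and `garbage_collect` are for `BrokerTopicAdmin`.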