summaryrefslogtreecommitdiff
path: root/net/tipc/subscr.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/subscr.c')
-rw-r--r--net/tipc/subscr.c400
1 files changed, 97 insertions, 303 deletions
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index d38bb45d82e9..f8490d94e323 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -1,8 +1,9 @@
/*
* net/tipc/subscr.c: TIPC network topology service
*
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2017, Ericsson AB
* Copyright (c) 2005-2007, 2010-2013, Wind River Systems
+ * Copyright (c) 2020-2021, Red Hat Inc
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,354 +37,147 @@
#include "core.h"
#include "name_table.h"
-#include "port.h"
#include "subscr.h"
-/**
- * struct tipc_subscriber - TIPC network topology subscriber
- * @conid: connection identifier to server connecting to subscriber
- * @lock: controll access to subscriber
- * @subscription_list: list of subscription objects for this subscriber
- */
-struct tipc_subscriber {
- int conid;
- spinlock_t lock;
- struct list_head subscription_list;
-};
-
-static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
- void *usr_data, void *buf, size_t len);
-static void *subscr_named_msg_event(int conid);
-static void subscr_conn_shutdown_event(int conid, void *usr_data);
-
-static atomic_t subscription_count = ATOMIC_INIT(0);
-
-static struct sockaddr_tipc topsrv_addr __read_mostly = {
- .family = AF_TIPC,
- .addrtype = TIPC_ADDR_NAMESEQ,
- .addr.nameseq.type = TIPC_TOP_SRV,
- .addr.nameseq.lower = TIPC_TOP_SRV,
- .addr.nameseq.upper = TIPC_TOP_SRV,
- .scope = TIPC_NODE_SCOPE
-};
-
-static struct tipc_server topsrv __read_mostly = {
- .saddr = &topsrv_addr,
- .imp = TIPC_CRITICAL_IMPORTANCE,
- .type = SOCK_SEQPACKET,
- .max_rcvbuf_size = sizeof(struct tipc_subscr),
- .name = "topology_server",
- .tipc_conn_recvmsg = subscr_conn_msg_event,
- .tipc_conn_new = subscr_named_msg_event,
- .tipc_conn_shutdown = subscr_conn_shutdown_event,
-};
-
-/**
- * htohl - convert value to endianness used by destination
- * @in: value to convert
- * @swap: non-zero if endianness must be reversed
- *
- * Returns converted value
- */
-static u32 htohl(u32 in, int swap)
+static void tipc_sub_send_event(struct tipc_subscription *sub,
+ struct publication *p,
+ u32 event)
{
- return swap ? swab32(in) : in;
-}
+ struct tipc_subscr *s = &sub->evt.s;
+ struct tipc_event *evt = &sub->evt;
-static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
- u32 found_upper, u32 event, u32 port_ref,
- u32 node)
-{
- struct tipc_subscriber *subscriber = sub->subscriber;
- struct kvec msg_sect;
- int ret;
-
- msg_sect.iov_base = (void *)&sub->evt;
- msg_sect.iov_len = sizeof(struct tipc_event);
-
- sub->evt.event = htohl(event, sub->swap);
- sub->evt.found_lower = htohl(found_lower, sub->swap);
- sub->evt.found_upper = htohl(found_upper, sub->swap);
- sub->evt.port.ref = htohl(port_ref, sub->swap);
- sub->evt.port.node = htohl(node, sub->swap);
- ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
- msg_sect.iov_base, msg_sect.iov_len);
- if (ret < 0)
- pr_err("Sending subscription event failed, no memory\n");
+ if (sub->inactive)
+ return;
+ tipc_evt_write(evt, event, event);
+ if (p) {
+ tipc_evt_write(evt, found_lower, p->sr.lower);
+ tipc_evt_write(evt, found_upper, p->sr.upper);
+ tipc_evt_write(evt, port.ref, p->sk.ref);
+ tipc_evt_write(evt, port.node, p->sk.node);
+ } else {
+ tipc_evt_write(evt, found_lower, s->seq.lower);
+ tipc_evt_write(evt, found_upper, s->seq.upper);
+ tipc_evt_write(evt, port.ref, 0);
+ tipc_evt_write(evt, port.node, 0);
+ }
+ tipc_topsrv_queue_evt(sub->net, sub->conid, event, evt);
}
/**
- * tipc_subscr_overlap - test for subscription overlap with the given values
+ * tipc_sub_check_overlap - test for subscription overlap with the given values
+ * @subscribed: the service range subscribed for
+ * @found: the service range we are checking for match
*
- * Returns 1 if there is overlap, otherwise 0.
+ * Returns true if there is overlap, otherwise false.
*/
-int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
- u32 found_upper)
+static bool tipc_sub_check_overlap(struct tipc_service_range *subscribed,
+ struct tipc_service_range *found)
{
- if (found_lower < sub->seq.lower)
- found_lower = sub->seq.lower;
- if (found_upper > sub->seq.upper)
- found_upper = sub->seq.upper;
- if (found_lower > found_upper)
- return 0;
- return 1;
+ u32 found_lower = found->lower;
+ u32 found_upper = found->upper;
+
+ if (found_lower < subscribed->lower)
+ found_lower = subscribed->lower;
+ if (found_upper > subscribed->upper)
+ found_upper = subscribed->upper;
+ return found_lower <= found_upper;
}
-/**
- * tipc_subscr_report_overlap - issue event if there is subscription overlap
- *
- * Protected by nameseq.lock in name_table.c
- */
-void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
- u32 found_upper, u32 event, u32 port_ref,
- u32 node, int must)
+void tipc_sub_report_overlap(struct tipc_subscription *sub,
+ struct publication *p,
+ u32 event, bool must)
{
- if (!tipc_subscr_overlap(sub, found_lower, found_upper))
+ struct tipc_service_range *sr = &sub->s.seq;
+ u32 filter = sub->s.filter;
+
+ if (!tipc_sub_check_overlap(sr, &p->sr))
return;
- if (!must && !(sub->filter & TIPC_SUB_PORTS))
+ if (!must && !(filter & TIPC_SUB_PORTS))
return;
-
- subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
-}
-
-static void subscr_timeout(struct tipc_subscription *sub)
-{
- struct tipc_subscriber *subscriber = sub->subscriber;
-
- /* The spin lock per subscriber is used to protect its members */
- spin_lock_bh(&subscriber->lock);
-
- /* Validate if the connection related to the subscriber is
- * closed (in case subscriber is terminating)
- */
- if (subscriber->conid == 0) {
- spin_unlock_bh(&subscriber->lock);
+ if (filter & TIPC_SUB_CLUSTER_SCOPE && p->scope == TIPC_NODE_SCOPE)
return;
- }
-
- /* Validate timeout (in case subscription is being cancelled) */
- if (sub->timeout == TIPC_WAIT_FOREVER) {
- spin_unlock_bh(&subscriber->lock);
+ if (filter & TIPC_SUB_NODE_SCOPE && p->scope != TIPC_NODE_SCOPE)
return;
- }
-
- /* Unlink subscription from name table */
- tipc_nametbl_unsubscribe(sub);
-
- /* Unlink subscription from subscriber */
- list_del(&sub->subscription_list);
-
- spin_unlock_bh(&subscriber->lock);
-
- /* Notify subscriber of timeout */
- subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
- TIPC_SUBSCR_TIMEOUT, 0, 0);
-
- /* Now destroy subscription */
- k_term_timer(&sub->timer);
- kfree(sub);
- atomic_dec(&subscription_count);
+ spin_lock(&sub->lock);
+ tipc_sub_send_event(sub, p, event);
+ spin_unlock(&sub->lock);
}
-/**
- * subscr_del - delete a subscription within a subscription list
- *
- * Called with subscriber lock held.
- */
-static void subscr_del(struct tipc_subscription *sub)
+static void tipc_sub_timeout(struct timer_list *t)
{
- tipc_nametbl_unsubscribe(sub);
- list_del(&sub->subscription_list);
- kfree(sub);
- atomic_dec(&subscription_count);
+ struct tipc_subscription *sub = timer_container_of(sub, t, timer);
+
+ spin_lock(&sub->lock);
+ tipc_sub_send_event(sub, NULL, TIPC_SUBSCR_TIMEOUT);
+ sub->inactive = true;
+ spin_unlock(&sub->lock);
}
-/**
- * subscr_terminate - terminate communication with a subscriber
- *
- * Note: Must call it in process context since it might sleep.
- */
-static void subscr_terminate(struct tipc_subscriber *subscriber)
+static void tipc_sub_kref_release(struct kref *kref)
{
- tipc_conn_terminate(&topsrv, subscriber->conid);
+ kfree(container_of(kref, struct tipc_subscription, kref));
}
-static void subscr_release(struct tipc_subscriber *subscriber)
+void tipc_sub_put(struct tipc_subscription *subscription)
{
- struct tipc_subscription *sub;
- struct tipc_subscription *sub_temp;
-
- spin_lock_bh(&subscriber->lock);
-
- /* Invalidate subscriber reference */
- subscriber->conid = 0;
-
- /* Destroy any existing subscriptions for subscriber */
- list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
- subscription_list) {
- if (sub->timeout != TIPC_WAIT_FOREVER) {
- spin_unlock_bh(&subscriber->lock);
- k_cancel_timer(&sub->timer);
- k_term_timer(&sub->timer);
- spin_lock_bh(&subscriber->lock);
- }
- subscr_del(sub);
- }
- spin_unlock_bh(&subscriber->lock);
-
- /* Now destroy subscriber */
- kfree(subscriber);
+ kref_put(&subscription->kref, tipc_sub_kref_release);
}
-/**
- * subscr_cancel - handle subscription cancellation request
- *
- * Called with subscriber lock held. Routine must temporarily release lock
- * to enable the subscription timeout routine to finish without deadlocking;
- * the lock is then reclaimed to allow caller to release it upon return.
- *
- * Note that fields of 's' use subscriber's endianness!
- */
-static void subscr_cancel(struct tipc_subscr *s,
- struct tipc_subscriber *subscriber)
+void tipc_sub_get(struct tipc_subscription *subscription)
{
- struct tipc_subscription *sub;
- struct tipc_subscription *sub_temp;
- int found = 0;
-
- /* Find first matching subscription, exit if not found */
- list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
- subscription_list) {
- if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
- found = 1;
- break;
- }
- }
- if (!found)
- return;
-
- /* Cancel subscription timer (if used), then delete subscription */
- if (sub->timeout != TIPC_WAIT_FOREVER) {
- sub->timeout = TIPC_WAIT_FOREVER;
- spin_unlock_bh(&subscriber->lock);
- k_cancel_timer(&sub->timer);
- k_term_timer(&sub->timer);
- spin_lock_bh(&subscriber->lock);
- }
- subscr_del(sub);
+ kref_get(&subscription->kref);
}
-/**
- * subscr_subscribe - create subscription for subscriber
- *
- * Called with subscriber lock held.
- */
-static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
- struct tipc_subscriber *subscriber)
+struct tipc_subscription *tipc_sub_subscribe(struct net *net,
+ struct tipc_subscr *s,
+ int conid)
{
+ u32 lower = tipc_sub_read(s, seq.lower);
+ u32 upper = tipc_sub_read(s, seq.upper);
+ u32 filter = tipc_sub_read(s, filter);
struct tipc_subscription *sub;
- int swap;
-
- /* Determine subscriber's endianness */
- swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));
+ u32 timeout;
- /* Detect & process a subscription cancellation request */
- if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
- s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
- subscr_cancel(s, subscriber);
- return NULL;
- }
-
- /* Refuse subscription if global limit exceeded */
- if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
- pr_warn("Subscription rejected, limit reached (%u)\n",
- TIPC_MAX_SUBSCRIPTIONS);
- subscr_terminate(subscriber);
+ if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) ||
+ lower > upper) {
+ pr_warn("Subscription rejected, illegal request\n");
return NULL;
}
-
- /* Allocate subscription object */
sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
if (!sub) {
pr_warn("Subscription rejected, no memory\n");
- subscr_terminate(subscriber);
return NULL;
}
-
- /* Initialize subscription object */
- sub->seq.type = htohl(s->seq.type, swap);
- sub->seq.lower = htohl(s->seq.lower, swap);
- sub->seq.upper = htohl(s->seq.upper, swap);
- sub->timeout = htohl(s->timeout, swap);
- sub->filter = htohl(s->filter, swap);
- if ((!(sub->filter & TIPC_SUB_PORTS) ==
- !(sub->filter & TIPC_SUB_SERVICE)) ||
- (sub->seq.lower > sub->seq.upper)) {
- pr_warn("Subscription rejected, illegal request\n");
+ INIT_LIST_HEAD(&sub->service_list);
+ INIT_LIST_HEAD(&sub->sub_list);
+ sub->net = net;
+ sub->conid = conid;
+ sub->inactive = false;
+ memcpy(&sub->evt.s, s, sizeof(*s));
+ sub->s.seq.type = tipc_sub_read(s, seq.type);
+ sub->s.seq.lower = lower;
+ sub->s.seq.upper = upper;
+ sub->s.filter = filter;
+ sub->s.timeout = tipc_sub_read(s, timeout);
+ memcpy(sub->s.usr_handle, s->usr_handle, 8);
+ spin_lock_init(&sub->lock);
+ kref_init(&sub->kref);
+ if (!tipc_nametbl_subscribe(sub)) {
kfree(sub);
- subscr_terminate(subscriber);
return NULL;
}
- INIT_LIST_HEAD(&sub->nameseq_list);
- list_add(&sub->subscription_list, &subscriber->subscription_list);
- sub->subscriber = subscriber;
- sub->swap = swap;
- memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
- atomic_inc(&subscription_count);
- if (sub->timeout != TIPC_WAIT_FOREVER) {
- k_init_timer(&sub->timer,
- (Handler)subscr_timeout, (unsigned long)sub);
- k_start_timer(&sub->timer, sub->timeout);
- }
-
+ timer_setup(&sub->timer, tipc_sub_timeout, 0);
+ timeout = tipc_sub_read(&sub->evt.s, timeout);
+ if (timeout != TIPC_WAIT_FOREVER)
+ mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
return sub;
}
-/* Handle one termination request for the subscriber */
-static void subscr_conn_shutdown_event(int conid, void *usr_data)
-{
- subscr_release((struct tipc_subscriber *)usr_data);
-}
-
-/* Handle one request to create a new subscription for the subscriber */
-static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
- void *usr_data, void *buf, size_t len)
-{
- struct tipc_subscriber *subscriber = usr_data;
- struct tipc_subscription *sub;
-
- spin_lock_bh(&subscriber->lock);
- sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
- if (sub)
- tipc_nametbl_subscribe(sub);
- spin_unlock_bh(&subscriber->lock);
-}
-
-
-/* Handle one request to establish a new subscriber */
-static void *subscr_named_msg_event(int conid)
+void tipc_sub_unsubscribe(struct tipc_subscription *sub)
{
- struct tipc_subscriber *subscriber;
-
- /* Create subscriber object */
- subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
- if (subscriber == NULL) {
- pr_warn("Subscriber rejected, no memory\n");
- return NULL;
- }
- INIT_LIST_HEAD(&subscriber->subscription_list);
- subscriber->conid = conid;
- spin_lock_init(&subscriber->lock);
-
- return (void *)subscriber;
-}
-
-int tipc_subscr_start(void)
-{
- return tipc_server_start(&topsrv);
-}
-
-void tipc_subscr_stop(void)
-{
- tipc_server_stop(&topsrv);
+ tipc_nametbl_unsubscribe(sub);
+ if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
+ timer_delete_sync(&sub->timer);
+ list_del(&sub->sub_list);
+ tipc_sub_put(sub);
}