[RFC 1/6] percpu-refcount: Add managed mode for RCU released objects

Neeraj Upadhyay Neeraj.Upadhyay at amd.com
Mon Sep 16 05:08:06 UTC 2024


Add a new "managed mode" to percpu refcounts, to track initial
reference drop for refs which use RCU grace period for their object
reclaims. Typical usage pattern for such refs is:

// Called with elevated refcount
get()
    p = get_ptr();
    kref_get(&p->count);
    return p;

get()
    rcu_read_lock();
    p = get_ptr();
    if (p && !kref_get_unless_zero(&p->count))
        p = NULL;
    rcu_read_unlock();
    return p;

release()
    remove_ptr(p);
    call_rcu(&p->rcu, freep);

release()
    remove_ptr(p);
    kfree_rcu((p, rcu);

Currently, percpu ref requires users to call percpu_ref_kill() when
object usage enters a shutdown phase. Post kill operation, ref
increment/decrement are performed on an atomic counter. For cases where
ref is actively acquired and released after percpu_ref_kill(),
percpu ref does not provide any performance benefits over using
an atomic reference counter. Managed mode offloads tracking of ref
kill to a manager thread, thereby not requiring users to explicitly
call percpu_ref_kill(). This helps avoid the problem of suboptimal
performance if a percpu ref is actively acquired and released after
percpu_ref_kill() operation.

A percpu ref can be initialized as managed either during
percpu_ref_init() by passing PERCPU_REF_REL_MANAGED flag or a
reinitable ref can be switched to managed mode using
percpu_ref_switch_to_managed() post its initialization. Deferred switch
to managed mode can be used for cases like module initialization
errors, where a inited percpu ref's initial reference is dropped before
the object becomes active and is referenced by other contexts. One such
case is Apparmor labels which are not associated yet with a namespace.
These labels are freed without waiting for a RCU grace period. So,
managed mode cannot be used for these labels until their initialization
has completed.

Following are the allowed initialization modes for managed ref:

                Atomic  Percpu   Dead  Reinit  Managed
Managed-ref       Y        N      Y      Y       Y

Following are the allowed transitions for managed ref:

To -->       A   P   P(RI)   M   D  D(RI)  D(RI/M)   KLL   REI   RES

  A          y   n     y     y   n    y       y       y     y     y
  P          n   n     n     n   y    n       n       y     n     n
  M          n   n     n     y   n    n       y       n     y     y
  P(RI)      y   n     y     y   n    y       y       y     y     y
  D(RI)      y   n     y     y   n    y       y       -     y     y
  D(RI/M)    n   n     n     y   n    n       y       -     y     y

Modes:
A - Atomic  P - PerCPU  M - Managed  P(RI) - PerCPU with ReInit
D(RI) - Dead with ReInit  D(RI/M) - Dead with ReInit and Managed

PerCPU Ref Ops:

KLL - Kill  REI - Reinit  RES - Resurrect

Once a percpu ref is switched to managed mode, it cannot be switched to
any other active mode. On reinit/resurrect, managed ref is reinitialized
in managed mode.

Signed-off-by: Neeraj Upadhyay <Neeraj.Upadhyay at amd.com>
---
 .../admin-guide/kernel-parameters.txt         |  12 +
 include/linux/percpu-refcount.h               |  13 +
 lib/percpu-refcount.c                         | 358 +++++++++++++++++-
 3 files changed, 364 insertions(+), 19 deletions(-)

diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt
index 09126bb8cc9f..0f02a1b04fe9 100644
--- a/Documentation/admin-guide/kernel-parameters.txt
+++ b/Documentation/admin-guide/kernel-parameters.txt
@@ -4665,6 +4665,18 @@
 			allocator.  This parameter is primarily	for debugging
 			and performance comparison.
 
+	percpu_refcount.max_scan_count= [KNL]
+			Specifies the maximum number of percpu ref nodes which
+			are processed in one run of percpu ref manager thread.
+
+			Default: 100
+
+	percpu_refcount.scan_interval= [KNL]
+			Specifies the duration (ms) between two runs of manager
+			thread.
+
+			Default: 5000 ms
+
 	pirq=		[SMP,APIC] Manual mp-table setup
 			See Documentation/arch/x86/i386/IO-APIC.rst.
 
diff --git a/include/linux/percpu-refcount.h b/include/linux/percpu-refcount.h
index d73a1c08c3e3..e6aea81b3d01 100644
--- a/include/linux/percpu-refcount.h
+++ b/include/linux/percpu-refcount.h
@@ -68,6 +68,11 @@ enum {
 	__PERCPU_REF_FLAG_BITS	= 2,
 };
 
+/* Auxiliary flags */
+enum  {
+	__PERCPU_REL_MANAGED	= 1LU << 0,	/* operating in managed mode */
+};
+
 /* @flags for percpu_ref_init() */
 enum {
 	/*
@@ -90,6 +95,10 @@ enum {
 	 * Allow switching from atomic mode to percpu mode.
 	 */
 	PERCPU_REF_ALLOW_REINIT	= 1 << 2,
+	/*
+	 * Manage release of the percpu ref.
+	 */
+	PERCPU_REF_REL_MANAGED	= 1 << 3,
 };
 
 struct percpu_ref_data {
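Review bot: The comment added for `PERCPU_REF_REL_MANAGED` does not state what the flag obliges the caller to do. Elsewhere in this patch percpu_ref_init() forces `PERCPU_REF_ALLOW_REINIT` when it is set, and the release worker touches the ref and its list node under rcu_read_lock() after dropping its reference, so the object embedding the ref apparently must not be freed before an RCU grace period has passed. I would put that in the comment, for example `Release is tracked by the percpu ref manager; implies PERCPU_REF_ALLOW_REINIT. The ref must stay valid for an RCU grace period after @release runs.` The enum this line belongs to starts outside the hunk.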
@@ -100,6 +109,9 @@ struct percpu_ref_data {
 	bool			allow_reinit:1;
 	struct rcu_head		rcu;
 	struct percpu_ref	*ref;
+	unsigned int		aux_flags;
+	struct llist_node	node;
+
 };
 
 struct percpu_ref {
@@ -126,6 +138,7 @@ void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
 				 percpu_ref_func_t *confirm_switch);
 void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref);
 void percpu_ref_switch_to_percpu(struct percpu_ref *ref);
+int percpu_ref_switch_to_managed(struct percpu_ref *ref);
 void percpu_ref_kill_and_confirm(struct percpu_ref *ref,
 				 percpu_ref_func_t *confirm_kill);
 void percpu_ref_resurrect(struct percpu_ref *ref);
diff --git a/lib/percpu-refcount.c b/lib/percpu-refcount.c
index 668f6aa6a75d..7b97f9728c5b 100644
--- a/lib/percpu-refcount.c
+++ b/lib/percpu-refcount.c
@@ -5,6 +5,9 @@
 #include <linux/sched.h>
 #include <linux/wait.h>
 #include <linux/slab.h>
+#include <linux/llist.h>
+#include <linux/moduleparam.h>
+#include <linux/types.h>
 #include <linux/mm.h>
 #include <linux/percpu-refcount.h>
 
@@ -38,6 +41,7 @@
 
 static DEFINE_SPINLOCK(percpu_ref_switch_lock);
 static DECLARE_WAIT_QUEUE_HEAD(percpu_ref_switch_waitq);
+static LLIST_HEAD(percpu_ref_manage_head);
 
 static unsigned long __percpu *percpu_count_ptr(struct percpu_ref *ref)
 {
@@ -45,6 +49,8 @@ static unsigned long __percpu *percpu_count_ptr(struct percpu_ref *ref)
 		(ref->percpu_count_ptr & ~__PERCPU_REF_ATOMIC_DEAD);
 }
 
+int percpu_ref_switch_to_managed(struct percpu_ref *ref);
+
 /**
  * percpu_ref_init - initialize a percpu refcount
  * @ref: percpu_ref to initialize
@@ -80,6 +86,9 @@ int percpu_ref_init(struct percpu_ref *ref, percpu_ref_func_t *release,
 		return -ENOMEM;
 	}
 
+	if (flags & PERCPU_REF_REL_MANAGED)
+		flags |= PERCPU_REF_ALLOW_REINIT;
+
 	data->force_atomic = flags & PERCPU_REF_INIT_ATOMIC;
 	data->allow_reinit = flags & PERCPU_REF_ALLOW_REINIT;
 
@@ -101,10 +110,73 @@ int percpu_ref_init(struct percpu_ref *ref, percpu_ref_func_t *release,
 	data->confirm_switch = NULL;
 	data->ref = ref;
 	ref->data = data;
+	init_llist_node(&data->node);
+
+	if (flags & PERCPU_REF_REL_MANAGED)
+		percpu_ref_switch_to_managed(ref);
+
 	return 0;
 }
 EXPORT_SYMBOL_GPL(percpu_ref_init);
 
+static bool percpu_ref_is_managed(struct percpu_ref *ref)
+{
+	return (ref->data->aux_flags & __PERCPU_REL_MANAGED) != 0;
+}
+
+static void __percpu_ref_switch_mode(struct percpu_ref *ref,
+				     percpu_ref_func_t *confirm_switch);
+
+static int __percpu_ref_switch_to_managed(struct percpu_ref *ref)
+{
+	unsigned long __percpu *percpu_count;
+	struct percpu_ref_data *data;
+	int ret = -1;
+
+	data = ref->data;
+
+	if (WARN_ONCE(!percpu_ref_tryget(ref), "Percpu ref is not active"))
+		return ret;
+
+	if (WARN_ONCE(!data->allow_reinit, "Percpu ref does not allow switch"))
+		goto err_switch_managed;
+
+	if (WARN_ONCE(percpu_ref_is_managed(ref), "Percpu ref is already managed"))
+		goto err_switch_managed;
+
+	data->aux_flags |= __PERCPU_REL_MANAGED;
+	data->force_atomic = false;
+	if (!__ref_is_percpu(ref, &percpu_count))
+		__percpu_ref_switch_mode(ref, NULL);
+	/* Ensure ordering of percpu mode switch and node scan */
+	smp_mb();
+	llist_add(&data->node, &percpu_ref_manage_head);
+
+	return 0;
+
+err_switch_managed:
+	percpu_ref_put(ref);
+	return ret;
+}
+
+/**
+ * percpu_ref_switch_to_managed - Switch an unmanaged ref to percpu mode.
+ *
+ * @ref: percpu_ref to switch to managed mode
+ *
+ */
+int percpu_ref_switch_to_managed(struct percpu_ref *ref)
+{
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&percpu_ref_switch_lock, flags);
+	ret = __percpu_ref_switch_to_managed(ref);
+	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(percpu_ref_switch_to_managed);
+
 static void __percpu_ref_exit(struct percpu_ref *ref)
 {
 	unsigned long __percpu *percpu_count = percpu_count_ptr(ref);
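Review bot: percpu_ref_init() discards the result of `percpu_ref_switch_to_managed(ref)`, so a caller that asked for PERCPU_REF_REL_MANAGED gets 0 back even when one of the WARN_ONCE paths in __percpu_ref_switch_to_managed() refused the switch. The ref is then left unmanaged and, since managed users are not expected to call percpu_ref_kill(), presumably nothing ever drops its initial reference, which would show up as an object that is never released. Propagating the value (`ret = percpu_ref_switch_to_managed(ref);` and, on failure, undoing the allocations before `return ret;`) makes that visible; the start of percpu_ref_init() is outside the hunk, so the exact unwind is not shown here. Two smaller points in the same hunk: `int ret = -1;` would read better as `int ret = -EINVAL;`, and the kerneldoc summary for percpu_ref_switch_to_managed() says "to percpu mode" where it means managed mode and has no Return: line.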
@@ -283,6 +355,27 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref,
 		__percpu_ref_switch_to_percpu(ref);
 }
 
+static bool __percpu_ref_switch_to_atomic_checked(struct percpu_ref *ref,
+						  percpu_ref_func_t *confirm_switch,
+						  bool check_managed)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&percpu_ref_switch_lock, flags);
+	if (check_managed && WARN_ONCE(percpu_ref_is_managed(ref),
+		      "Percpu ref is managed, cannot switch to atomic mode")) {
+		spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+		return false;
+	}
+
+	ref->data->force_atomic = true;
+	__percpu_ref_switch_mode(ref, confirm_switch);
+
+	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+
+	return true;
+}
+
 /**
  * percpu_ref_switch_to_atomic - switch a percpu_ref to atomic mode
  * @ref: percpu_ref to switch to atomic mode
@@ -306,17 +399,16 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref,
 void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
 				 percpu_ref_func_t *confirm_switch)
 {
-	unsigned long flags;
-
-	spin_lock_irqsave(&percpu_ref_switch_lock, flags);
-
-	ref->data->force_atomic = true;
-	__percpu_ref_switch_mode(ref, confirm_switch);
-
-	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+	(void)__percpu_ref_switch_to_atomic_checked(ref, confirm_switch, true);
 }
 EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic);
 
+static void __percpu_ref_switch_to_atomic_sync_checked(struct percpu_ref *ref, bool check_managed)
+{
+	if (!__percpu_ref_switch_to_atomic_checked(ref, NULL, check_managed))
+		return;
+	wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch);
+}
 /**
  * percpu_ref_switch_to_atomic_sync - switch a percpu_ref to atomic mode
  * @ref: percpu_ref to switch to atomic mode
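Review bot: When the ref is managed, __percpu_ref_switch_to_atomic_checked() returns false without calling `confirm_switch`, and percpu_ref_switch_to_atomic() throws that result away with the `(void)` cast, so a caller waiting on its confirm callback would wait indefinitely. percpu_ref_switch_to_atomic_sync() in the next hunk has the matching problem: it returns with the ref still in percpu mode, and because the check uses WARN_ONCE every refusal after the first is silent. Both functions are exported and return void, so at minimum their kerneldoc should say so, e.g. `Managed refs are refused: the call warns once, leaves the mode unchanged and never invokes @confirm_switch.` The kerneldoc blocks themselves are only partly inside these hunks. Separately, the new static helper ends at `}` with no blank line before the following `/**`.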
@@ -327,11 +419,28 @@ EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic);
  */
 void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref)
 {
-	percpu_ref_switch_to_atomic(ref, NULL);
-	wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch);
+	__percpu_ref_switch_to_atomic_sync_checked(ref, true);
 }
 EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic_sync);
 
+static void __percpu_ref_switch_to_percpu_checked(struct percpu_ref *ref, bool check_managed)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&percpu_ref_switch_lock, flags);
+
+	if (check_managed && WARN_ONCE(percpu_ref_is_managed(ref),
+		      "Percpu ref is managed, cannot switch to percpu mode")) {
+		spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+		return;
+	}
+
+	ref->data->force_atomic = false;
+	__percpu_ref_switch_mode(ref, NULL);
+
+	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+}
+
 /**
  * percpu_ref_switch_to_percpu - switch a percpu_ref to percpu mode
  * @ref: percpu_ref to switch to percpu mode
@@ -352,14 +461,7 @@ EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic_sync);
  */
 void percpu_ref_switch_to_percpu(struct percpu_ref *ref)
 {
-	unsigned long flags;
-
-	spin_lock_irqsave(&percpu_ref_switch_lock, flags);
-
-	ref->data->force_atomic = false;
-	__percpu_ref_switch_mode(ref, NULL);
-
-	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+	__percpu_ref_switch_to_percpu_checked(ref, true);
 }
 EXPORT_SYMBOL_GPL(percpu_ref_switch_to_percpu);
 
@@ -472,8 +574,226 @@ void percpu_ref_resurrect(struct percpu_ref *ref)
 
 	ref->percpu_count_ptr &= ~__PERCPU_REF_DEAD;
 	percpu_ref_get(ref);
-	__percpu_ref_switch_mode(ref, NULL);
+	if (percpu_ref_is_managed(ref)) {
+		ref->data->aux_flags &= ~__PERCPU_REL_MANAGED;
+		__percpu_ref_switch_to_managed(ref);
+	} else {
+		__percpu_ref_switch_mode(ref, NULL);
+	}
 
 	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
 }
 EXPORT_SYMBOL_GPL(percpu_ref_resurrect);
+
+#define DEFAULT_SCAN_INTERVAL_MS    5000
+/* Interval duration between two ref scans. */
+static ulong scan_interval = DEFAULT_SCAN_INTERVAL_MS;
+module_param(scan_interval, ulong, 0444);
+
+#define DEFAULT_MAX_SCAN_COUNT      100
+/* Number of percpu refs scanned in one iteration of worker execution. */
+static int max_scan_count = DEFAULT_MAX_SCAN_COUNT;
+module_param(max_scan_count, int, 0444);
+
+static void percpu_ref_release_work_fn(struct work_struct *work);
+
+/*
+ * Sentinel llist nodes for lockless list traveral and deletions by
+ * the pcpu ref release worker, while nodes are added from
+ * percpu_ref_init() and percpu_ref_switch_to_managed().
+ *
+ * Sentinel node marks the head of list traversal for the current
+ * iteration of kworker execution.
+ */
+struct percpu_ref_sen_node {
+	bool inuse;
+	struct llist_node node;
+};
+
+/*
+ * We need two sentinel nodes for lockless list manipulations from release
+ * worker - first node will be used in current reclaim iteration. The second
+ * node will be used in next iteration. Next iteration marks the first node
+ * as free, for use in subsequent iteration.
+ */
+#define PERCPU_REF_SEN_NODES_COUNT     2
+
+/* Track last processed percpu ref node */
+static struct llist_node *last_percpu_ref_node;
+
+static struct percpu_ref_sen_node
+	percpu_ref_sen_nodes[PERCPU_REF_SEN_NODES_COUNT];
+
+static DECLARE_DELAYED_WORK(percpu_ref_release_work, percpu_ref_release_work_fn);
+
+static bool percpu_ref_is_sen_node(struct llist_node *node)
+{
+	return &percpu_ref_sen_nodes[0].node <= node &&
+		node <= &percpu_ref_sen_nodes[PERCPU_REF_SEN_NODES_COUNT - 1].node;
+}
+
+static struct llist_node *percpu_ref_get_sen_node(void)
+{
+	int i;
+	struct percpu_ref_sen_node *sn;
+
+	for (i = 0; i < PERCPU_REF_SEN_NODES_COUNT; i++) {
+		sn = &percpu_ref_sen_nodes[i];
+		if (!sn->inuse) {
+			sn->inuse = true;
+			return &sn->node;
+		}
+	}
+
+	return NULL;
+}
+
+static void percpu_ref_put_sen_node(struct llist_node *node)
+{
+	struct percpu_ref_sen_node *sn = container_of(node, struct percpu_ref_sen_node, node);
+
+	sn->inuse = false;
+	init_llist_node(node);
+}
+
+static void percpu_ref_put_all_sen_nodes_except(struct llist_node *node)
+{
+	int i;
+
+	for (i = 0; i < PERCPU_REF_SEN_NODES_COUNT; i++) {
+		if (&percpu_ref_sen_nodes[i].node == node)
+			continue;
+		percpu_ref_sen_nodes[i].inuse = false;
+		init_llist_node(&percpu_ref_sen_nodes[i].node);
+	}
+}
+
+static struct workqueue_struct *percpu_ref_release_wq;
+
+static void percpu_ref_release_work_fn(struct work_struct *work)
+{
+	struct llist_node *pos, *first, *head, *prev, *next;
+	struct llist_node *sen_node;
+	struct percpu_ref *ref;
+	int count = 0;
+	bool held;
+
+	first = READ_ONCE(percpu_ref_manage_head.first);
+	if (!first)
+		goto queue_release_work;
+
+	/*
+	 * Enqueue a dummy node to mark the start of scan. This dummy
+	 * node is used as start point of scan and ensures that
+	 * there is no additional synchronization required with new
+	 * label node additions to the llist. Any new labels will
+	 * be processed in next run of the kworker.
+	 *
+	 *                SCAN START PTR
+	 *                     |
+	 *                     v
+	 * +----------+     +------+    +------+    +------+
+	 * |          |     |      |    |      |    |      |
+	 * |   head   ------> dummy|--->|label |--->| label|--->NULL
+	 * |          |     | node |    |      |    |      |
+	 * +----------+     +------+    +------+    +------+
+	 *
+	 *
+	 * New label addition:
+	 *
+	 *                       SCAN START PTR
+	 *                            |
+	 *                            v
+	 * +----------+  +------+  +------+    +------+    +------+
+	 * |          |  |      |  |      |    |      |    |      |
+	 * |   head   |--> label|--> dummy|--->|label |--->| label|--->NULL
+	 * |          |  |      |  | node |    |      |    |      |
+	 * +----------+  +------+  +------+    +------+    +------+
+	 *
+	 */
+	if (last_percpu_ref_node == NULL || last_percpu_ref_node->next == NULL) {
+retry_sentinel_get:
+		sen_node = percpu_ref_get_sen_node();
+		/*
+		 * All sentinel nodes are in use? This should not happen, as we
+		 * require only one sentinel for the start of list traversal and
+		 * other sentinel node is freed during the traversal.
+		 */
+		if (WARN_ONCE(!sen_node, "All sentinel nodes are in use")) {
+			/* Use first node as the sentinel node */
+			head = first->next;
+			if (!head) {
+				struct llist_node *ign_node = NULL;
+				/*
+				 * We exhausted sentinel nodes. However, there aren't
+				 * enough nodes in the llist. So, we have leaked
+				 * sentinel nodes. Reclaim sentinels and retry.
+				 */
+				if (percpu_ref_is_sen_node(first))
+					ign_node = first;
+				percpu_ref_put_all_sen_nodes_except(ign_node);
+				goto retry_sentinel_get;
+			}
+			prev = first;
+		} else {
+			llist_add(sen_node, &percpu_ref_manage_head);
+			prev = sen_node;
+			head = prev->next;
+		}
+	} else {
+		prev = last_percpu_ref_node;
+		head = prev->next;
+	}
+
+	last_percpu_ref_node = NULL;
+	llist_for_each_safe(pos, next, head) {
+		/* Free sentinel node which is present in the list */
+		if (percpu_ref_is_sen_node(pos)) {
+			prev->next = pos->next;
+			percpu_ref_put_sen_node(pos);
+			continue;
+		}
+
+		ref = container_of(pos, struct percpu_ref_data, node)->ref;
+		__percpu_ref_switch_to_atomic_sync_checked(ref, false);
+		/*
+		 * Drop the ref while in RCU read critical section to
+		 * prevent obj free while we manipulating node.
+		 */
+		rcu_read_lock();
+		percpu_ref_put(ref);
+		held = percpu_ref_tryget(ref);
+		if (!held) {
+			prev->next = pos->next;
+			init_llist_node(pos);
+			ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
+		}
+		rcu_read_unlock();
+		if (!held)
+			continue;
+		__percpu_ref_switch_to_percpu_checked(ref, false);
+		count++;
+		if (count == max_scan_count) {
+			last_percpu_ref_node = pos;
+			break;
+		}
+		prev = pos;
+	}
+
+queue_release_work:
+	queue_delayed_work(percpu_ref_release_wq, &percpu_ref_release_work,
+			   scan_interval);
+}
+
+static __init int percpu_ref_setup(void)
+{
+	percpu_ref_release_wq = alloc_workqueue("percpu_ref_release_wq",
+				WQ_UNBOUND | WQ_MEM_RECLAIM | WQ_FREEZABLE, 0);
+	if (!percpu_ref_release_wq)
+		return -ENOMEM;
+
+	queue_delayed_work(percpu_ref_release_wq, &percpu_ref_release_work,
+			   scan_interval);
+	return 0;
+}
+early_initcall(percpu_ref_setup);
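Review bot: `scan_interval` is documented as milliseconds ("Default: 5000 ms") but is passed unconverted as the delay to queue_delayed_work(), both in percpu_ref_release_work_fn() and in percpu_ref_setup(); that argument is in jiffies, so the scan period, and with it how long a dropped object lingers before release, varies with HZ. Both calls should pass `msecs_to_jiffies(scan_interval)`. The batch limit has a related gap: `count` is incremented before `if (count == max_scan_count)`, and `max_scan_count` is an unchecked int parameter, so a value of 0 or below never matches and one run walks the whole list; `if (count >= max_scan_count)` bounds it.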
-- 
2.34.1



