[RFC PATCH 03/21] pipe: Use head and tail pointers for the ring, not cursor and length

Rasmus Villemoes linux at rasmusvillemoes.dk
Wed Oct 16 07:46:10 UTC 2019


On 15/10/2019 23.48, David Howells wrote:
> Convert pipes to use head and tail pointers for the buffer ring rather than
> pointer and length as the latter requires two atomic ops to update (or a
> combined op) whereas the former only requires one.
> 
>  (1) The head pointer is the point at which production occurs and points to
>      the slot in which the next buffer will be placed.  This is equivalent
>      to pipe->curbuf + pipe->nrbufs.
> 
>      The head pointer belongs to the write-side.
> 
>  (2) The tail pointer is the point at which consumption occurs.  It points
>      to the next slot to be consumed.  This is equivalent to pipe->curbuf.
> 
>      The tail pointer belongs to the read-side.
> 
>  (3) head and tail are allowed to run to UINT_MAX and wrap naturally.  They
>      are only masked off when the array is being accessed, e.g.:
> 
> 	pipe->bufs[head & mask]
> 
>      This means that it is not necessary to have a dead slot in the ring as
>      head == tail isn't ambiguous.
> 
>  (4) The ring is empty if "head == tail".
> 
>  (5) The occupancy of the ring is "head - tail".
> 
>  (6) The number of free slots in the ring is "(tail + pipe->ring_size) -
>      head".

Seems an odd way of writing pipe->ring_size - (head - tail) ; i.e.
obviously #free slots is #size minus #occupancy.

>  (7) The ring is full if "head >= (tail + pipe->ring_size)", which can also
>      be written as "head - tail >= pipe->ring_size".
>

No it cannot, it _must_ be written in the latter form. Assuming
sizeof(int)==1 for simplicity, consider ring_size = 16, tail = 240.
Regardless whether head is 240, 241, ..., 255, 0, tail + ring_size wraps
to 0, so the former expression states the ring is full in all cases.

Better spell out somewhere that while head and tail are free-running, at
any point in time they satisfy the invariant head - tail <= pipe_size
(and also 0 <= head - tail, but that's a tautology for unsigned
ints...). Then it's a matter of taste if one wants to write "full" as
head-tail == pipe_size or head-tail >= pipe_size.

> Also split pipe->buffers into pipe->ring_size (which indicates the size of the
> ring) and pipe->max_usage (which restricts the amount of ring that write() is
> allowed to fill).  This allows for a pipe that is both writable by the kernel
> notification facility and by userspace, allowing plenty of ring space for
> notifications to be added whilst preventing userspace from being able to use
> up too much buffer space.

That seems like something that should be added in a separate patch -
adding ->max_usage and switching appropriate users of ->ring_size over,
so it's more clear where you're using one or the other.

> @@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>  
>  	pipe_lock(pipe);
>  
> -	bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
> -			      GFP_KERNEL);
> +	head = pipe->head;
> +	tail = pipe->tail;
> +	mask = pipe->ring_size - 1;
> +	count = head - tail;
> +
> +	bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
>  	if (!bufs) {
>  		pipe_unlock(pipe);
>  		return -ENOMEM;
> @@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>  
>  	nbuf = 0;
>  	rem = 0;
> -	for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
> -		rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
> +	for (idx = tail; idx < head && rem < len; idx++)
> +		rem += pipe->bufs[idx & mask].len;
>  
>  	ret = -EINVAL;
>  	if (rem < len)
> @@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>  		struct pipe_buffer *ibuf;
>  		struct pipe_buffer *obuf;
>  
> -		BUG_ON(nbuf >= pipe->buffers);
> -		BUG_ON(!pipe->nrbufs);
> -		ibuf = &pipe->bufs[pipe->curbuf];
> +		BUG_ON(nbuf >= pipe->ring_size);
> +		BUG_ON(tail == head);
> +		ibuf = &pipe->bufs[tail];

I don't see where tail gets masked between tail = pipe->tail; above and
here, but I may be missing it. In any case, how about seeding head and
tail with something like 1<<20 when creating the pipe so bugs like that
are hit more quickly.

> @@ -515,17 +525,19 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
>  static long pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
>  {
>  	struct pipe_inode_info *pipe = filp->private_data;
> -	int count, buf, nrbufs;
> +	int count, head, tail, mask;
>  
>  	switch (cmd) {
>  		case FIONREAD:
>  			__pipe_lock(pipe);
>  			count = 0;
> -			buf = pipe->curbuf;
> -			nrbufs = pipe->nrbufs;
> -			while (--nrbufs >= 0) {
> -				count += pipe->bufs[buf].len;
> -				buf = (buf+1) & (pipe->buffers - 1);
> +			head = pipe->head;
> +			tail = pipe->tail;
> +			mask = pipe->ring_size - 1;
> +
> +			while (tail < head) {
> +				count += pipe->bufs[tail & mask].len;
> +				tail++;
>  			}

This is broken if head has wrapped but tail has not. It has to be "while
(head - tail)" or perhaps just "while (tail != head)" or something along
those lines.

> @@ -1086,17 +1104,21 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
>  	}
>  
>  	/*
> -	 * We can shrink the pipe, if arg >= pipe->nrbufs. Since we don't
> -	 * expect a lot of shrink+grow operations, just free and allocate
> -	 * again like we would do for growing. If the pipe currently
> +	 * We can shrink the pipe, if arg is greater than the ring occupancy.
> +	 * Since we don't expect a lot of shrink+grow operations, just free and
> +	 * allocate again like we would do for growing.  If the pipe currently
>  	 * contains more buffers than arg, then return busy.
>  	 */
> -	if (nr_pages < pipe->nrbufs) {
> +	mask = pipe->ring_size - 1;
> +	head = pipe->head & mask;
> +	tail = pipe->tail & mask;
> +	n = pipe->head - pipe->tail;

I think it's confusing to "premask" head and tail here. Can you either
drop that (pipe_set_size should hardly be a hot path?), or perhaps call
them something else to avoid a future reader seeing an unmasked
bufs[head] and thinking that's a bug?

> @@ -1254,9 +1290,10 @@ static ssize_t pipe_get_pages(struct iov_iter *i,
>  		   struct page **pages, size_t maxsize, unsigned maxpages,
>  		   size_t *start)
>  {
> +	unsigned int p_tail;
> +	unsigned int i_head;
>  	unsigned npages;
>  	size_t capacity;
> -	int idx;
>  
>  	if (!maxsize)
>  		return 0;
> @@ -1264,12 +1301,15 @@ static ssize_t pipe_get_pages(struct iov_iter *i,
>  	if (!sanity(i))
>  		return -EFAULT;
>  
> -	data_start(i, &idx, start);
> -	/* some of this one + all after this one */
> -	npages = ((i->pipe->curbuf - idx - 1) & (i->pipe->buffers - 1)) + 1;
> -	capacity = min(npages,maxpages) * PAGE_SIZE - *start;
> +	data_start(i, &i_head, start);
> +	p_tail = i->pipe->tail;
> +	/* Amount of free space: some of this one + all after this one */
> +	npages = (p_tail + i->pipe->ring_size) - i_head;

Hm, it's not clear that this is equivalent to the old computation. Since
it seems repeated in a few places, could it be factored to a little
helper (before this patch) and the "some of this one + all after this
one" comment perhaps expanded to explain what is going on?

Rasmus



More information about the Linux-security-module-archive mailing list