splice: relay support
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
This commit is contained in:
parent
cf8208d0ea
commit
ebf9909343
1 changed files with 191 additions and 50 deletions
241
kernel/relay.c
241
kernel/relay.c
|
@ -21,6 +21,7 @@
|
||||||
#include <linux/vmalloc.h>
|
#include <linux/vmalloc.h>
|
||||||
#include <linux/mm.h>
|
#include <linux/mm.h>
|
||||||
#include <linux/cpu.h>
|
#include <linux/cpu.h>
|
||||||
|
#include <linux/pipe_fs_i.h>
|
||||||
|
|
||||||
/* list of open channels, for cpu hotplug */
|
/* list of open channels, for cpu hotplug */
|
||||||
static DEFINE_MUTEX(relay_channels_mutex);
|
static DEFINE_MUTEX(relay_channels_mutex);
|
||||||
|
@ -121,6 +122,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
|
||||||
buf->page_array[i] = alloc_page(GFP_KERNEL);
|
buf->page_array[i] = alloc_page(GFP_KERNEL);
|
||||||
if (unlikely(!buf->page_array[i]))
|
if (unlikely(!buf->page_array[i]))
|
||||||
goto depopulate;
|
goto depopulate;
|
||||||
|
set_page_private(buf->page_array[i], (unsigned long)buf);
|
||||||
}
|
}
|
||||||
mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
|
mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
|
||||||
if (!mem)
|
if (!mem)
|
||||||
|
@ -970,43 +972,6 @@ static int subbuf_read_actor(size_t read_start,
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* subbuf_send_actor - send up to one subbuf's worth of data
|
|
||||||
*/
|
|
||||||
static int subbuf_send_actor(size_t read_start,
|
|
||||||
struct rchan_buf *buf,
|
|
||||||
size_t avail,
|
|
||||||
read_descriptor_t *desc,
|
|
||||||
read_actor_t actor)
|
|
||||||
{
|
|
||||||
unsigned long pidx, poff;
|
|
||||||
unsigned int subbuf_pages;
|
|
||||||
int ret = 0;
|
|
||||||
|
|
||||||
subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
|
|
||||||
pidx = (read_start / PAGE_SIZE) % subbuf_pages;
|
|
||||||
poff = read_start & ~PAGE_MASK;
|
|
||||||
while (avail) {
|
|
||||||
struct page *p = buf->page_array[pidx];
|
|
||||||
unsigned int len;
|
|
||||||
|
|
||||||
len = PAGE_SIZE - poff;
|
|
||||||
if (len > avail)
|
|
||||||
len = avail;
|
|
||||||
|
|
||||||
len = actor(desc, p, poff, len);
|
|
||||||
if (desc->error)
|
|
||||||
break;
|
|
||||||
|
|
||||||
avail -= len;
|
|
||||||
ret += len;
|
|
||||||
poff = 0;
|
|
||||||
pidx = (pidx + 1) % subbuf_pages;
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef int (*subbuf_actor_t) (size_t read_start,
|
typedef int (*subbuf_actor_t) (size_t read_start,
|
||||||
struct rchan_buf *buf,
|
struct rchan_buf *buf,
|
||||||
size_t avail,
|
size_t avail,
|
||||||
|
@ -1067,19 +1032,195 @@ static ssize_t relay_file_read(struct file *filp,
|
||||||
NULL, &desc);
|
NULL, &desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t relay_file_sendfile(struct file *filp,
|
static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
|
||||||
loff_t *ppos,
|
struct pipe_buffer *buf)
|
||||||
size_t count,
|
|
||||||
read_actor_t actor,
|
|
||||||
void *target)
|
|
||||||
{
|
{
|
||||||
read_descriptor_t desc;
|
struct rchan_buf *rbuf;
|
||||||
desc.written = 0;
|
|
||||||
desc.count = count;
|
rbuf = (struct rchan_buf *)page_private(buf->page);
|
||||||
desc.arg.data = target;
|
|
||||||
desc.error = 0;
|
rbuf->bytes_consumed += PAGE_SIZE;
|
||||||
return relay_file_read_subbufs(filp, ppos, subbuf_send_actor,
|
|
||||||
actor, &desc);
|
if (rbuf->bytes_consumed == rbuf->chan->subbuf_size) {
|
||||||
|
relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
|
||||||
|
rbuf->bytes_consumed = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct pipe_buf_operations relay_pipe_buf_ops = {
|
||||||
|
.can_merge = 0,
|
||||||
|
.map = generic_pipe_buf_map,
|
||||||
|
.unmap = generic_pipe_buf_unmap,
|
||||||
|
.pin = generic_pipe_buf_pin,
|
||||||
|
.release = relay_pipe_buf_release,
|
||||||
|
.steal = generic_pipe_buf_steal,
|
||||||
|
.get = generic_pipe_buf_get,
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* subbuf_splice_actor - splice up to one subbuf's worth of data
|
||||||
|
*/
|
||||||
|
static int subbuf_splice_actor(struct file *in,
|
||||||
|
loff_t *ppos,
|
||||||
|
struct pipe_inode_info *pipe,
|
||||||
|
size_t len,
|
||||||
|
unsigned int flags,
|
||||||
|
int *nonpad_ret)
|
||||||
|
{
|
||||||
|
unsigned int pidx, poff;
|
||||||
|
unsigned int subbuf_pages;
|
||||||
|
int ret = 0;
|
||||||
|
int do_wakeup = 0;
|
||||||
|
struct rchan_buf *rbuf = in->private_data;
|
||||||
|
unsigned int subbuf_size = rbuf->chan->subbuf_size;
|
||||||
|
size_t read_start = ((size_t)*ppos) % rbuf->chan->alloc_size;
|
||||||
|
size_t avail = subbuf_size - read_start % subbuf_size;
|
||||||
|
size_t read_subbuf = read_start / subbuf_size;
|
||||||
|
size_t padding = rbuf->padding[read_subbuf];
|
||||||
|
size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;
|
||||||
|
|
||||||
|
if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (len > avail)
|
||||||
|
len = avail;
|
||||||
|
|
||||||
|
if (pipe->inode)
|
||||||
|
mutex_lock(&pipe->inode->i_mutex);
|
||||||
|
|
||||||
|
subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
|
||||||
|
pidx = (read_start / PAGE_SIZE) % subbuf_pages;
|
||||||
|
poff = read_start & ~PAGE_MASK;
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
unsigned int this_len;
|
||||||
|
unsigned int this_end;
|
||||||
|
int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
|
||||||
|
struct pipe_buffer *buf = pipe->bufs + newbuf;
|
||||||
|
|
||||||
|
if (!pipe->readers) {
|
||||||
|
send_sig(SIGPIPE, current, 0);
|
||||||
|
if (!ret)
|
||||||
|
ret = -EPIPE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pipe->nrbufs < PIPE_BUFFERS) {
|
||||||
|
this_len = PAGE_SIZE - poff;
|
||||||
|
if (this_len > avail)
|
||||||
|
this_len = avail;
|
||||||
|
|
||||||
|
buf->page = rbuf->page_array[pidx];
|
||||||
|
buf->offset = poff;
|
||||||
|
this_end = read_start + ret + this_len;
|
||||||
|
if (this_end > nonpad_end) {
|
||||||
|
if (read_start + ret >= nonpad_end)
|
||||||
|
buf->len = 0;
|
||||||
|
else
|
||||||
|
buf->len = nonpad_end - (read_start + ret);
|
||||||
|
} else
|
||||||
|
buf->len = this_len;
|
||||||
|
|
||||||
|
*nonpad_ret += buf->len;
|
||||||
|
|
||||||
|
buf->ops = &relay_pipe_buf_ops;
|
||||||
|
pipe->nrbufs++;
|
||||||
|
|
||||||
|
avail -= this_len;
|
||||||
|
ret += this_len;
|
||||||
|
poff = 0;
|
||||||
|
pidx = (pidx + 1) % subbuf_pages;
|
||||||
|
|
||||||
|
if (pipe->inode)
|
||||||
|
do_wakeup = 1;
|
||||||
|
|
||||||
|
if (!avail)
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (pipe->nrbufs < PIPE_BUFFERS)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flags & SPLICE_F_NONBLOCK) {
|
||||||
|
if (!ret)
|
||||||
|
ret = -EAGAIN;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (signal_pending(current)) {
|
||||||
|
if (!ret)
|
||||||
|
ret = -ERESTARTSYS;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (do_wakeup) {
|
||||||
|
smp_mb();
|
||||||
|
if (waitqueue_active(&pipe->wait))
|
||||||
|
wake_up_interruptible_sync(&pipe->wait);
|
||||||
|
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
|
||||||
|
do_wakeup = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pipe->waiting_writers++;
|
||||||
|
pipe_wait(pipe);
|
||||||
|
pipe->waiting_writers--;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pipe->inode)
|
||||||
|
mutex_unlock(&pipe->inode->i_mutex);
|
||||||
|
|
||||||
|
if (do_wakeup) {
|
||||||
|
smp_mb();
|
||||||
|
if (waitqueue_active(&pipe->wait))
|
||||||
|
wake_up_interruptible(&pipe->wait);
|
||||||
|
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ssize_t relay_file_splice_read(struct file *in,
|
||||||
|
loff_t *ppos,
|
||||||
|
struct pipe_inode_info *pipe,
|
||||||
|
size_t len,
|
||||||
|
unsigned int flags)
|
||||||
|
{
|
||||||
|
ssize_t spliced;
|
||||||
|
int ret;
|
||||||
|
int nonpad_ret = 0;
|
||||||
|
|
||||||
|
ret = 0;
|
||||||
|
spliced = 0;
|
||||||
|
|
||||||
|
while (len) {
|
||||||
|
ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
|
||||||
|
if (ret < 0)
|
||||||
|
break;
|
||||||
|
else if (!ret) {
|
||||||
|
break;
|
||||||
|
if (spliced)
|
||||||
|
break;
|
||||||
|
if (flags & SPLICE_F_NONBLOCK) {
|
||||||
|
ret = -EAGAIN;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppos += ret;
|
||||||
|
if (ret > len)
|
||||||
|
len = 0;
|
||||||
|
else
|
||||||
|
len -= ret;
|
||||||
|
spliced += nonpad_ret;
|
||||||
|
nonpad_ret = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (spliced)
|
||||||
|
return spliced;
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
const struct file_operations relay_file_operations = {
|
const struct file_operations relay_file_operations = {
|
||||||
|
@ -1089,7 +1230,7 @@ const struct file_operations relay_file_operations = {
|
||||||
.read = relay_file_read,
|
.read = relay_file_read,
|
||||||
.llseek = no_llseek,
|
.llseek = no_llseek,
|
||||||
.release = relay_file_release,
|
.release = relay_file_release,
|
||||||
.sendfile = relay_file_sendfile,
|
.splice_read = relay_file_splice_read,
|
||||||
};
|
};
|
||||||
EXPORT_SYMBOL_GPL(relay_file_operations);
|
EXPORT_SYMBOL_GPL(relay_file_operations);
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue