diff options
| author | Jakub Kicinski <kuba@kernel.org> | 2026-02-02 18:15:35 -0800 |
|---|---|---|
| committer | Jakub Kicinski <kuba@kernel.org> | 2026-02-02 18:15:36 -0800 |
| commit | 84b86025f6d7844a208c53702c31b1d41aafe2c4 (patch) | |
| tree | d3618fcd649dc40a3c8b0981e754ab7b2c9945b7 | |
| parent | 74ad1dfe2335df7c1fd36139261c2fba6bcaea40 (diff) | |
| parent | 2f2dc84645fb25960a0f52aff4d754fce43edea4 (diff) | |
Merge branch 'mptcp-implement-read_sock-and-splice_read'
Matthieu Baerts says:
====================
mptcp: implement .read_sock and .splice_read
This series is a preparation work for future in-kernel MPTCP sockets
usage. Here, two interfaces are implemented: read_sock and splice_read.
As a result of this series, splice() with MPTCP sockets -- which was
already supported -- is now improved.
- Patches 1-2: .read_sock implementation
- Patches 3-4: .splice_read implementation
- Patches 5-6: validate splice() support with MPTCP sockets.
====================
Link: https://patch.msgid.link/20260130-net-next-mptcp-splice-v2-0-31332ba70d7f@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
| -rw-r--r-- | include/net/tcp.h | 11 | ||||
| -rw-r--r-- | net/ipv4/tcp.c | 13 | ||||
| -rw-r--r-- | net/mptcp/protocol.c | 218 | ||||
| -rw-r--r-- | tools/testing/selftests/net/mptcp/Makefile | 1 | ||||
| -rw-r--r-- | tools/testing/selftests/net/mptcp/mptcp_connect.c | 79 | ||||
| -rwxr-xr-x | tools/testing/selftests/net/mptcp/mptcp_connect_splice.sh | 5 |
6 files changed, 308 insertions, 19 deletions
diff --git a/include/net/tcp.h b/include/net/tcp.h index f1cf9e6730c8..cecec1a92d5e 100644 --- a/include/net/tcp.h +++ b/include/net/tcp.h @@ -347,6 +347,15 @@ extern struct proto tcp_prot; #define TCP_DEC_STATS(net, field) SNMP_DEC_STATS((net)->mib.tcp_statistics, field) #define TCP_ADD_STATS(net, field, val) SNMP_ADD_STATS((net)->mib.tcp_statistics, field, val) +/* + * TCP splice context + */ +struct tcp_splice_state { + struct pipe_inode_info *pipe; + size_t len; + unsigned int flags; +}; + void tcp_tsq_work_init(void); int tcp_v4_err(struct sk_buff *skb, u32); @@ -378,6 +387,8 @@ void tcp_rcv_space_adjust(struct sock *sk); int tcp_twsk_unique(struct sock *sk, struct sock *sktw, void *twp); void tcp_twsk_destructor(struct sock *sk); void tcp_twsk_purge(struct list_head *net_exit_list); +int tcp_splice_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, + unsigned int offset, size_t len); ssize_t tcp_splice_read(struct socket *sk, loff_t *ppos, struct pipe_inode_info *pipe, size_t len, unsigned int flags); diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c index e4009158b908..6e94c5859f4b 100644 --- a/net/ipv4/tcp.c +++ b/net/ipv4/tcp.c @@ -319,15 +319,6 @@ struct percpu_counter tcp_sockets_allocated ____cacheline_aligned_in_smp; EXPORT_IPV6_MOD(tcp_sockets_allocated); /* - * TCP splice context - */ -struct tcp_splice_state { - struct pipe_inode_info *pipe; - size_t len; - unsigned int flags; -}; - -/* * Pressure flag: try to collapse. * Technical note: it is used by multiple contexts non atomically. * All the __sk_mem_schedule() is of this nature: accounting @@ -791,8 +782,8 @@ void tcp_push(struct sock *sk, int flags, int mss_now, __tcp_push_pending_frames(sk, mss_now, nonagle); } -static int tcp_splice_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, - unsigned int offset, size_t len) +int tcp_splice_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, + unsigned int offset, size_t len) { struct tcp_splice_state *tss = rd_desc->arg.data; int ret; diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c index 8d3233667418..9b8c51937eb2 100644 --- a/net/mptcp/protocol.c +++ b/net/mptcp/protocol.c @@ -1995,6 +1995,17 @@ do_error: static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied); +static void mptcp_eat_recv_skb(struct sock *sk, struct sk_buff *skb) +{ + /* avoid the indirect call, we know the destructor is sock_rfree */ + skb->destructor = NULL; + skb->sk = NULL; + atomic_sub(skb->truesize, &sk->sk_rmem_alloc); + sk_mem_uncharge(sk, skb->truesize); + __skb_unlink(skb, &sk->sk_receive_queue); + skb_attempt_defer_free(skb); +} + static int __mptcp_recvmsg_mskq(struct sock *sk, struct msghdr *msg, size_t len, int flags, int copied_total, struct scm_timestamping_internal *tss, @@ -2049,13 +2060,7 @@ static int __mptcp_recvmsg_mskq(struct sock *sk, struct msghdr *msg, break; } - /* avoid the indirect call, we know the destructor is sock_rfree */ - skb->destructor = NULL; - skb->sk = NULL; - atomic_sub(skb->truesize, &sk->sk_rmem_alloc); - sk_mem_uncharge(sk, skb->truesize); - __skb_unlink(skb, &sk->sk_receive_queue); - skb_attempt_defer_free(skb); + mptcp_eat_recv_skb(sk, skb); } if (copied >= len) @@ -4312,6 +4317,201 @@ static __poll_t mptcp_poll(struct file *file, struct socket *sock, return mask; } +static struct sk_buff *mptcp_recv_skb(struct sock *sk, u32 *off) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + struct sk_buff *skb; + u32 offset; + + if (!list_empty(&msk->backlog_list)) + mptcp_move_skbs(sk); + + while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) { + offset = MPTCP_SKB_CB(skb)->offset; + if (offset < skb->len) { + *off = offset; + return skb; + } + mptcp_eat_recv_skb(sk, skb); + } + return NULL; +} + +/* + * Note: + * - It is assumed that the socket was locked by the caller. + */ +static int __mptcp_read_sock(struct sock *sk, read_descriptor_t *desc, + sk_read_actor_t recv_actor, bool noack) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + struct sk_buff *skb; + int copied = 0; + u32 offset; + + msk_owned_by_me(msk); + + if (sk->sk_state == TCP_LISTEN) + return -ENOTCONN; + while ((skb = mptcp_recv_skb(sk, &offset)) != NULL) { + u32 data_len = skb->len - offset; + int count; + u32 size; + + size = min_t(size_t, data_len, INT_MAX); + count = recv_actor(desc, skb, offset, size); + if (count <= 0) { + if (!copied) + copied = count; + break; + } + + copied += count; + + msk->bytes_consumed += count; + if (count < data_len) { + MPTCP_SKB_CB(skb)->offset += count; + MPTCP_SKB_CB(skb)->map_seq += count; + break; + } + + mptcp_eat_recv_skb(sk, skb); + } + + if (noack) + goto out; + + mptcp_rcv_space_adjust(msk, copied); + + if (copied > 0) { + mptcp_recv_skb(sk, &offset); + mptcp_cleanup_rbuf(msk, copied); + } +out: + return copied; +} + +static int mptcp_read_sock(struct sock *sk, read_descriptor_t *desc, + sk_read_actor_t recv_actor) +{ + return __mptcp_read_sock(sk, desc, recv_actor, false); +} + +static int __mptcp_splice_read(struct sock *sk, struct tcp_splice_state *tss) +{ + /* Store TCP splice context information in read_descriptor_t. */ + read_descriptor_t rd_desc = { + .arg.data = tss, + .count = tss->len, + }; + + return mptcp_read_sock(sk, &rd_desc, tcp_splice_data_recv); +} + +/** + * mptcp_splice_read - splice data from MPTCP socket to a pipe + * @sock: socket to splice from + * @ppos: position (not valid) + * @pipe: pipe to splice to + * @len: number of bytes to splice + * @flags: splice modifier flags + * + * Description: + * Will read pages from given socket and fill them into a pipe. + * + * Return: + * Amount of bytes that have been spliced. + * + **/ +static ssize_t mptcp_splice_read(struct socket *sock, loff_t *ppos, + struct pipe_inode_info *pipe, size_t len, + unsigned int flags) +{ + struct tcp_splice_state tss = { + .pipe = pipe, + .len = len, + .flags = flags, + }; + struct sock *sk = sock->sk; + ssize_t spliced = 0; + int ret = 0; + long timeo; + + /* + * We can't seek on a socket input + */ + if (unlikely(*ppos)) + return -ESPIPE; + + lock_sock(sk); + + mptcp_rps_record_subflows(mptcp_sk(sk)); + + timeo = sock_rcvtimeo(sk, sock->file->f_flags & O_NONBLOCK); + while (tss.len) { + ret = __mptcp_splice_read(sk, &tss); + if (ret < 0) { + break; + } else if (!ret) { + if (spliced) + break; + if (sock_flag(sk, SOCK_DONE)) + break; + if (sk->sk_err) { + ret = sock_error(sk); + break; + } + if (sk->sk_shutdown & RCV_SHUTDOWN) + break; + if (sk->sk_state == TCP_CLOSE) { + /* + * This occurs when user tries to read + * from never connected socket. + */ + ret = -ENOTCONN; + break; + } + if (!timeo) { + ret = -EAGAIN; + break; + } + /* if __mptcp_splice_read() got nothing while we have + * an skb in receive queue, we do not want to loop. + * This might happen with URG data. + */ + if (!skb_queue_empty(&sk->sk_receive_queue)) + break; + ret = sk_wait_data(sk, &timeo, NULL); + if (ret < 0) + break; + if (signal_pending(current)) { + ret = sock_intr_errno(timeo); + break; + } + continue; + } + tss.len -= ret; + spliced += ret; + + if (!tss.len || !timeo) + break; + release_sock(sk); + lock_sock(sk); + + if (sk->sk_err || sk->sk_state == TCP_CLOSE || + (sk->sk_shutdown & RCV_SHUTDOWN) || + signal_pending(current)) + break; + } + + release_sock(sk); + + if (spliced) + return spliced; + + return ret; +} + static const struct proto_ops mptcp_stream_ops = { .family = PF_INET, .owner = THIS_MODULE, @@ -4332,6 +4532,8 @@ static const struct proto_ops mptcp_stream_ops = { .recvmsg = inet_recvmsg, .mmap = sock_no_mmap, .set_rcvlowat = mptcp_set_rcvlowat, + .read_sock = mptcp_read_sock, + .splice_read = mptcp_splice_read, }; static struct inet_protosw mptcp_protosw = { @@ -4436,6 +4638,8 @@ static const struct proto_ops mptcp_v6_stream_ops = { .compat_ioctl = inet6_compat_ioctl, #endif .set_rcvlowat = mptcp_set_rcvlowat, + .read_sock = mptcp_read_sock, + .splice_read = mptcp_splice_read, }; static struct proto mptcp_v6_prot; diff --git a/tools/testing/selftests/net/mptcp/Makefile b/tools/testing/selftests/net/mptcp/Makefile index 4dd6278cd3dd..22ba0da2adb8 100644 --- a/tools/testing/selftests/net/mptcp/Makefile +++ b/tools/testing/selftests/net/mptcp/Makefile @@ -11,6 +11,7 @@ TEST_PROGS := \ mptcp_connect_checksum.sh \ mptcp_connect_mmap.sh \ mptcp_connect_sendfile.sh \ + mptcp_connect_splice.sh \ mptcp_join.sh \ mptcp_sockopt.sh \ pm_netlink.sh \ diff --git a/tools/testing/selftests/net/mptcp/mptcp_connect.c b/tools/testing/selftests/net/mptcp/mptcp_connect.c index 10f6f99cfd4e..a74b13e42ecd 100644 --- a/tools/testing/selftests/net/mptcp/mptcp_connect.c +++ b/tools/testing/selftests/net/mptcp/mptcp_connect.c @@ -52,6 +52,7 @@ enum cfg_mode { CFG_MODE_POLL, CFG_MODE_MMAP, CFG_MODE_SENDFILE, + CFG_MODE_SPLICE, }; enum cfg_peek { @@ -124,7 +125,7 @@ static void die_usage(void) fprintf(stderr, "\t-j -- add additional sleep at connection start and tear down " "-- for MPJ tests\n"); fprintf(stderr, "\t-l -- listens mode, accepts incoming connection\n"); - fprintf(stderr, "\t-m [poll|mmap|sendfile] -- use poll(default)/mmap+write/sendfile\n"); + fprintf(stderr, "\t-m [poll|mmap|sendfile|splice] -- use poll(default)/mmap+write/sendfile/splice\n"); fprintf(stderr, "\t-M mark -- set socket packet mark\n"); fprintf(stderr, "\t-o option -- test sockopt <option>\n"); fprintf(stderr, "\t-p num -- use port num\n"); @@ -935,6 +936,71 @@ static int copyfd_io_sendfile(int infd, int peerfd, int outfd, return err; } +static int do_splice(const int infd, const int outfd, const size_t len, + struct wstate *winfo) +{ + ssize_t in_bytes, out_bytes; + int pipefd[2]; + int err; + + err = pipe(pipefd); + if (err) { + perror("pipe"); + return 2; + } + +again: + in_bytes = splice(infd, NULL, pipefd[1], NULL, len - winfo->total_len, + SPLICE_F_MOVE | SPLICE_F_MORE); + if (in_bytes < 0) { + perror("splice in"); + err = 3; + } else if (in_bytes > 0) { + out_bytes = splice(pipefd[0], NULL, outfd, NULL, in_bytes, + SPLICE_F_MOVE | SPLICE_F_MORE); + if (out_bytes < 0) { + perror("splice out"); + err = 4; + } else if (in_bytes != out_bytes) { + fprintf(stderr, "Unexpected transfer: %zu vs %zu\n", + in_bytes, out_bytes); + err = 5; + } else { + goto again; + } + } + + close(pipefd[0]); + close(pipefd[1]); + + return err; +} + +static int copyfd_io_splice(int infd, int peerfd, int outfd, unsigned int size, + bool *in_closed_after_out, struct wstate *winfo) +{ + int err; + + if (listen_mode) { + err = do_splice(peerfd, outfd, size, winfo); + if (err) + return err; + + err = do_splice(infd, peerfd, size, winfo); + } else { + err = do_splice(infd, peerfd, size, winfo); + if (err) + return err; + + shut_wr(peerfd); + + err = do_splice(peerfd, outfd, size, winfo); + *in_closed_after_out = true; + } + + return err; +} + static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd, struct wstate *winfo) { bool in_closed_after_out = false; @@ -967,6 +1033,14 @@ static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd, struct &in_closed_after_out, winfo); break; + case CFG_MODE_SPLICE: + file_size = get_infd_size(infd); + if (file_size < 0) + return file_size; + ret = copyfd_io_splice(infd, peerfd, outfd, file_size, + &in_closed_after_out, winfo); + break; + default: fprintf(stderr, "Invalid mode %d\n", cfg_mode); @@ -1380,12 +1454,15 @@ int parse_mode(const char *mode) return CFG_MODE_MMAP; if (!strcasecmp(mode, "sendfile")) return CFG_MODE_SENDFILE; + if (!strcasecmp(mode, "splice")) + return CFG_MODE_SPLICE; fprintf(stderr, "Unknown test mode: %s\n", mode); fprintf(stderr, "Supported modes are:\n"); fprintf(stderr, "\t\t\"poll\" - interleaved read/write using poll()\n"); fprintf(stderr, "\t\t\"mmap\" - send entire input file (mmap+write), then read response (-l will read input first)\n"); fprintf(stderr, "\t\t\"sendfile\" - send entire input file (sendfile), then read response (-l will read input first)\n"); + fprintf(stderr, "\t\t\"splice\" - send entire input file (splice), then read response (-l will read input first)\n"); die_usage(); diff --git a/tools/testing/selftests/net/mptcp/mptcp_connect_splice.sh b/tools/testing/selftests/net/mptcp/mptcp_connect_splice.sh new file mode 100755 index 000000000000..241254a966c9 --- /dev/null +++ b/tools/testing/selftests/net/mptcp/mptcp_connect_splice.sh @@ -0,0 +1,5 @@ +#!/bin/bash +# SPDX-License-Identifier: GPL-2.0 + +MPTCP_LIB_KSFT_TEST="$(basename "${0}" .sh)" \ + "$(dirname "${0}")/mptcp_connect.sh" -m splice "${@}" |
