Skip to content

Commit 626063b

Browse files
committed
Eliminate busy loop when receiving messages from non-blocking socket
For a non-blocking socket, it should wait for events first before receiving messages from the socket, otherwise it would receive empty message and run into a busy loop. Signed-off-by: Quan Tian <[email protected]>
1 parent 856e190 commit 626063b

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

nl/nl_linux.go

+24-3
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,10 @@ type NetlinkSocket struct {
632632
fd int32
633633
lsa unix.SockaddrNetlink
634634
sync.Mutex
635+
636+
// pfd is non nil when the socket is in non-blocking mode, and is used to wait for events on the socket.
637+
pfd *unix.PollFd
638+
pollTimeout int64
635639
}
636640

637641
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
@@ -728,17 +732,22 @@ func Subscribe(protocol int, groups ...uint) (*NetlinkSocket, error) {
728732
return nil, err
729733
}
730734

735+
var pfd *unix.PollFd
731736
// Sometimes (socket_linux.go:SocketGet), Subscribe is used to create a socket
732-
// that subscirbed to no groups. So we don't need to set nonblock there.
737+
// that subscribes to no groups. So we don't need to set nonblock there.
733738
if len(groups) > 0 {
734739
if err := unix.SetNonblock(fd, true); err != nil {
735740
unix.Close(fd)
736741
return nil, err
737742
}
743+
pfd = &unix.PollFd{Fd: int32(fd), Events: unix.POLLIN}
738744
}
739745

740746
s := &NetlinkSocket{
741-
fd: int32(fd),
747+
fd: int32(fd),
748+
pfd: pfd,
749+
// poll blocks infinitely by default.
750+
pollTimeout: -1,
742751
}
743752
s.lsa.Family = unix.AF_NETLINK
744753

@@ -791,6 +800,13 @@ func (s *NetlinkSocket) Receive() ([]syscall.NetlinkMessage, *unix.SockaddrNetli
791800
if fd < 0 {
792801
return nil, nil, fmt.Errorf("Receive called on a closed socket")
793802
}
803+
// The socket is in non-blocking mode.
804+
if s.pfd != nil {
805+
if _, err := unix.Poll([]unix.PollFd{*s.pfd}, int(atomic.LoadInt64(&s.pollTimeout))); err != nil {
806+
return nil, nil, fmt.Errorf("Error polling the socket: %w", err)
807+
}
808+
}
809+
794810
var fromAddr *unix.SockaddrNetlink
795811
var rb [RECEIVE_BUFFER_SIZE]byte
796812
nr, from, err := unix.Recvfrom(fd, rb[:], 0)
@@ -825,7 +841,12 @@ func (s *NetlinkSocket) SetSendTimeout(timeout *unix.Timeval) error {
825841
func (s *NetlinkSocket) SetReceiveTimeout(timeout *unix.Timeval) error {
826842
// Set a read timeout of SOCKET_READ_TIMEOUT, this will allow the Read to periodically unblock and avoid that a routine
827843
// remains stuck on a recvmsg on a closed fd
828-
return unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout)
844+
if err := unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout); err != nil {
845+
return err
846+
}
847+
// Set poll timeout to the same value to allow it to unblock upon timeout.
848+
atomic.StoreInt64(&s.pollTimeout, timeout.Sec*1000+timeout.Usec/1000)
849+
return nil
829850
}
830851

831852
// SetReceiveBufferSize allows to set a receive buffer size on the socket

nl/nl_linux_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestIfSocketCloses(t *testing.T) {
7575
for {
7676
_, _, err := sk.Receive()
7777
// Receive returned because of a timeout and the FD == -1 means that the socket got closed
78-
if err == unix.EAGAIN && nlSock.GetFd() == -1 {
78+
if nlSock.GetFd() == -1 {
7979
endCh <- err
8080
return
8181
}

0 commit comments

Comments
 (0)