Skip to content

Commit 55d4a38

Browse files
authored
Merge pull request #77 from weibocom/bugfix/connWriteTimeout
fix conn write timeout
2 parents c6d0714 + 8f9d428 commit 55d4a38

File tree

3 files changed

+32
-13
lines changed

3 files changed

+32
-13
lines changed

core/constants.go

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package core
22

3+
import "time"
4+
35
//--------------all global public constants--------------
46
// exception type
57
const (
@@ -49,3 +51,7 @@ const (
4951
NodeTypeReferer = "referer"
5052
NodeTypeAgent = "agent"
5153
)
54+
55+
const (
56+
DefaultWriteTimeout = 5 * time.Second
57+
)

endpoint/motanEndpoint.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bufio"
55
"errors"
66
"fmt"
7-
"io"
87
"net"
98
"sync"
109
"sync/atomic"
@@ -26,6 +25,8 @@ var (
2625
ErrRecvRequestTimeout = fmt.Errorf("Timeout err: receive request timeout")
2726

2827
defaultAsyncResponse = &motan.MotanResponse{Attachment: motan.NewStringMap(motan.DefaultAttachmentSize), RPCContext: &motan.RPCContext{AsyncCall: true}}
28+
29+
panicErr = errors.New("panic error")
2930
)
3031

3132
type MotanEndpoint struct {
@@ -255,7 +256,7 @@ type Channel struct {
255256
address string
256257

257258
// connection
258-
conn io.ReadWriteCloser
259+
conn net.Conn
259260
bufRead *bufio.Reader
260261

261262
// send
@@ -424,7 +425,9 @@ func (c *Channel) IsClosed() bool {
424425
}
425426

426427
func (c *Channel) recv() {
427-
defer motan.HandlePanic(nil)
428+
defer motan.HandlePanic(func() {
429+
c.closeOnErr(panicErr)
430+
})
428431
if err := c.recvLoop(); err != nil {
429432
c.closeOnErr(err)
430433
}
@@ -450,12 +453,15 @@ func (c *Channel) recvLoop() error {
450453
}
451454

452455
func (c *Channel) send() {
453-
defer motan.HandlePanic(nil)
456+
defer motan.HandlePanic(func() {
457+
c.closeOnErr(panicErr)
458+
})
454459
for {
455460
select {
456461
case ready := <-c.sendCh:
457462
if ready.data != nil {
458463
// TODO need async?
464+
c.conn.SetWriteDeadline(time.Now().Add(motan.DefaultWriteTimeout))
459465
sent := 0
460466
for sent < len(ready.data) {
461467
n, err := c.conn.Write(ready.data[sent:])

server/motanserver.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
motan "github.com/weibocom/motan-go/core"
1111
"github.com/weibocom/motan-go/log"
1212
mpro "github.com/weibocom/motan-go/protocol"
13+
"time"
1314
)
1415

1516
type MotanServer struct {
@@ -81,9 +82,8 @@ func (m *MotanServer) run() {
8182
}
8283

8384
func (m *MotanServer) handleConn(conn net.Conn) {
84-
defer motan.HandlePanic(func() {
85-
conn.Close()
86-
})
85+
defer conn.Close()
86+
defer motan.HandlePanic(nil)
8787
buf := bufio.NewReader(conn)
8888
for {
8989
request, err := mpro.Decode(buf)
@@ -100,6 +100,12 @@ func (m *MotanServer) handleConn(conn net.Conn) {
100100
func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
101101
defer motan.HandlePanic(nil)
102102
request.Header.SetProxy(m.proxy)
103+
var ip string
104+
if ta, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
105+
ip = ta.IP.String()
106+
} else {
107+
ip = getRemoteIP(conn.RemoteAddr().String())
108+
}
103109
// TODO request , response reuse
104110
var res *mpro.Message
105111
if request.Header.IsHeartbeat() {
@@ -112,11 +118,7 @@ func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
112118
vlog.Errorf("motan server convert to motan request fail. rid :%d, service: %s, method:%s,err:%s\n", request.Header.RequestID, request.Metadata.LoadOrEmpty(mpro.MPath), request.Metadata.LoadOrEmpty(mpro.MMethod), err.Error())
113119
res = mpro.BuildExceptionResponse(request.Header.RequestID, mpro.ExceptionToJSON(&motan.Exception{ErrCode: 500, ErrMsg: "deserialize fail. err:" + err.Error() + " method:" + request.Metadata.LoadOrEmpty(mpro.MMethod), ErrType: motan.ServiceException}))
114120
} else {
115-
if ta, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
116-
req.SetAttachment(motan.HostKey, ta.IP.String())
117-
} else {
118-
req.SetAttachment(motan.HostKey, getRemoteIP(conn.RemoteAddr().String()))
119-
}
121+
req.SetAttachment(motan.HostKey, ip)
120122
req.GetRPCContext(true).ExtFactory = m.extFactory
121123
mres = m.handler.Call(req)
122124
//TODO oneway
@@ -133,7 +135,12 @@ func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
133135
}
134136
}
135137
resbuf := res.Encode()
136-
conn.Write(resbuf.Bytes())
138+
conn.SetWriteDeadline(time.Now().Add(motan.DefaultWriteTimeout))
139+
_, err := conn.Write(resbuf.Bytes())
140+
if err != nil {
141+
vlog.Errorf("connection will close. ip: %s, cause:%s\n", ip, err.Error())
142+
conn.Close()
143+
}
137144
}
138145

139146
func getRemoteIP(address string) string {

0 commit comments

Comments
 (0)