Skip to content

Commit 5a5ec6d

Browse files
authored
Merge pull request #81 from weibocom/dev
Dev
2 parents 2f82b3f + 55d4a38 commit 5a5ec6d

File tree

6 files changed

+49
-31
lines changed

6 files changed

+49
-31
lines changed

agent.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -242,16 +242,19 @@ type agentMessageHandler struct {
242242
}
243243

244244
func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
245-
if request.GetAttachment(mpro.MSource) == "" {
246-
application := a.agent.agentURL.GetParam(motan.ApplicationKey, "")
247-
request.SetAttachment(mpro.MSource, application)
248-
}
249245
version := "0.1"
250246
if request.GetAttachment(mpro.MVersion) != "" {
251247
version = request.GetAttachment(mpro.MVersion)
252248
}
253249
ck := getClusterKey(request.GetAttachment(mpro.MGroup), version, request.GetAttachment(mpro.MProxyProtocol), request.GetAttachment(mpro.MPath))
254250
if motanCluster := a.agent.clustermap[ck]; motanCluster != nil {
251+
if request.GetAttachment(mpro.MSource) == "" {
252+
application := motanCluster.GetURL().GetParam(motan.ApplicationKey, "")
253+
if application == "" {
254+
application = a.agent.agentURL.GetParam(motan.ApplicationKey, "")
255+
}
256+
request.SetAttachment(mpro.MSource, application)
257+
}
255258
res = motanCluster.Call(request)
256259
if res == nil {
257260
vlog.Warningf("motanCluster Call return nil. cluster:%s\n", ck)

client.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"fmt"
77
"sync"
88

9-
cluster "github.com/weibocom/motan-go/cluster"
9+
"github.com/weibocom/motan-go/cluster"
1010
motan "github.com/weibocom/motan-go/core"
1111
mpro "github.com/weibocom/motan-go/protocol"
1212
)
@@ -75,19 +75,15 @@ func (c *Client) BaseGo(req motan.Request, reply interface{}, done chan *motan.A
7575
func (c *Client) BuildRequest(method string, args []interface{}) motan.Request {
7676
req := &motan.MotanRequest{Method: method, ServiceName: c.url.Path, Arguments: args, Attachment: motan.NewStringMap(motan.DefaultAttachmentSize)}
7777
version := c.url.GetParam(motan.VersionKey, "")
78-
if version != "" {
79-
req.SetAttachment(mpro.MVersion, version)
80-
}
78+
req.SetAttachment(mpro.MVersion, version)
8179
module := c.url.GetParam(motan.ModuleKey, "")
82-
if module != "" {
83-
req.SetAttachment(mpro.MModule, module)
84-
}
80+
req.SetAttachment(mpro.MModule, module)
8581
application := c.url.GetParam(motan.ApplicationKey, "")
86-
if application != "" {
87-
req.SetAttachment(mpro.MSource, application)
82+
if application == "" {
83+
application = c.cluster.Context.ClientURL.GetParam(motan.ApplicationKey, "")
8884
}
85+
req.SetAttachment(mpro.MSource, application)
8986
req.SetAttachment(mpro.MGroup, c.url.Group)
90-
9187
return req
9288
}
9389

core/constants.go

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package core
22

3+
import "time"
4+
35
//--------------all global public constants--------------
46
// exception type
57
const (
@@ -49,3 +51,7 @@ const (
4951
NodeTypeReferer = "referer"
5052
NodeTypeAgent = "agent"
5153
)
54+
55+
const (
56+
DefaultWriteTimeout = 5 * time.Second
57+
)

core/globalContext.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type Context struct {
4747
Config *cfg.Config
4848
RegistryURLs map[string]*URL
4949
RefersURLs map[string]*URL
50-
BasicRefers map[string]*URL
50+
BasicReferURLs map[string]*URL
5151
ServiceURLs map[string]*URL
5252
BasicServiceURLs map[string]*URL
5353
AgentURL *URL
@@ -118,7 +118,7 @@ func confToURL(urlInfo map[interface{}]interface{}) *URL {
118118
func (c *Context) Initialize() {
119119
c.RegistryURLs = make(map[string]*URL)
120120
c.RefersURLs = make(map[string]*URL)
121-
c.BasicRefers = make(map[string]*URL)
121+
c.BasicReferURLs = make(map[string]*URL)
122122
c.ServiceURLs = make(map[string]*URL)
123123
c.BasicServiceURLs = make(map[string]*URL)
124124
if c.ConfigFile == "" { // use flag as default config file name
@@ -277,7 +277,7 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL {
277277
basicURLs = c.BasicServiceURLs
278278
basicKey = basicServiceKey
279279
} else if section == refersSection {
280-
basicURLs = c.BasicRefers
280+
basicURLs = c.BasicReferURLs
281281
basicKey = basicReferKey
282282
}
283283
for key, url := range urls {
@@ -319,7 +319,7 @@ func (c *Context) parseRefers() {
319319
}
320320

321321
func (c *Context) parseBasicRefers() {
322-
c.BasicRefers = c.confToURLs(basicRefersSection)
322+
c.BasicReferURLs = c.confToURLs(basicRefersSection)
323323
}
324324

325325
func (c *Context) parseServices() {

endpoint/motanEndpoint.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bufio"
55
"errors"
66
"fmt"
7-
"io"
87
"net"
98
"sync"
109
"sync/atomic"
@@ -26,6 +25,8 @@ var (
2625
ErrRecvRequestTimeout = fmt.Errorf("Timeout err: receive request timeout")
2726

2827
defaultAsyncResponse = &motan.MotanResponse{Attachment: motan.NewStringMap(motan.DefaultAttachmentSize), RPCContext: &motan.RPCContext{AsyncCall: true}}
28+
29+
panicErr = errors.New("panic error")
2930
)
3031

3132
type MotanEndpoint struct {
@@ -255,7 +256,7 @@ type Channel struct {
255256
address string
256257

257258
// connection
258-
conn io.ReadWriteCloser
259+
conn net.Conn
259260
bufRead *bufio.Reader
260261

261262
// send
@@ -424,7 +425,9 @@ func (c *Channel) IsClosed() bool {
424425
}
425426

426427
func (c *Channel) recv() {
427-
defer motan.HandlePanic(nil)
428+
defer motan.HandlePanic(func() {
429+
c.closeOnErr(panicErr)
430+
})
428431
if err := c.recvLoop(); err != nil {
429432
c.closeOnErr(err)
430433
}
@@ -450,12 +453,15 @@ func (c *Channel) recvLoop() error {
450453
}
451454

452455
func (c *Channel) send() {
453-
defer motan.HandlePanic(nil)
456+
defer motan.HandlePanic(func() {
457+
c.closeOnErr(panicErr)
458+
})
454459
for {
455460
select {
456461
case ready := <-c.sendCh:
457462
if ready.data != nil {
458463
// TODO need async?
464+
c.conn.SetWriteDeadline(time.Now().Add(motan.DefaultWriteTimeout))
459465
sent := 0
460466
for sent < len(ready.data) {
461467
n, err := c.conn.Write(ready.data[sent:])

server/motanserver.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
motan "github.com/weibocom/motan-go/core"
1111
"github.com/weibocom/motan-go/log"
1212
mpro "github.com/weibocom/motan-go/protocol"
13+
"time"
1314
)
1415

1516
type MotanServer struct {
@@ -81,9 +82,8 @@ func (m *MotanServer) run() {
8182
}
8283

8384
func (m *MotanServer) handleConn(conn net.Conn) {
84-
defer motan.HandlePanic(func() {
85-
conn.Close()
86-
})
85+
defer conn.Close()
86+
defer motan.HandlePanic(nil)
8787
buf := bufio.NewReader(conn)
8888
for {
8989
request, err := mpro.Decode(buf)
@@ -100,6 +100,12 @@ func (m *MotanServer) handleConn(conn net.Conn) {
100100
func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
101101
defer motan.HandlePanic(nil)
102102
request.Header.SetProxy(m.proxy)
103+
var ip string
104+
if ta, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
105+
ip = ta.IP.String()
106+
} else {
107+
ip = getRemoteIP(conn.RemoteAddr().String())
108+
}
103109
// TODO request , response reuse
104110
var res *mpro.Message
105111
if request.Header.IsHeartbeat() {
@@ -112,11 +118,7 @@ func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
112118
vlog.Errorf("motan server convert to motan request fail. rid :%d, service: %s, method:%s,err:%s\n", request.Header.RequestID, request.Metadata.LoadOrEmpty(mpro.MPath), request.Metadata.LoadOrEmpty(mpro.MMethod), err.Error())
113119
res = mpro.BuildExceptionResponse(request.Header.RequestID, mpro.ExceptionToJSON(&motan.Exception{ErrCode: 500, ErrMsg: "deserialize fail. err:" + err.Error() + " method:" + request.Metadata.LoadOrEmpty(mpro.MMethod), ErrType: motan.ServiceException}))
114120
} else {
115-
if ta, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
116-
req.SetAttachment(motan.HostKey, ta.IP.String())
117-
} else {
118-
req.SetAttachment(motan.HostKey, getRemoteIP(conn.RemoteAddr().String()))
119-
}
121+
req.SetAttachment(motan.HostKey, ip)
120122
req.GetRPCContext(true).ExtFactory = m.extFactory
121123
mres = m.handler.Call(req)
122124
//TODO oneway
@@ -133,7 +135,12 @@ func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
133135
}
134136
}
135137
resbuf := res.Encode()
136-
conn.Write(resbuf.Bytes())
138+
conn.SetWriteDeadline(time.Now().Add(motan.DefaultWriteTimeout))
139+
_, err := conn.Write(resbuf.Bytes())
140+
if err != nil {
141+
vlog.Errorf("connection will close. ip: %s, cause:%s\n", ip, err.Error())
142+
conn.Close()
143+
}
137144
}
138145

139146
func getRemoteIP(address string) string {

0 commit comments

Comments
 (0)