Skip to content

Commit 4892c5d

Browse files
authored
Merge pull request #97 from weibocom/dev
Dev
2 parents 5a5ec6d + 7c7a58c commit 4892c5d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+2288
-670
lines changed

agent.go

+48-77
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package motan
22

33
import (
4-
"encoding/json"
54
"flag"
65
"fmt"
76
"io/ioutil"
@@ -27,7 +26,7 @@ const (
2726

2827
type Agent struct {
2928
ConfigFile string
30-
extFactory motan.ExtentionFactory
29+
extFactory motan.ExtensionFactory
3130
Context *motan.Context
3231

3332
agentServer motan.Server
@@ -47,10 +46,10 @@ type Agent struct {
4746
manageHandlers map[string]http.Handler
4847
}
4948

50-
func NewAgent(extfactory motan.ExtentionFactory) *Agent {
49+
func NewAgent(extfactory motan.ExtensionFactory) *Agent {
5150
var agent *Agent
5251
if extfactory == nil {
53-
fmt.Println("agent using default extentionFactory.")
52+
fmt.Println("agent using default extensionFactory.")
5453
agent = &Agent{extFactory: GetDefaultExtFactory()}
5554
} else {
5655
agent = &Agent{extFactory: extfactory}
@@ -106,6 +105,7 @@ func (a *Agent) initParam() {
106105
logdir = "."
107106
}
108107
initLog(logdir)
108+
registerSwitchers(a.Context)
109109

110110
port := *motan.Port
111111
if port == 0 && section != nil && section["port"] != nil {
@@ -165,7 +165,7 @@ func (a *Agent) SetSanpshotConf() {
165165
if snapshotDir == "" {
166166
snapshotDir = registry.DefaultSnapshotDir
167167
}
168-
registry.SetSanpshotConf(registry.DefaultSnapshotInterval, snapshotDir)
168+
registry.SetSnapshotConf(registry.DefaultSnapshotInterval, snapshotDir)
169169
}
170170

171171
func (a *Agent) initAgentURL() {
@@ -215,7 +215,7 @@ func (a *Agent) startAgent() {
215215
}
216216

217217
func (a *Agent) registerAgent() {
218-
vlog.Infoln("start agent regitstry.")
218+
vlog.Infoln("start agent registry.")
219219
if reg, exit := a.agentURL.Parameters[motan.RegistryKey]; exit {
220220
if registryURL, regexit := a.Context.RegistryURLs[reg]; regexit {
221221
registry := a.extFactory.GetRegistry(registryURL)
@@ -232,7 +232,7 @@ func (a *Agent) registerAgent() {
232232
}
233233
}
234234
} else {
235-
vlog.Warningf("can not find agent registry in conf, so do not register. agent url:%s\n", a.agentURL)
235+
vlog.Warningf("can not find agent registry in conf, so do not register. agent url:%s\n", a.agentURL.GetIdentity())
236236
}
237237
}
238238
}
@@ -288,6 +288,7 @@ func (a *Agent) startServerAgent() {
288288
application = a.agentURL.GetParam(motan.ApplicationKey, "")
289289
url.PutParam(motan.ApplicationKey, application)
290290
}
291+
url.ClearCachedInfo()
291292
exporter := &mserver.DefaultExporter{}
292293
provider := a.extFactory.GetProvider(url)
293294
if provider == nil {
@@ -296,7 +297,7 @@ func (a *Agent) startServerAgent() {
296297
}
297298
motan.CanSetContext(provider, globalContext)
298299
motan.Initialize(provider)
299-
provider = mserver.WarperWithFilter(provider, a.extFactory)
300+
provider = mserver.WrapWithFilter(provider, a.extFactory, globalContext)
300301
exporter.SetProvider(provider)
301302
server := a.agentPortServer[url.Port]
302303
if server == nil {
@@ -373,6 +374,14 @@ func initLog(logdir string) {
373374
vlog.LogInit(nil)
374375
}
375376

377+
func registerSwitchers(c *motan.Context) {
378+
switchers, _ := c.Config.GetSection(motan.SwitcherSection)
379+
s := motan.GetSwitcherManager()
380+
for n, v := range switchers {
381+
s.Register(n.(string), v.(bool))
382+
}
383+
}
384+
376385
func getDefaultResponse(requestid uint64, errmsg string) *motan.MotanResponse {
377386
return motan.BuildExceptionResponse(requestid, &motan.Exception{ErrCode: 400, ErrMsg: errmsg, ErrType: motan.ServiceException})
378387
}
@@ -410,24 +419,15 @@ func (a *Agent) RegisterManageHandler(path string, handler http.Handler) {
410419
}
411420

412421
func (a *Agent) startMServer() {
413-
if _, ok := a.manageHandlers["/"]; !ok {
414-
a.manageHandlers["/"] = http.HandlerFunc(a.rootHandler)
415-
}
416-
if _, ok := a.manageHandlers["/503"]; !ok {
417-
a.manageHandlers["/503"] = http.HandlerFunc(a.StatusChangeHandler)
418-
}
419-
if _, ok := a.manageHandlers["/200"]; !ok {
420-
a.manageHandlers["/200"] = http.HandlerFunc(a.StatusChangeHandler)
421-
}
422-
if _, ok := a.manageHandlers["/getConfig"]; !ok {
423-
a.manageHandlers["/getConfig"] = http.HandlerFunc(a.getConfigHandler)
424-
}
425-
if _, ok := a.manageHandlers["/getReferService"]; !ok {
426-
a.manageHandlers["/getReferService"] = http.HandlerFunc(a.getReferServiceHandler)
422+
handlers := make(map[string]http.Handler, 16)
423+
for k, v := range GetDefaultManageHandlers() {
424+
handlers[k] = v
427425
}
428426
for k, v := range a.manageHandlers {
429-
http.Handle(k, v)
430-
vlog.Infof("add manage server handle path:%s\n", k)
427+
handlers[k] = v
428+
}
429+
for k, v := range handlers {
430+
a.mhandle(k, v)
431431
}
432432

433433
vlog.Infof("start listen manage port %d ...\n", a.mport)
@@ -438,60 +438,31 @@ func (a *Agent) startMServer() {
438438
}
439439
}
440440

441-
type rpcService struct {
442-
Name string `json:"name"`
443-
Status bool `json:"status"`
444-
}
445-
446-
type body struct {
447-
Service []rpcService `json:"service"`
448-
}
449-
450-
type jsonRetData struct {
451-
Code int `json:"code"`
452-
Body body `json:"body"`
453-
}
454-
455-
// return agent server status, e.g. 200 or 503
456-
func (a *Agent) rootHandler(w http.ResponseWriter, r *http.Request) {
457-
w.WriteHeader(a.status)
458-
w.Write([]byte(http.StatusText(a.status)))
459-
}
460-
461-
func (a *Agent) getConfigHandler(w http.ResponseWriter, r *http.Request) {
462-
data, err := ioutil.ReadFile(*motan.CfgFile)
463-
if err != nil {
464-
w.Write([]byte("error."))
465-
} else {
466-
w.Write(data)
467-
}
468-
}
469-
470-
func (a *Agent) getReferServiceHandler(w http.ResponseWriter, r *http.Request) {
471-
472-
mbody := body{Service: []rpcService{}}
473-
for _, cls := range a.clustermap {
474-
rpc := cls.GetURL().Path
475-
available := cls.IsAvailable()
476-
mbody.Service = append(mbody.Service, rpcService{Name: rpc, Status: available})
477-
}
478-
retData := &jsonRetData{Code: 200, Body: mbody}
479-
if data, err := json.Marshal(&retData); err == nil {
480-
w.Write(data)
481-
} else {
482-
w.Write([]byte("error."))
441+
func (a *Agent) mhandle(k string, h http.Handler) {
442+
defer func() {
443+
if err := recover(); err != nil {
444+
vlog.Warningf("manageHandler register fail. maybe the pattern '%s' already registered\n", k)
445+
}
446+
}()
447+
if sa, ok := h.(SetAgent); ok {
448+
sa.SetAgent(a)
483449
}
450+
http.HandleFunc(k, func(w http.ResponseWriter, r *http.Request) {
451+
if !PermissionCheck(r) {
452+
w.Write([]byte("need permission!"))
453+
return
454+
}
455+
defer func() {
456+
if err := recover(); err != nil {
457+
fmt.Fprintf(w, "process request err: %s\n", err)
458+
}
459+
}()
460+
h.ServeHTTP(w, r)
461+
})
462+
vlog.Infof("add manage server handle path:%s\n", k)
484463
}
485464

486-
// StatusChangeHandler change agent server status, and set registed services available or unavailable.
487-
func (a *Agent) StatusChangeHandler(w http.ResponseWriter, r *http.Request) {
488-
switch r.RequestURI {
489-
case "/200":
490-
availableService(a.serviceRegistries)
491-
a.status = http.StatusOK
492-
case "/503":
493-
unavailableService(a.serviceRegistries)
494-
a.status = http.StatusServiceUnavailable
495-
}
496-
w.Write([]byte("ok."))
465+
func (a *Agent) getConfigData() []byte {
466+
data, _ := ioutil.ReadFile(*motan.CfgFile)
467+
return data
497468
}

client.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ var (
1919
type MCContext struct {
2020
confFile string
2121
context *motan.Context
22-
extFactory motan.ExtentionFactory
22+
extFactory motan.ExtensionFactory
2323
clients map[string]*Client
2424

2525
csync sync.Mutex
@@ -29,7 +29,7 @@ type MCContext struct {
2929
type Client struct {
3030
url *motan.URL
3131
cluster *cluster.MotanCluster
32-
extFactory motan.ExtentionFactory
32+
extFactory motan.ExtensionFactory
3333
}
3434

3535
func (c *Client) Call(method string, args []interface{}, reply interface{}) error {
@@ -111,6 +111,7 @@ func GetClientContext(confFile string) *MCContext {
111111
logdir = "."
112112
}
113113
initLog(logdir)
114+
registerSwitchers(mc.context)
114115
}
115116
return mc
116117
}
@@ -127,7 +128,7 @@ func (m *MCContext) Initialize() {
127128
}
128129
}
129130

130-
func (m *MCContext) Start(extfactory motan.ExtentionFactory) {
131+
func (m *MCContext) Start(extfactory motan.ExtensionFactory) {
131132
m.csync.Lock()
132133
defer m.csync.Unlock()
133134
m.extFactory = extfactory

0 commit comments

Comments
 (0)