-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathwatcher.go
130 lines (103 loc) · 2.84 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package eureka
import (
"fmt"
"time"
"golang.org/x/net/context"
)
// DefaultPollInterval defines the default interval at which the watcher queries
// the registry.
const DefaultPollInterval = 30 * time.Second
// EventType defines the type of an observed event.
type EventType uint8
const (
// EventInstanceRegistered indicates that a newly registered instance has
// been observed.
EventInstanceRegistered EventType = iota
// EventInstanceDeregistered indicates that a previously registered instance
// is no longer registered.
EventInstanceDeregistered
// EventInstanceUpdated indicates that a previously registered instance has
// changed in the registry, e.g. status or metadata changes have been observed.
EventInstanceUpdated
)
// Event holds information about the type and subject of an observation.
type Event struct {
Type EventType
Instance *Instance
}
// Watcher can be used to observe the registry for changes with respect
// to the instances of particular app.
type Watcher struct {
events chan Event
instances map[string]*Instance
cancel context.CancelFunc
}
// Registry is being used to poll for registered Apps.
type Registry interface {
Apps() ([]*App, error)
}
func newWatcher(registry Registry, pollInterval time.Duration) *Watcher {
ctx, cancel := context.WithCancel(context.Background())
watcher := &Watcher{
events: make(chan Event),
cancel: cancel,
}
go watcher.poll(ctx, registry, pollInterval)
return watcher
}
// Stop the watcher, i.e. the registry is no longer being polled.
func (w *Watcher) Stop() {
w.cancel()
}
// Events returns a channel that can be used to listen for changes to the app
// observed by this watcher.
func (w *Watcher) Events() <-chan Event {
return w.events
}
func (w *Watcher) poll(ctx context.Context, registry Registry, interval time.Duration) {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
if apps, err := registry.Apps(); err == nil {
w.update(apps)
}
case <-ctx.Done():
return
}
}
}
func (w *Watcher) update(apps []*App) {
current := map[string]*Instance{}
// check if instances are new or have changed
for _, a := range apps {
for _, i := range a.Instances {
key := key(a, i)
current[key] = i
prev, found := w.instances[key]
if !found {
w.notify(EventInstanceRegistered, i)
continue
}
delete(w.instances, key)
if !i.Equals(prev) {
w.notify(EventInstanceUpdated, i)
}
}
}
// instances we haven't deleted above are not registered anymore
for _, i := range w.instances {
w.notify(EventInstanceDeregistered, i)
}
// reset instances
w.instances = current
}
func (w *Watcher) notify(t EventType, i *Instance) {
// blocking
w.events <- Event{t, i}
}
func key(a *App, i *Instance) string {
// instance ids might not be unique across apps
return fmt.Sprintf("%s-%s", a.Name, i.ID)
}