Skip to content

Commit 5f66b0d

Browse files
added telemetry api support
1 parent 71388dd commit 5f66b0d

File tree

4 files changed

+107
-5
lines changed

4 files changed

+107
-5
lines changed

cmd/aws-lambda-rie/util.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ package main
55

66
import (
77
"fmt"
8-
"net/http")
8+
"net/http"
9+
)
910

1011
type ErrorType int
1112

lambda/logging/internal_log_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,3 @@ func BenchmarkLogrusDebugWithFieldLogLevelDisabledInternalFormatter(b *testing.B
164164
l.WithField("field", "value").Debug(1, "two", true)
165165
}
166166
}
167-

lambda/rapi/handler/runtimelogs_stub.go

+104-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,17 @@
44
package handler
55

66
import (
7+
"bufio"
8+
"bytes"
9+
"encoding/json"
10+
"fmt"
11+
"io/ioutil"
712
"net/http"
13+
"net/url"
14+
"os"
15+
"strings"
16+
"sync"
17+
"time"
818

919
log "github.com/sirupsen/logrus"
1020
"go.amzn.com/lambda/rapi/model"
@@ -16,6 +26,24 @@ const (
1626
telemetryAPIDisabledErrorType = "Telemetry.NotSupported"
1727
)
1828

29+
type runtimeTelemetryBuffering struct {
30+
MaxBytes int64 `json:"maxBytes"`
31+
MaxItems int `json:"maxItems"`
32+
TimeoutMs int64 `json:"timeoutMs"`
33+
}
34+
35+
type runtimeTelemetryDestination struct {
36+
URI string `json:"URI"`
37+
Protocol string `json:"protocol"`
38+
}
39+
40+
type runtimeTelemetryRequest struct {
41+
Buffering runtimeTelemetryBuffering `json:"buffering"`
42+
Destination runtimeTelemetryDestination `json:"destination"`
43+
Types []string `json:"types"`
44+
SchemaVersion string `json:"schemaVersion"`
45+
}
46+
1947
type runtimeLogsStubAPIHandler struct{}
2048

2149
func (h *runtimeLogsStubAPIHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
@@ -34,9 +62,36 @@ func NewRuntimeLogsAPIStubHandler() http.Handler {
3462
return &runtimeLogsStubAPIHandler{}
3563
}
3664

37-
type runtimeTelemetryAPIStubHandler struct{}
65+
type runtimeTelemetryAPIStubHandler struct {
66+
destinations []string
67+
mu sync.Mutex
68+
}
3869

3970
func (h *runtimeTelemetryAPIStubHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
71+
var runtimeReq runtimeTelemetryRequest
72+
body, err := ioutil.ReadAll(request.Body)
73+
if err != nil {
74+
log.WithError(err).Warn("Error while reading request body")
75+
http.Error(writer, err.Error(), http.StatusInternalServerError)
76+
}
77+
err = json.Unmarshal(body, &runtimeReq)
78+
if err != nil {
79+
log.WithError(err).Warn("Error while unmarshaling request")
80+
http.Error(writer, err.Error(), http.StatusInternalServerError)
81+
}
82+
if len(runtimeReq.Destination.URI) > 0 && runtimeReq.Destination.Protocol == "HTTP" {
83+
u, err := url.Parse(runtimeReq.Destination.URI)
84+
if err != nil {
85+
log.WithError(err).Warn("Error while parsing destination URL")
86+
http.Error(writer, err.Error(), http.StatusInternalServerError)
87+
}
88+
if sep := strings.IndexRune(u.Host, ':'); sep != -1 && u.Host[:sep] == "sandbox" {
89+
u.Host = "localhost" + u.Host[sep:]
90+
}
91+
h.mu.Lock()
92+
h.destinations = append(h.destinations, u.String())
93+
h.mu.Unlock()
94+
}
4095
if err := rendering.RenderJSON(http.StatusAccepted, writer, request, &model.ErrorResponse{
4196
ErrorType: telemetryAPIDisabledErrorType,
4297
ErrorMessage: "Telemetry API is not supported",
@@ -46,8 +101,55 @@ func (h *runtimeTelemetryAPIStubHandler) ServeHTTP(writer http.ResponseWriter, r
46101
}
47102
}
48103

104+
type logMessage struct {
105+
Time string `json:"time"`
106+
Type string `json:"type"`
107+
Record string `json:"record"`
108+
}
109+
49110
// NewRuntimeTelemetryAPIStubHandler returns a new instance of http handler
50111
// for serving /runtime/logs when a telemetry service implementation is absent
51112
func NewRuntimeTelemetryAPIStubHandler() http.Handler {
52-
return &runtimeTelemetryAPIStubHandler{}
113+
handler := runtimeTelemetryAPIStubHandler{}
114+
originalStdout := os.Stdout
115+
r, w, _ := os.Pipe()
116+
os.Stdout = w
117+
os.Stderr = w
118+
scanner := bufio.NewScanner(r)
119+
scanner.Split(bufio.ScanLines)
120+
go func() {
121+
for {
122+
if len(handler.destinations) > 0 {
123+
var msgs []logMessage
124+
for scanner.Scan() && len(msgs) < 10 {
125+
line := scanner.Text()
126+
originalStdout.WriteString(fmt.Sprintf("%s\n", line))
127+
msgs = append(msgs, logMessage{
128+
Time: time.Now().Format("2006-01-02T15:04:05.999Z"),
129+
Type: "function",
130+
Record: line,
131+
})
132+
}
133+
data, err := json.Marshal(msgs)
134+
if err != nil {
135+
originalStdout.WriteString(fmt.Sprintf("%s\n", err))
136+
}
137+
bodyReader := bytes.NewReader(data)
138+
handler.mu.Lock()
139+
destinations := handler.destinations
140+
handler.mu.Unlock()
141+
for _, dest := range destinations {
142+
resp, err := http.Post(dest, "application/json", bodyReader)
143+
if err != nil {
144+
originalStdout.WriteString(fmt.Sprintf("%s\n", err))
145+
}
146+
if resp.StatusCode > 300 {
147+
originalStdout.WriteString(fmt.Sprintf("failed to send logs to destination %q: status %d", dest, resp.StatusCode))
148+
}
149+
}
150+
}
151+
time.Sleep(5 * time.Second)
152+
}
153+
}()
154+
return &handler
53155
}

lambda/rapid/handlers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ const (
4141

4242
const (
4343
// Same value as defined in LambdaSandbox minus 1.
44-
maxExtensionNamesLength = 127
44+
maxExtensionNamesLength = 127
4545
standaloneShutdownReason = "spindown"
4646
)
4747

0 commit comments

Comments
 (0)