Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CacheProxy Great Again #12

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cacheproxy/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Build stage: compile the proxy binary inside the Go toolchain image.
FROM golang:1.21-alpine3.18 as build

WORKDIR /app
# Copy the module files (go.mod / go.sum) and the proxy source directory.
COPY go.* .
COPY proxy proxy
# The two cache mounts persist the Go module cache and the Go build cache
# between builds. CGO_ENABLED=0 builds the binary with cgo disabled.
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
CGO_ENABLED=0 go build -o cacheproxy proxy/main.go

# Runtime stage: a plain Alpine image that receives only the compiled binary.
FROM alpine:3.18
COPY --from=build /app/cacheproxy /cacheproxy
CMD ["/cacheproxy"]
13 changes: 13 additions & 0 deletions cacheproxy/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module cacheproxy

go 1.21

require (
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5
github.com/redis/go-redis/v9 v9.3.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)
15 changes: 15 additions & 0 deletions cacheproxy/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5 h1:m62nsMU279qRD9PQSWD1l66kmkXzuYcnVJqL4XLeV2M=
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 h1:dWB6v3RcOy03t/bUadywsbyrQwCqZeNIEX6M1OtSZOM=
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8=
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
280 changes: 280 additions & 0 deletions cacheproxy/proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package main

import (
"bufio"
"bytes"
"context"
"crypto/subtle"
"io"
"log"
"math/rand"
"net/http"
"net/http/httputil"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/elazarl/goproxy"
"github.com/redis/go-redis/v9"
)

// Configuration constants of the caching proxy.
const (
	// envRedisURL is the environment variable main reads the Redis URL from.
	envRedisURL = "REDIS_URL"
	// envAuthKey is the environment variable main reads the proxy auth key from.
	envAuthKey = "AUTH_KEY"
	// listenAddr is the address the HTTP server listens on.
	listenAddr = ":8888"
	// redisDeadline is the timeout applied to each Redis call in
	// getFromCache and storeInCache.
	redisDeadline = time.Second * 30
	// readLockInterval is the base delay between attempts to take a key's
	// write lock in OnRequest (a ±20% jitter is applied there).
	readLockInterval = time.Millisecond * 25
	// maxCacheDuration is the upper bound validateDuration enforces on the
	// requested cache lifetime.
	maxCacheDuration = time.Minute * 5
	// headerAuthKey is the request header whose value must match the configured auth key.
	headerAuthKey = "X-CBSProxy-Auth-Key"
	// headerCacheDuration is the request header carrying the desired cache
	// lifetime (a Go duration string or a number of seconds).
	headerCacheDuration = "X-CBSProxy-Cache-Duration"
	// headerCacheOverride is the request header that, when non-empty, makes
	// OnRequest skip the cache lookup.
	headerCacheOverride = "X-CBSProxy-Cache-Override"
	// headerCached is the response header OnResponse sets to "true" or
	// "false" depending on whether the response came from the cache.
	headerCached = "X-CBSProxy-Cached"
)

// cachingData is the per-request caching state. OnRequest stores it in the
// goproxy context's UserData and OnResponse reads it back.
type cachingData struct {
	// cacheKey is the cache key of the request; OnRequest sets it to the request URL string.
	cacheKey string
	// cacheFor is the validated cache lifetime; OnResponse does not store the
	// response when it is <= 0.
	cacheFor time.Duration
	// alreadyCached is set by OnRequest when the response is served from the cache.
	alreadyCached bool
}

// cachingContext is the request context installed by ServeHTTP. It carries the
// write-lock release function from OnRequest back to ServeHTTP, whose deferred
// cleanup calls it once the request has been handled.
type cachingContext struct {
	// Context is the original request context.
	context.Context
	// key is the cache key whose write lock is held; used for logging on unlock.
	key string
	// keyUnlock releases the write lock for key; nil when no lock is held.
	keyUnlock func()
}

// cachingHandler is the HTTP handler of the caching proxy: it wraps a goproxy
// server, stores responses in Redis and serializes cache population per key.
type cachingHandler struct {
	// redis is the client used to read and write cached responses.
	redis *redis.Client
	// proxy is the underlying goproxy server that ServeHTTP delegates to.
	proxy *goproxy.ProxyHttpServer

	// mu guards keylocks.
	mu sync.Mutex
	// keylocks holds one RWMutex per cache key. Entries are created on first
	// use by getKeyLock; nothing in this file removes them.
	keylocks map[string]*sync.RWMutex
	// authKey is the value the headerAuthKey request header must match.
	authKey string
}

// getFromCache reads the value stored under key from Redis while holding the
// key's read lock, so it blocks for as long as another request holds the
// key's write lock. The Redis call is bounded by redisDeadline. It returns
// the stored string, or the error reported by the Redis GET.
func (c *cachingHandler) getFromCache(ctx context.Context, key string) (string, error) {
	release := c.rLockKey(key)
	defer release()

	deadlineCtx, stop := context.WithTimeout(ctx, redisDeadline)
	defer stop()

	cmd := c.redis.Get(deadlineCtx, key)
	return cmd.Result()
}

// storeInCache writes value under key in Redis with a time-to-live of d,
// replacing any entry that is already stored under that key. The Redis call
// is bounded by redisDeadline. It returns the error of the Redis SET, if any.
//
// OnResponse only calls this with d > 0.
func (c *cachingHandler) storeInCache(ctx context.Context, key string, value []byte, d time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, redisDeadline)
	defer cancel()

	// Use SET, not SETNX: SETNX is a no-op when the key already exists, which
	// made the cache-override request path unable to refresh an entry and left
	// unparsable cached entries in place until their TTL expired.
	return c.redis.Set(ctx, key, value, d).Err()
}

// getKeyLock returns the RWMutex associated with key, creating and
// registering it on first use. Access to the lock table is serialized by c.mu.
func (c *cachingHandler) getKeyLock(key string) *sync.RWMutex {
	c.mu.Lock()
	defer c.mu.Unlock()

	if existing, found := c.keylocks[key]; found {
		return existing
	}

	created := &sync.RWMutex{}
	c.keylocks[key] = created
	return created
}

// rLockKey takes the read lock of key, waiting for it if necessary, and
// returns the function that releases that read lock.
func (c *cachingHandler) rLockKey(key string) (unlock func()) {
	lock := c.getKeyLock(key)
	lock.RLock()

	unlock = lock.RUnlock
	return unlock
}

// tryLockKey makes a single non-blocking attempt to take the write lock of
// key. On success it returns true and the function releasing the lock;
// otherwise it returns false and a nil function.
func (c *cachingHandler) tryLockKey(key string) (ok bool, unlock func()) {
	lock := c.getKeyLock(key)

	if !lock.TryLock() {
		return false, nil
	}
	return true, lock.Unlock
}

// getCacheDuration extracts the requested cache lifetime from the
// headerCacheDuration request header and removes that header from r.
//
// The value is interpreted first as a Go duration string (for example "90s"
// or "1m30s") and, if that fails, as a whole number of seconds. A missing or
// unparsable header yields 0. A seconds value too large (or too small) to be
// represented as a time.Duration saturates at the maximum (or minimum)
// Duration instead of wrapping around.
func (c *cachingHandler) getCacheDuration(r *http.Request) time.Duration {
	// The header is a proxy control header; always strip it from the request.
	defer r.Header.Del(headerCacheDuration)

	val := r.Header.Get(headerCacheDuration)
	if val == "" {
		return 0
	}

	// Try parse duration first.
	if dur, err := time.ParseDuration(val); err == nil {
		return dur
	}

	// Parse number of seconds. ParseInt with bitSize 64 keeps the accepted
	// range identical on 32-bit and 64-bit platforms.
	secs, err := strconv.ParseInt(val, 10, 64)
	if err != nil {
		return 0
	}

	// Multiplying by time.Second overflows int64 beyond this many seconds;
	// saturate so a huge request cannot wrap into a tiny or negative lifetime.
	const maxSecs = int64(1<<63-1) / int64(time.Second)
	switch {
	case secs > maxSecs:
		return time.Duration(1<<63 - 1)
	case secs < -maxSecs:
		return time.Duration(-1 << 63)
	}

	return time.Duration(secs) * time.Second
}

// overrideCacheFlag reports whether the request carries a non-empty
// headerCacheOverride header, and removes that header from r.
func (c *cachingHandler) overrideCacheFlag(r *http.Request) bool {
	flag := r.Header.Get(headerCacheOverride)
	r.Header.Del(headerCacheOverride)

	return flag != ""
}

// validateDuration normalizes a requested cache lifetime:
//   - zero and negative values are returned unchanged (they mean "do not cache"),
//   - positive values below one second are raised to one second,
//   - values above maxCacheDuration are lowered to maxCacheDuration.
func (c *cachingHandler) validateDuration(d time.Duration) time.Duration {
	switch {
	case d <= 0:
		return d
	case d < time.Second:
		return time.Second
	case d > maxCacheDuration:
		return maxCacheDuration
	default:
		return d
	}
}

// ServeHTTP wraps the request context in a cachingContext and hands the
// request to the underlying goproxy server. If OnRequest stored a key unlock
// function in that context, it is invoked when ServeHTTP returns.
func (c *cachingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	cc := new(cachingContext)
	cc.Context = r.Context()

	// OnRequest may take a key's write lock and proxy.ServeHTTP may then fail
	// before anything else could release it; releasing here guarantees the
	// key never stays locked forever.
	defer func() {
		if cc.keyUnlock == nil {
			return
		}
		log.Printf("unlocking cache lock for %q", cc.key)
		cc.keyUnlock()
	}()

	c.proxy.ServeHTTP(w, r.WithContext(cc))
}

// OnRequest is the goproxy request hook. It authenticates the request,
// records the caching parameters in ctx.UserData and then either
//   - answers from the cache (returns a nil request and the cached response),
//   - or lets the request be proxied (returns r and a nil response).
//
// Requests whose headerAuthKey header does not match the configured key get
// an empty 401 response and ctx.UserData stays unset.
//
// On a cache miss the key's write lock is taken without blocking and its
// release function is handed to the cachingContext installed by ServeHTTP,
// so concurrent requests for the same key wait for the first one to finish
// instead of all hitting the upstream.
func (c *cachingHandler) OnRequest(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
	if subtle.ConstantTimeCompare([]byte(r.Header.Get(headerAuthKey)), []byte(c.authKey)) != 1 {
		return nil, &http.Response{
			StatusCode: http.StatusUnauthorized,
			Header:     make(http.Header),
			Body:       io.NopCloser(bytes.NewReader(nil)),
			Trailer:    make(http.Header),
			Request:    r,
		}
	}

	// The key authenticates the client to this proxy only. Strip it, like the
	// other control headers, so the secret is never forwarded upstream.
	r.Header.Del(headerAuthKey)

	cd := &cachingData{
		cacheKey:      r.URL.String(),
		cacheFor:      c.validateDuration(c.getCacheDuration(r)),
		alreadyCached: false,
	}

	ctx.UserData = cd

	// Forcefully override cache using response to this request.
	if c.overrideCacheFlag(r) {
		return r, nil
	}

	reqCtx := r.Context()

	// The unlock function can only be handed back to ServeHTTP through the
	// cachingContext it installed. Use a checked assertion: a request whose
	// context is of another type must not panic the process.
	// NOTE(review): this probably happens for requests goproxy reads from a
	// MITM'd CONNECT tunnel - confirm against the goproxy version in use.
	cachingCtx, hasCachingCtx := reqCtx.(*cachingContext)

	// Loop needed because the cache can be empty, but when we try to acquire a write lock,
	// someone could've already gotten it first, and we need to wait for them to finish and read the result.
	for {
		cachedResponseString, err := c.getFromCache(reqCtx, cd.cacheKey)
		if err == nil && len(cachedResponseString) > 0 {
			// Return cached response or proxy the request if the cache contains an invalid entry
			cachedResp, err := http.ReadResponse(bufio.NewReader(strings.NewReader(cachedResponseString)), r)
			if err != nil {
				ctx.Warnf("parsing cached response: %s", err)
				return r, nil
			}

			ctx.Logf("returning cached response for %q", cd.cacheKey)
			cd.alreadyCached = true
			return nil, cachedResp
		}

		if !hasCachingCtx {
			// Nobody could release a lock taken here, so proxy without one.
			ctx.Warnf("request context is %T, proxying %q without a cache lock", reqCtx, cd.cacheKey)
			return r, nil
		}

		ok, unlock := c.tryLockKey(cd.cacheKey)
		if !ok {
			// Wait for 25±5ms before the next iteration, but stop waiting as
			// soon as the request is cancelled. The request is then returned
			// for proxying rather than answered with a synthetic response,
			// because OnResponse would store such a response in the cache.
			wait := time.NewTimer(time.Duration(float64(readLockInterval) * (1 + rand.Float64()*0.4 - 0.2)))
			select {
			case <-reqCtx.Done():
				wait.Stop()
				ctx.Warnf("giving up waiting for cache lock on %q: %s", cd.cacheKey, reqCtx.Err())
				return r, nil
			case <-wait.C:
			}
			continue
		}

		// Write lock acquired:
		// - save the unlock function to be called in the ServeHTTP defer
		// - proxy the request and then save the response
		ctx.Logf("cache lock acquired for %q, will proxy request", cd.cacheKey)
		cachingCtx.keyUnlock = unlock
		cachingCtx.key = cd.cacheKey
		return r, nil
	}
}

// OnResponse is the goproxy response hook. It marks every authenticated
// response with the headerCached header and, when the response did not come
// from the cache and a positive cache lifetime was requested, stores a dump
// of the full response (headers and body) in the cache.
//
// The response is always returned to the client; failures to dump or store
// it are only logged.
func (c *cachingHandler) OnResponse(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response {
	switch {
	case ctx.UserData == nil:
		// UserData isn't set when request authorization fails.
		return resp
	case resp == nil:
		// resp is nil when an error has occurred
		return nil
	}

	data, isCachingData := ctx.UserData.(*cachingData)
	if !isCachingData {
		ctx.Warnf("proxy context data contained %T instead of *cachingData", ctx.UserData)
		return resp
	}

	resp.Header.Set(headerCached, strconv.FormatBool(data.alreadyCached))

	// Only fresh upstream responses with a positive lifetime are stored.
	if shouldStore := !data.alreadyCached && data.cacheFor > 0; !shouldStore {
		return resp
	}

	raw, dumpErr := httputil.DumpResponse(resp, true)
	if dumpErr != nil {
		ctx.Warnf("dumping HTTP response with body: %s", dumpErr)
		return resp
	}

	storeErr := c.storeInCache(ctx.Req.Context(), data.cacheKey, raw, data.cacheFor)
	if storeErr != nil {
		ctx.Warnf("storing dumped result in cache: %s", storeErr)
	}
	return resp
}

// main wires the caching proxy together: it reads the Redis URL and the auth
// key from the environment, registers the request/response hooks on a goproxy
// server that MITMs CONNECT requests, and serves it on listenAddr.
// It exits if the Redis URL cannot be parsed or the HTTP server stops.
func main() {
	redopts, err := redis.ParseURL(os.Getenv(envRedisURL))
	if err != nil {
		log.Fatalf("Failed to parse %s: %s", envRedisURL, err)
	}

	handler := &cachingHandler{
		redis:    redis.NewClient(redopts),
		proxy:    goproxy.NewProxyHttpServer(),
		keylocks: make(map[string]*sync.RWMutex),
		authKey:  os.Getenv(envAuthKey),
	}

	// OnRequest compares the request's auth header with authKey; when both
	// are empty the comparison succeeds, so an unset key means every request
	// is accepted. Make that misconfiguration visible at startup.
	if handler.authKey == "" {
		log.Printf("WARNING: %s is empty, the proxy accepts unauthenticated requests", envAuthKey)
	}

	handler.proxy.Verbose = true
	handler.proxy.OnRequest().HandleConnect(goproxy.AlwaysMitm)
	handler.proxy.OnRequest().DoFunc(handler.OnRequest)
	handler.proxy.OnResponse().DoFunc(handler.OnResponse)

	log.Printf("Proxy started on %s", listenAddr)
	srv := &http.Server{
		Addr:         listenAddr,
		Handler:      handler,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: time.Minute,
		IdleTimeout:  time.Minute * 2,
	}

	log.Fatal(srv.ListenAndServe())
}
22 changes: 15 additions & 7 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ services:
volumes:
- ./server/app:/app
environment:
REDIS_URL: 'redis://redis:6379/0'
POSTGRES_DSN: 'host=postgres port=5432 user=farm password=farm dbname=farm'
REDIS_URL: "redis://redis:6379/0"
POSTGRES_DSN: "host=postgres port=5432 user=farm password=farm dbname=farm"
SERVER_PASSWORD: ${SERVER_PASSWORD}
restart: unless-stopped
depends_on:
Expand All @@ -21,8 +21,8 @@ services:
volumes:
- ./server/app:/app
environment:
CELERY_BROKER_URL: 'redis://redis:6379/1'
POSTGRES_DSN: 'host=postgres port=5432 user=farm password=farm dbname=farm'
CELERY_BROKER_URL: "redis://redis:6379/1"
POSTGRES_DSN: "host=postgres port=5432 user=farm password=farm dbname=farm"
restart: unless-stopped
depends_on:
postgres:
Expand Down Expand Up @@ -53,10 +53,11 @@ services:
start_period: 30s

cacheproxy:
build: httpproxy
build: cacheproxy
restart: unless-stopped
environment:
REDIS_URL: 'redis://redis:6379/2'
REDIS_URL: "redis://redis:6379/2"
AUTH_KEY: "${CACHEPROXY_AUTH_KEY:-changeme}"
ports:
- "8888:8888"
depends_on:
Expand All @@ -74,6 +75,13 @@ services:
ports:
- "6378:6379"
restart: unless-stopped
command: ["redis-server", "--appendonly", "yes", "--requirepass", "${EXTERNAL_REDIS_PASSWORD:-changeme}"]
command:
[
"redis-server",
"--appendonly",
"yes",
"--requirepass",
"${EXTERNAL_REDIS_PASSWORD:-changeme}",
]
volumes:
- ./vol/external_redis:/data
17 changes: 0 additions & 17 deletions httpproxy/Dockerfile

This file was deleted.

Loading