From 1e8a4e725162cf32bb52e87c4cebc7c572cb6216 Mon Sep 17 00:00:00 2001 From: Domenico Testa Date: Wed, 6 May 2026 11:28:19 +0200 Subject: [PATCH] feat: wiring the worker in the main loop --- Dockerfile | 7 ++ main.go | 280 +++++++++++++++++++++++++++++++++++++++++++++++------ worker.go | 250 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 507 insertions(+), 30 deletions(-) create mode 100644 worker.go diff --git a/Dockerfile b/Dockerfile index 81b5692..361366b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,14 @@ RUN go mod download COPY . . RUN CGO_ENABLED=0 go build -v -ldflags="-s -w" -o /usr/bin/app ./... +# Empty cache dir; ownership is set during COPY into the final stage so the +# distroless `nonroot` user (uid 65532) can write to it without an external +# volume mount. +RUN mkdir -p /cache + FROM gcr.io/distroless/static:nonroot EXPOSE 8080 COPY --from=build /usr/bin/app /app +COPY --from=build --chown=65532:65532 /cache /var/cache/pz8-relay +VOLUME /var/cache/pz8-relay ENTRYPOINT ["/app"] diff --git a/main.go b/main.go index e5f604b..5f4536b 100644 --- a/main.go +++ b/main.go @@ -1,61 +1,261 @@ package main import ( + "compress/gzip" + "context" "crypto/subtle" + "errors" + "fmt" + "io" "log/slog" "net/http" - "net/http/httputil" - "net/url" "os" + "os/signal" + "path/filepath" "strings" + "sync" + "syscall" + "time" ) +type Config struct { + TargetURL string + Username string + Password string + ListenAddr string + PublicURL string + EPGURLs []string + EPGRefresh time.Duration + PreferPlaylistEPG bool + CacheDir string + EPGContentType string +} + +func (c *Config) epgEnabled() bool { + return len(c.EPGURLs) > 0 || c.PreferPlaylistEPG +} + func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) slog.SetDefault(logger) - target := mustEnv("PZ8_RELAY_TARGET_URL") - username := mustEnv("PZ8_RELAY_USERNAME") - password := mustEnv("PZ8_RELAY_PASSWORD") - - addr := os.Getenv("PZ8_RELAY_LISTEN_ADDR") - if addr == "" { - addr = ":8080" - } - - targetURL, err := url.Parse(target) + cfg, err := loadConfig() if err != nil { - slog.Error("invalid PZ8_RELAY_TARGET_URL", "err", err) + slog.Error("config", "err", err) os.Exit(1) } - proxy := &httputil.ReverseProxy{ - Rewrite: func(preq *httputil.ProxyRequest) { - preq.SetURL(targetURL) - }, + if err := os.MkdirAll(cfg.CacheDir, 0o755); err != nil { + slog.Error("cache dir", "err", err, "path", cfg.CacheDir) + os.Exit(1) } - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + refresher := NewRefresher(RefreshConfig{ + TargetURL: cfg.TargetURL, + PublicURL: cfg.PublicURL, + EPGURLs: cfg.EPGURLs, + PreferPlaylistEPG: cfg.PreferPlaylistEPG, + CacheDir: cfg.CacheDir, + Interval: cfg.EPGRefresh, + }, &http.Client{Timeout: 5 * time.Minute}) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + refresher.Run(ctx) + }() + + mux := http.NewServeMux() + mux.HandleFunc("/healthz", healthz(refresher)) + mux.HandleFunc("/playlist", basicAuth(cfg, servePlaylist(cfg, refresher))) + mux.HandleFunc("/epg", basicAuth(cfg, serveEPG(cfg, refresher))) + mux.HandleFunc("/", basicAuth(cfg, servePlaylist(cfg, refresher))) // alias + + srv := &http.Server{Addr: cfg.ListenAddr, Handler: mux} + + go func() { + <-ctx.Done() + shutCtx, shutCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutCancel() + _ = srv.Shutdown(shutCtx) + }() + + slog.Info("pz8-relay listening", + "addr", cfg.ListenAddr, + "target", redactURL(cfg.TargetURL), + "epg_enabled", cfg.epgEnabled(), + "cache_dir", cfg.CacheDir, + ) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("server", "err", err) + os.Exit(1) + } + wg.Wait() +} + +func loadConfig() (*Config, error) { + cfg := &Config{ + TargetURL: mustEnv("PZ8_RELAY_TARGET_URL"), + Username: mustEnv("PZ8_RELAY_USERNAME"), + Password: mustEnv("PZ8_RELAY_PASSWORD"), + ListenAddr: envOr("PZ8_RELAY_LISTEN_ADDR", ":8080"), + PublicURL: strings.TrimRight(os.Getenv("PZ8_RELAY_PUBLIC_URL"), "/"), + CacheDir: envOr("PZ8_RELAY_CACHE_DIR", "/var/cache/pz8-relay"), + EPGContentType: envOr("PZ8_RELAY_EPG_CONTENT_TYPE", "application/xml"), + } + + if raw := os.Getenv("PZ8_RELAY_EPG_URLS"); raw != "" { + for _, u := range strings.Split(raw, ",") { + if u = strings.TrimSpace(u); u != "" { + cfg.EPGURLs = append(cfg.EPGURLs, u) + } + } + } + + d, err := time.ParseDuration(envOr("PZ8_RELAY_EPG_REFRESH", "12h")) + if err != nil { + return nil, fmt.Errorf("PZ8_RELAY_EPG_REFRESH: %w", err) + } + cfg.EPGRefresh = d + cfg.PreferPlaylistEPG = strings.EqualFold(os.Getenv("PZ8_RELAY_PREFER_PLAYLIST_EPG"), "true") + + if cfg.epgEnabled() && cfg.PublicURL == "" { + return nil, errors.New("PZ8_RELAY_PUBLIC_URL is required when EPG is enabled") + } + return cfg, nil +} + +// --- HTTP handlers --- + +func basicAuth(cfg *Config, next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { u, p, ok := r.BasicAuth() - if !ok || subtle.ConstantTimeCompare([]byte(u), []byte(username)) != 1 || - subtle.ConstantTimeCompare([]byte(p), []byte(password)) != 1 { - slog.Warn("unauthorized request", "client", clientIP(r)) + if !ok || + subtle.ConstantTimeCompare([]byte(u), []byte(cfg.Username)) != 1 || + subtle.ConstantTimeCompare([]byte(p), []byte(cfg.Password)) != 1 { + slog.Warn("unauthorized request", "client", clientIP(r), "path", r.URL.Path) w.Header().Set("WWW-Authenticate", `Basic realm="pz8-relay"`) http.Error(w, "unauthorized", http.StatusUnauthorized) return } - - slog.Info("proxying request", "client", clientIP(r)) - proxy.ServeHTTP(w, r) - }) - - slog.Info("pz8-relay listening", "addr", addr, "target", targetURL.Redacted()) - if err := http.ListenAndServe(addr, nil); err != nil { - slog.Error("server stopped", "err", err) - os.Exit(1) + next(w, r) } } +func servePlaylist(cfg *Config, r *Refresher) http.HandlerFunc { + path := filepath.Join(cfg.CacheDir, playlistFilename) + return func(w http.ResponseWriter, req *http.Request) { + if !r.Ready() { + retryAfter(w, http.StatusServiceUnavailable, "warming up") + return + } + f, err := os.Open(path) + if err != nil { + retryAfter(w, http.StatusServiceUnavailable, "playlist not yet available") + return + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + http.Error(w, "stat", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + http.ServeContent(w, req, playlistFilename, fi.ModTime(), f) + } +} + +func serveEPG(cfg *Config, r *Refresher) http.HandlerFunc { + path := filepath.Join(cfg.CacheDir, epgFilename) + return func(w http.ResponseWriter, req *http.Request) { + if !r.Ready() { + retryAfter(w, http.StatusServiceUnavailable, "warming up") + return + } + f, err := os.Open(path) + if err != nil { + retryAfter(w, http.StatusServiceUnavailable, "epg not yet available") + return + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + http.Error(w, "stat", http.StatusInternalServerError) + return + } + + if clientAcceptsGzip(req) { + w.Header().Set("Content-Encoding", "gzip") + w.Header().Set("Content-Type", cfg.EPGContentType) + http.ServeContent(w, req, epgFilename, fi.ModTime(), f) + return + } + + // Decompress on the fly. ServeContent can't help here because we + // wrap the file in gzip.Reader, so handle If-Modified-Since manually. + mod := fi.ModTime() + if notModifiedSince(req, mod) { + w.WriteHeader(http.StatusNotModified) + return + } + gr, err := gzip.NewReader(f) + if err != nil { + slog.Error("epg cache corrupted", "err", err) + http.Error(w, "epg unavailable", http.StatusInternalServerError) + return + } + defer gr.Close() + + w.Header().Set("Content-Type", cfg.EPGContentType) + w.Header().Set("Last-Modified", mod.UTC().Format(http.TimeFormat)) + _, _ = io.Copy(w, gr) + } +} + +func healthz(r *Refresher) http.HandlerFunc { + return func(w http.ResponseWriter, _ *http.Request) { + if !r.Ready() { + http.Error(w, "warming up", http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, "ok\n") + } +} + +// --- helpers --- + +func clientAcceptsGzip(r *http.Request) bool { + for _, v := range strings.Split(r.Header.Get("Accept-Encoding"), ",") { + // crude but sufficient — no q-value parsing needed for our use + if strings.EqualFold(strings.TrimSpace(strings.SplitN(v, ";", 2)[0]), "gzip") { + return true + } + } + return false +} + +func notModifiedSince(r *http.Request, mod time.Time) bool { + h := r.Header.Get("If-Modified-Since") + if h == "" { + return false + } + t, err := http.ParseTime(h) + if err != nil { + return false + } + return !mod.Truncate(time.Second).After(t) +} + +func retryAfter(w http.ResponseWriter, code int, msg string) { + w.Header().Set("Retry-After", "30") + http.Error(w, msg, code) +} + // clientIP returns the original client address, trusting X-Forwarded-For // because this service runs behind Traefik. Do not expose directly. func clientIP(r *http.Request) string { @@ -68,6 +268,19 @@ func clientIP(r *http.Request) string { return r.RemoteAddr } +func redactURL(u string) string { + // Cheap redaction: drop anything between "//" and "@" (userinfo). + i := strings.Index(u, "://") + if i < 0 { + return u + } + rest := u[i+3:] + if at := strings.Index(rest, "@"); at >= 0 { + return u[:i+3] + "***@" + rest[at+1:] + } + return u +} + func mustEnv(key string) string { v := os.Getenv(key) if v == "" { @@ -76,3 +289,10 @@ func mustEnv(key string) string { } return v } + +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..520bfc6 --- /dev/null +++ b/worker.go @@ -0,0 +1,250 @@ +package main + +import ( + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +const ( + playlistFilename = "playlist.m3u" + epgFilename = "epg.xml.gz" +) + +type RefreshConfig struct { + TargetURL string + PublicURL string // base URL of this relay; "/epg" appended for url-tvg rewrite + EPGURLs []string + PreferPlaylistEPG bool + CacheDir string + Interval time.Duration +} + +func (c RefreshConfig) epgEnabled() bool { + return len(c.EPGURLs) > 0 || c.PreferPlaylistEPG +} + +// Refresher periodically refreshes the cached playlist and EPG. HTTP handlers +// read the cached files; the worker runs single-threaded and writes them +// atomically via os.Rename, so no synchronization is needed between the two. +type Refresher struct { + cfg RefreshConfig + client *http.Client + + readyCh chan struct{} + once sync.Once +} + +func NewRefresher(cfg RefreshConfig, client *http.Client) *Refresher { + return &Refresher{ + cfg: cfg, + client: client, + readyCh: make(chan struct{}), + } +} + +// Run blocks until ctx is cancelled. It performs an immediate refresh on entry +// (so the cache is warm as quickly as possible), then ticks at cfg.Interval. +func (r *Refresher) Run(ctx context.Context) { + if err := r.refresh(ctx); err != nil { + slog.Warn("initial refresh failed", "err", err) + } else { + r.markReady() + } + + t := time.NewTicker(r.cfg.Interval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if err := r.refresh(ctx); err != nil { + slog.Warn("refresh failed", "err", err) + continue + } + r.markReady() + } + } +} + +// Ready reports whether at least one refresh has succeeded. Handlers use this +// to return 503 cleanly during the warm-up window. +func (r *Refresher) Ready() bool { + select { + case <-r.readyCh: + return true + default: + return false + } +} + +func (r *Refresher) markReady() { + r.once.Do(func() { close(r.readyCh) }) +} + +func (r *Refresher) refresh(ctx context.Context) error { + if err := os.MkdirAll(r.cfg.CacheDir, 0o755); err != nil { + return fmt.Errorf("cache dir: %w", err) + } + + var epgPublicURL string + if r.cfg.epgEnabled() { + epgPublicURL = strings.TrimRight(r.cfg.PublicURL, "/") + "/epg" + } + + tvgIDs, playlistEPG, err := r.refreshPlaylist(ctx, epgPublicURL) + if err != nil { + return fmt.Errorf("playlist: %w", err) + } + slog.Info("playlist refreshed", + "tvg_ids", len(tvgIDs), + "playlist_has_epg", playlistEPG != "", + ) + + if !r.cfg.epgEnabled() { + return nil + } + + epgURLs := r.cfg.EPGURLs + if r.cfg.PreferPlaylistEPG && playlistEPG != "" { + epgURLs = []string{playlistEPG} + } + if len(epgURLs) == 0 { + slog.Warn("EPG enabled but no usable source URLs", + "prefer_playlist_epg", r.cfg.PreferPlaylistEPG, + "configured_urls", len(r.cfg.EPGURLs), + ) + return nil + } + + sourcePaths, err := r.fetchEPGSources(ctx, epgURLs) + defer func() { + for _, p := range sourcePaths { + _ = os.Remove(p) + } + }() + if err != nil { + return err + } + + return r.persistMergedEPG(sourcePaths, buildIDMap(tvgIDs)) +} + +func (r *Refresher) refreshPlaylist(ctx context.Context, epgURL string) (tvgIDs []string, playlistEPG string, err error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.cfg.TargetURL, nil) + if err != nil { + return nil, "", err + } + resp, err := r.client.Do(req) + if err != nil { + return nil, "", err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("upstream status %s", resp.Status) + } + + tmp := filepath.Join(r.cfg.CacheDir, playlistFilename+".tmp") + f, err := os.Create(tmp) + if err != nil { + return nil, "", err + } + cleanup := func() { + _ = f.Close() + _ = os.Remove(tmp) + } + + tvgIDs, playlistEPG, err = processPlaylist(resp.Body, f, epgURL) + if err != nil { + cleanup() + return nil, "", err + } + if err := f.Sync(); err != nil { + cleanup() + return nil, "", err + } + if err := f.Close(); err != nil { + _ = os.Remove(tmp) + return nil, "", err + } + if err := os.Rename(tmp, filepath.Join(r.cfg.CacheDir, playlistFilename)); err != nil { + _ = os.Remove(tmp) + return nil, "", err + } + return tvgIDs, playlistEPG, nil +} + +func (r *Refresher) fetchEPGSources(ctx context.Context, urls []string) (paths []string, err error) { + for i, url := range urls { + path := filepath.Join(r.cfg.CacheDir, fmt.Sprintf("epg-src-%d.xml.tmp", i)) + if ferr := fetchEPGSource(ctx, r.client, url, path); ferr != nil { + slog.Warn("epg source fetch failed", "url", url, "err", ferr) + _ = os.Remove(path) + continue + } + paths = append(paths, path) + } + if len(paths) == 0 { + return nil, errors.New("no EPG sources fetched successfully") + } + return paths, nil +} + +func (r *Refresher) persistMergedEPG(sourcePaths []string, idMap map[string]string) error { + sources := make([]io.ReadSeeker, 0, len(sourcePaths)) + closers := make([]io.Closer, 0, len(sourcePaths)) + defer func() { + for _, c := range closers { + _ = c.Close() + } + }() + for _, p := range sourcePaths { + f, err := os.Open(p) + if err != nil { + return err + } + sources = append(sources, f) + closers = append(closers, f) + } + + tmp := filepath.Join(r.cfg.CacheDir, epgFilename+".tmp") + f, err := os.Create(tmp) + if err != nil { + return err + } + cleanup := func() { + _ = f.Close() + _ = os.Remove(tmp) + } + + gw := gzip.NewWriter(f) + if mergeErr := mergeEPG(sources, idMap, gw); mergeErr != nil { + _ = gw.Close() + cleanup() + return mergeErr + } + if err := gw.Close(); err != nil { + cleanup() + return err + } + if err := f.Sync(); err != nil { + cleanup() + return err + } + if err := f.Close(); err != nil { + _ = os.Remove(tmp) + return err + } + return os.Rename(tmp, filepath.Join(r.cfg.CacheDir, epgFilename)) +}