feat: wiring the worker in the main loop

This commit is contained in:
2026-05-06 11:28:19 +02:00
parent b592a0e229
commit 1e8a4e7251
3 changed files with 507 additions and 30 deletions
+7
View File
@@ -8,7 +8,14 @@ RUN go mod download
COPY . . COPY . .
RUN CGO_ENABLED=0 go build -v -ldflags="-s -w" -o /usr/bin/app ./... RUN CGO_ENABLED=0 go build -v -ldflags="-s -w" -o /usr/bin/app ./...
# Empty cache dir; ownership is set during COPY into the final stage so the
# distroless `nonroot` user (uid 65532) can write to it without an external
# volume mount.
RUN mkdir -p /cache
FROM gcr.io/distroless/static:nonroot FROM gcr.io/distroless/static:nonroot
EXPOSE 8080 EXPOSE 8080
COPY --from=build /usr/bin/app /app COPY --from=build /usr/bin/app /app
COPY --from=build --chown=65532:65532 /cache /var/cache/pz8-relay
VOLUME /var/cache/pz8-relay
ENTRYPOINT ["/app"] ENTRYPOINT ["/app"]
+250 -30
View File
@@ -1,61 +1,261 @@
package main package main
import ( import (
"compress/gzip"
"context"
"crypto/subtle" "crypto/subtle"
"errors"
"fmt"
"io"
"log/slog" "log/slog"
"net/http" "net/http"
"net/http/httputil"
"net/url"
"os" "os"
"os/signal"
"path/filepath"
"strings" "strings"
"sync"
"syscall"
"time"
) )
type Config struct {
TargetURL string
Username string
Password string
ListenAddr string
PublicURL string
EPGURLs []string
EPGRefresh time.Duration
PreferPlaylistEPG bool
CacheDir string
EPGContentType string
}
func (c *Config) epgEnabled() bool {
return len(c.EPGURLs) > 0 || c.PreferPlaylistEPG
}
func main() { func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
slog.SetDefault(logger) slog.SetDefault(logger)
target := mustEnv("PZ8_RELAY_TARGET_URL") cfg, err := loadConfig()
username := mustEnv("PZ8_RELAY_USERNAME")
password := mustEnv("PZ8_RELAY_PASSWORD")
addr := os.Getenv("PZ8_RELAY_LISTEN_ADDR")
if addr == "" {
addr = ":8080"
}
targetURL, err := url.Parse(target)
if err != nil { if err != nil {
slog.Error("invalid PZ8_RELAY_TARGET_URL", "err", err) slog.Error("config", "err", err)
os.Exit(1) os.Exit(1)
} }
proxy := &httputil.ReverseProxy{ if err := os.MkdirAll(cfg.CacheDir, 0o755); err != nil {
Rewrite: func(preq *httputil.ProxyRequest) { slog.Error("cache dir", "err", err, "path", cfg.CacheDir)
preq.SetURL(targetURL) os.Exit(1)
},
} }
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { refresher := NewRefresher(RefreshConfig{
TargetURL: cfg.TargetURL,
PublicURL: cfg.PublicURL,
EPGURLs: cfg.EPGURLs,
PreferPlaylistEPG: cfg.PreferPlaylistEPG,
CacheDir: cfg.CacheDir,
Interval: cfg.EPGRefresh,
}, &http.Client{Timeout: 5 * time.Minute})
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
refresher.Run(ctx)
}()
mux := http.NewServeMux()
mux.HandleFunc("/healthz", healthz(refresher))
mux.HandleFunc("/playlist", basicAuth(cfg, servePlaylist(cfg, refresher)))
mux.HandleFunc("/epg", basicAuth(cfg, serveEPG(cfg, refresher)))
mux.HandleFunc("/", basicAuth(cfg, servePlaylist(cfg, refresher))) // alias
srv := &http.Server{Addr: cfg.ListenAddr, Handler: mux}
go func() {
<-ctx.Done()
shutCtx, shutCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutCancel()
_ = srv.Shutdown(shutCtx)
}()
slog.Info("pz8-relay listening",
"addr", cfg.ListenAddr,
"target", redactURL(cfg.TargetURL),
"epg_enabled", cfg.epgEnabled(),
"cache_dir", cfg.CacheDir,
)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
slog.Error("server", "err", err)
os.Exit(1)
}
wg.Wait()
}
func loadConfig() (*Config, error) {
cfg := &Config{
TargetURL: mustEnv("PZ8_RELAY_TARGET_URL"),
Username: mustEnv("PZ8_RELAY_USERNAME"),
Password: mustEnv("PZ8_RELAY_PASSWORD"),
ListenAddr: envOr("PZ8_RELAY_LISTEN_ADDR", ":8080"),
PublicURL: strings.TrimRight(os.Getenv("PZ8_RELAY_PUBLIC_URL"), "/"),
CacheDir: envOr("PZ8_RELAY_CACHE_DIR", "/var/cache/pz8-relay"),
EPGContentType: envOr("PZ8_RELAY_EPG_CONTENT_TYPE", "application/xml"),
}
if raw := os.Getenv("PZ8_RELAY_EPG_URLS"); raw != "" {
for _, u := range strings.Split(raw, ",") {
if u = strings.TrimSpace(u); u != "" {
cfg.EPGURLs = append(cfg.EPGURLs, u)
}
}
}
d, err := time.ParseDuration(envOr("PZ8_RELAY_EPG_REFRESH", "12h"))
if err != nil {
return nil, fmt.Errorf("PZ8_RELAY_EPG_REFRESH: %w", err)
}
cfg.EPGRefresh = d
cfg.PreferPlaylistEPG = strings.EqualFold(os.Getenv("PZ8_RELAY_PREFER_PLAYLIST_EPG"), "true")
if cfg.epgEnabled() && cfg.PublicURL == "" {
return nil, errors.New("PZ8_RELAY_PUBLIC_URL is required when EPG is enabled")
}
return cfg, nil
}
// --- HTTP handlers ---
func basicAuth(cfg *Config, next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
u, p, ok := r.BasicAuth() u, p, ok := r.BasicAuth()
if !ok || subtle.ConstantTimeCompare([]byte(u), []byte(username)) != 1 || if !ok ||
subtle.ConstantTimeCompare([]byte(p), []byte(password)) != 1 { subtle.ConstantTimeCompare([]byte(u), []byte(cfg.Username)) != 1 ||
slog.Warn("unauthorized request", "client", clientIP(r)) subtle.ConstantTimeCompare([]byte(p), []byte(cfg.Password)) != 1 {
slog.Warn("unauthorized request", "client", clientIP(r), "path", r.URL.Path)
w.Header().Set("WWW-Authenticate", `Basic realm="pz8-relay"`) w.Header().Set("WWW-Authenticate", `Basic realm="pz8-relay"`)
http.Error(w, "unauthorized", http.StatusUnauthorized) http.Error(w, "unauthorized", http.StatusUnauthorized)
return return
} }
next(w, r)
slog.Info("proxying request", "client", clientIP(r))
proxy.ServeHTTP(w, r)
})
slog.Info("pz8-relay listening", "addr", addr, "target", targetURL.Redacted())
if err := http.ListenAndServe(addr, nil); err != nil {
slog.Error("server stopped", "err", err)
os.Exit(1)
} }
} }
func servePlaylist(cfg *Config, r *Refresher) http.HandlerFunc {
path := filepath.Join(cfg.CacheDir, playlistFilename)
return func(w http.ResponseWriter, req *http.Request) {
if !r.Ready() {
retryAfter(w, http.StatusServiceUnavailable, "warming up")
return
}
f, err := os.Open(path)
if err != nil {
retryAfter(w, http.StatusServiceUnavailable, "playlist not yet available")
return
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
http.Error(w, "stat", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
http.ServeContent(w, req, playlistFilename, fi.ModTime(), f)
}
}
func serveEPG(cfg *Config, r *Refresher) http.HandlerFunc {
path := filepath.Join(cfg.CacheDir, epgFilename)
return func(w http.ResponseWriter, req *http.Request) {
if !r.Ready() {
retryAfter(w, http.StatusServiceUnavailable, "warming up")
return
}
f, err := os.Open(path)
if err != nil {
retryAfter(w, http.StatusServiceUnavailable, "epg not yet available")
return
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
http.Error(w, "stat", http.StatusInternalServerError)
return
}
if clientAcceptsGzip(req) {
w.Header().Set("Content-Encoding", "gzip")
w.Header().Set("Content-Type", cfg.EPGContentType)
http.ServeContent(w, req, epgFilename, fi.ModTime(), f)
return
}
// Decompress on the fly. ServeContent can't help here because we
// wrap the file in gzip.Reader, so handle If-Modified-Since manually.
mod := fi.ModTime()
if notModifiedSince(req, mod) {
w.WriteHeader(http.StatusNotModified)
return
}
gr, err := gzip.NewReader(f)
if err != nil {
slog.Error("epg cache corrupted", "err", err)
http.Error(w, "epg unavailable", http.StatusInternalServerError)
return
}
defer gr.Close()
w.Header().Set("Content-Type", cfg.EPGContentType)
w.Header().Set("Last-Modified", mod.UTC().Format(http.TimeFormat))
_, _ = io.Copy(w, gr)
}
}
func healthz(r *Refresher) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
if !r.Ready() {
http.Error(w, "warming up", http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, "ok\n")
}
}
// --- helpers ---
func clientAcceptsGzip(r *http.Request) bool {
for _, v := range strings.Split(r.Header.Get("Accept-Encoding"), ",") {
// crude but sufficient — no q-value parsing needed for our use
if strings.EqualFold(strings.TrimSpace(strings.SplitN(v, ";", 2)[0]), "gzip") {
return true
}
}
return false
}
func notModifiedSince(r *http.Request, mod time.Time) bool {
h := r.Header.Get("If-Modified-Since")
if h == "" {
return false
}
t, err := http.ParseTime(h)
if err != nil {
return false
}
return !mod.Truncate(time.Second).After(t)
}
func retryAfter(w http.ResponseWriter, code int, msg string) {
w.Header().Set("Retry-After", "30")
http.Error(w, msg, code)
}
// clientIP returns the original client address, trusting X-Forwarded-For // clientIP returns the original client address, trusting X-Forwarded-For
// because this service runs behind Traefik. Do not expose directly. // because this service runs behind Traefik. Do not expose directly.
func clientIP(r *http.Request) string { func clientIP(r *http.Request) string {
@@ -68,6 +268,19 @@ func clientIP(r *http.Request) string {
return r.RemoteAddr return r.RemoteAddr
} }
func redactURL(u string) string {
// Cheap redaction: drop anything between "//" and "@" (userinfo).
i := strings.Index(u, "://")
if i < 0 {
return u
}
rest := u[i+3:]
if at := strings.Index(rest, "@"); at >= 0 {
return u[:i+3] + "***@" + rest[at+1:]
}
return u
}
func mustEnv(key string) string { func mustEnv(key string) string {
v := os.Getenv(key) v := os.Getenv(key)
if v == "" { if v == "" {
@@ -76,3 +289,10 @@ func mustEnv(key string) string {
} }
return v return v
} }
func envOr(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
+250
View File
@@ -0,0 +1,250 @@
package main
import (
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
const (
playlistFilename = "playlist.m3u"
epgFilename = "epg.xml.gz"
)
type RefreshConfig struct {
TargetURL string
PublicURL string // base URL of this relay; "/epg" appended for url-tvg rewrite
EPGURLs []string
PreferPlaylistEPG bool
CacheDir string
Interval time.Duration
}
func (c RefreshConfig) epgEnabled() bool {
return len(c.EPGURLs) > 0 || c.PreferPlaylistEPG
}
// Refresher periodically refreshes the cached playlist and EPG. HTTP handlers
// read the cached files; the worker runs single-threaded and writes them
// atomically via os.Rename, so no synchronization is needed between the two.
type Refresher struct {
cfg RefreshConfig
client *http.Client
readyCh chan struct{}
once sync.Once
}
func NewRefresher(cfg RefreshConfig, client *http.Client) *Refresher {
return &Refresher{
cfg: cfg,
client: client,
readyCh: make(chan struct{}),
}
}
// Run blocks until ctx is cancelled. It performs an immediate refresh on entry
// (so the cache is warm as quickly as possible), then ticks at cfg.Interval.
func (r *Refresher) Run(ctx context.Context) {
if err := r.refresh(ctx); err != nil {
slog.Warn("initial refresh failed", "err", err)
} else {
r.markReady()
}
t := time.NewTicker(r.cfg.Interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
if err := r.refresh(ctx); err != nil {
slog.Warn("refresh failed", "err", err)
continue
}
r.markReady()
}
}
}
// Ready reports whether at least one refresh has succeeded. Handlers use this
// to return 503 cleanly during the warm-up window.
func (r *Refresher) Ready() bool {
select {
case <-r.readyCh:
return true
default:
return false
}
}
func (r *Refresher) markReady() {
r.once.Do(func() { close(r.readyCh) })
}
func (r *Refresher) refresh(ctx context.Context) error {
if err := os.MkdirAll(r.cfg.CacheDir, 0o755); err != nil {
return fmt.Errorf("cache dir: %w", err)
}
var epgPublicURL string
if r.cfg.epgEnabled() {
epgPublicURL = strings.TrimRight(r.cfg.PublicURL, "/") + "/epg"
}
tvgIDs, playlistEPG, err := r.refreshPlaylist(ctx, epgPublicURL)
if err != nil {
return fmt.Errorf("playlist: %w", err)
}
slog.Info("playlist refreshed",
"tvg_ids", len(tvgIDs),
"playlist_has_epg", playlistEPG != "",
)
if !r.cfg.epgEnabled() {
return nil
}
epgURLs := r.cfg.EPGURLs
if r.cfg.PreferPlaylistEPG && playlistEPG != "" {
epgURLs = []string{playlistEPG}
}
if len(epgURLs) == 0 {
slog.Warn("EPG enabled but no usable source URLs",
"prefer_playlist_epg", r.cfg.PreferPlaylistEPG,
"configured_urls", len(r.cfg.EPGURLs),
)
return nil
}
sourcePaths, err := r.fetchEPGSources(ctx, epgURLs)
defer func() {
for _, p := range sourcePaths {
_ = os.Remove(p)
}
}()
if err != nil {
return err
}
return r.persistMergedEPG(sourcePaths, buildIDMap(tvgIDs))
}
func (r *Refresher) refreshPlaylist(ctx context.Context, epgURL string) (tvgIDs []string, playlistEPG string, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.cfg.TargetURL, nil)
if err != nil {
return nil, "", err
}
resp, err := r.client.Do(req)
if err != nil {
return nil, "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("upstream status %s", resp.Status)
}
tmp := filepath.Join(r.cfg.CacheDir, playlistFilename+".tmp")
f, err := os.Create(tmp)
if err != nil {
return nil, "", err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(tmp)
}
tvgIDs, playlistEPG, err = processPlaylist(resp.Body, f, epgURL)
if err != nil {
cleanup()
return nil, "", err
}
if err := f.Sync(); err != nil {
cleanup()
return nil, "", err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return nil, "", err
}
if err := os.Rename(tmp, filepath.Join(r.cfg.CacheDir, playlistFilename)); err != nil {
_ = os.Remove(tmp)
return nil, "", err
}
return tvgIDs, playlistEPG, nil
}
func (r *Refresher) fetchEPGSources(ctx context.Context, urls []string) (paths []string, err error) {
for i, url := range urls {
path := filepath.Join(r.cfg.CacheDir, fmt.Sprintf("epg-src-%d.xml.tmp", i))
if ferr := fetchEPGSource(ctx, r.client, url, path); ferr != nil {
slog.Warn("epg source fetch failed", "url", url, "err", ferr)
_ = os.Remove(path)
continue
}
paths = append(paths, path)
}
if len(paths) == 0 {
return nil, errors.New("no EPG sources fetched successfully")
}
return paths, nil
}
func (r *Refresher) persistMergedEPG(sourcePaths []string, idMap map[string]string) error {
sources := make([]io.ReadSeeker, 0, len(sourcePaths))
closers := make([]io.Closer, 0, len(sourcePaths))
defer func() {
for _, c := range closers {
_ = c.Close()
}
}()
for _, p := range sourcePaths {
f, err := os.Open(p)
if err != nil {
return err
}
sources = append(sources, f)
closers = append(closers, f)
}
tmp := filepath.Join(r.cfg.CacheDir, epgFilename+".tmp")
f, err := os.Create(tmp)
if err != nil {
return err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(tmp)
}
gw := gzip.NewWriter(f)
if mergeErr := mergeEPG(sources, idMap, gw); mergeErr != nil {
_ = gw.Close()
cleanup()
return mergeErr
}
if err := gw.Close(); err != nil {
cleanup()
return err
}
if err := f.Sync(); err != nil {
cleanup()
return err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
return os.Rename(tmp, filepath.Join(r.cfg.CacheDir, epgFilename))
}