Files
pz8-relay/worker.go
T

251 lines
5.6 KiB
Go

package main
import (
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
const (
playlistFilename = "playlist.m3u"
epgFilename = "epg.xml.gz"
)
type RefreshConfig struct {
TargetURL string
PublicURL string // base URL of this relay; "/epg" appended for url-tvg rewrite
EPGURLs []string
PreferPlaylistEPG bool
CacheDir string
Interval time.Duration
}
func (c RefreshConfig) epgEnabled() bool {
return len(c.EPGURLs) > 0 || c.PreferPlaylistEPG
}
// Refresher periodically refreshes the cached playlist and EPG. HTTP handlers
// read the cached files; the worker runs single-threaded and writes them
// atomically via os.Rename, so no synchronization is needed between the two.
type Refresher struct {
cfg RefreshConfig
client *http.Client
readyCh chan struct{}
once sync.Once
}
func NewRefresher(cfg RefreshConfig, client *http.Client) *Refresher {
return &Refresher{
cfg: cfg,
client: client,
readyCh: make(chan struct{}),
}
}
// Run blocks until ctx is cancelled. It performs an immediate refresh on entry
// (so the cache is warm as quickly as possible), then ticks at cfg.Interval.
func (r *Refresher) Run(ctx context.Context) {
if err := r.refresh(ctx); err != nil {
slog.Warn("initial refresh failed", "err", err)
} else {
r.markReady()
}
t := time.NewTicker(r.cfg.Interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
if err := r.refresh(ctx); err != nil {
slog.Warn("refresh failed", "err", err)
continue
}
r.markReady()
}
}
}
// Ready reports whether at least one refresh has succeeded. Handlers use this
// to return 503 cleanly during the warm-up window.
func (r *Refresher) Ready() bool {
select {
case <-r.readyCh:
return true
default:
return false
}
}
func (r *Refresher) markReady() {
r.once.Do(func() { close(r.readyCh) })
}
func (r *Refresher) refresh(ctx context.Context) error {
if err := os.MkdirAll(r.cfg.CacheDir, 0o755); err != nil {
return fmt.Errorf("cache dir: %w", err)
}
var epgPublicURL string
if r.cfg.epgEnabled() {
epgPublicURL = strings.TrimRight(r.cfg.PublicURL, "/") + "/epg"
}
tvgIDs, playlistEPG, err := r.refreshPlaylist(ctx, epgPublicURL)
if err != nil {
return fmt.Errorf("playlist: %w", err)
}
slog.Info("playlist refreshed",
"tvg_ids", len(tvgIDs),
"playlist_has_epg", playlistEPG != "",
)
if !r.cfg.epgEnabled() {
return nil
}
epgURLs := r.cfg.EPGURLs
if r.cfg.PreferPlaylistEPG && playlistEPG != "" {
epgURLs = []string{playlistEPG}
}
if len(epgURLs) == 0 {
slog.Warn("EPG enabled but no usable source URLs",
"prefer_playlist_epg", r.cfg.PreferPlaylistEPG,
"configured_urls", len(r.cfg.EPGURLs),
)
return nil
}
sourcePaths, err := r.fetchEPGSources(ctx, epgURLs)
defer func() {
for _, p := range sourcePaths {
_ = os.Remove(p)
}
}()
if err != nil {
return err
}
return r.persistMergedEPG(sourcePaths, buildIDMap(tvgIDs))
}
func (r *Refresher) refreshPlaylist(ctx context.Context, epgURL string) (tvgIDs []string, playlistEPG string, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.cfg.TargetURL, nil)
if err != nil {
return nil, "", err
}
resp, err := r.client.Do(req)
if err != nil {
return nil, "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("upstream status %s", resp.Status)
}
tmp := filepath.Join(r.cfg.CacheDir, playlistFilename+".tmp")
f, err := os.Create(tmp)
if err != nil {
return nil, "", err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(tmp)
}
tvgIDs, playlistEPG, err = processPlaylist(resp.Body, f, epgURL)
if err != nil {
cleanup()
return nil, "", err
}
if err := f.Sync(); err != nil {
cleanup()
return nil, "", err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return nil, "", err
}
if err := os.Rename(tmp, filepath.Join(r.cfg.CacheDir, playlistFilename)); err != nil {
_ = os.Remove(tmp)
return nil, "", err
}
return tvgIDs, playlistEPG, nil
}
func (r *Refresher) fetchEPGSources(ctx context.Context, urls []string) (paths []string, err error) {
for i, url := range urls {
path := filepath.Join(r.cfg.CacheDir, fmt.Sprintf("epg-src-%d.xml.tmp", i))
if ferr := fetchEPGSource(ctx, r.client, url, path); ferr != nil {
slog.Warn("epg source fetch failed", "url", url, "err", ferr)
_ = os.Remove(path)
continue
}
paths = append(paths, path)
}
if len(paths) == 0 {
return nil, errors.New("no EPG sources fetched successfully")
}
return paths, nil
}
func (r *Refresher) persistMergedEPG(sourcePaths []string, idMap map[string]string) error {
sources := make([]io.ReadSeeker, 0, len(sourcePaths))
closers := make([]io.Closer, 0, len(sourcePaths))
defer func() {
for _, c := range closers {
_ = c.Close()
}
}()
for _, p := range sourcePaths {
f, err := os.Open(p)
if err != nil {
return err
}
sources = append(sources, f)
closers = append(closers, f)
}
tmp := filepath.Join(r.cfg.CacheDir, epgFilename+".tmp")
f, err := os.Create(tmp)
if err != nil {
return err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(tmp)
}
gw := gzip.NewWriter(f)
if mergeErr := mergeEPG(sources, idMap, gw); mergeErr != nil {
_ = gw.Close()
cleanup()
return mergeErr
}
if err := gw.Close(); err != nil {
cleanup()
return err
}
if err := f.Sync(); err != nil {
cleanup()
return err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
return os.Rename(tmp, filepath.Join(r.cfg.CacheDir, epgFilename))
}