package main import ( "compress/gzip" "context" "errors" "fmt" "io" "log/slog" "net/http" "os" "path/filepath" "strings" "sync" "time" ) const ( playlistFilename = "playlist.m3u" epgFilename = "epg.xml.gz" ) type RefreshConfig struct { TargetURL string PublicURL string // base URL of this relay; "/epg" appended for url-tvg rewrite EPGURLs []string PreferPlaylistEPG bool CacheDir string Interval time.Duration } func (c RefreshConfig) epgEnabled() bool { return len(c.EPGURLs) > 0 || c.PreferPlaylistEPG } // Refresher periodically refreshes the cached playlist and EPG. HTTP handlers // read the cached files; the worker runs single-threaded and writes them // atomically via os.Rename, so no synchronization is needed between the two. type Refresher struct { cfg RefreshConfig client *http.Client readyCh chan struct{} once sync.Once } func NewRefresher(cfg RefreshConfig, client *http.Client) *Refresher { return &Refresher{ cfg: cfg, client: client, readyCh: make(chan struct{}), } } // Run blocks until ctx is cancelled. It performs an immediate refresh on entry // (so the cache is warm as quickly as possible), then ticks at cfg.Interval. func (r *Refresher) Run(ctx context.Context) { if err := r.refresh(ctx); err != nil { slog.Warn("initial refresh failed", "err", err) } else { r.markReady() } t := time.NewTicker(r.cfg.Interval) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: if err := r.refresh(ctx); err != nil { slog.Warn("refresh failed", "err", err) continue } r.markReady() } } } // Ready reports whether at least one refresh has succeeded. Handlers use this // to return 503 cleanly during the warm-up window. func (r *Refresher) Ready() bool { select { case <-r.readyCh: return true default: return false } } func (r *Refresher) markReady() { r.once.Do(func() { close(r.readyCh) }) } func (r *Refresher) refresh(ctx context.Context) error { if err := os.MkdirAll(r.cfg.CacheDir, 0o755); err != nil { return fmt.Errorf("cache dir: %w", err) } var epgPublicURL string if r.cfg.epgEnabled() { epgPublicURL = strings.TrimRight(r.cfg.PublicURL, "/") + "/epg" } tvgIDs, playlistEPG, err := r.refreshPlaylist(ctx, epgPublicURL) if err != nil { return fmt.Errorf("playlist: %w", err) } slog.Info("playlist refreshed", "tvg_ids", len(tvgIDs), "playlist_has_epg", playlistEPG != "", ) if !r.cfg.epgEnabled() { return nil } epgURLs := r.cfg.EPGURLs if r.cfg.PreferPlaylistEPG && playlistEPG != "" { epgURLs = []string{playlistEPG} } if len(epgURLs) == 0 { slog.Warn("EPG enabled but no usable source URLs", "prefer_playlist_epg", r.cfg.PreferPlaylistEPG, "configured_urls", len(r.cfg.EPGURLs), ) return nil } sourcePaths, err := r.fetchEPGSources(ctx, epgURLs) defer func() { for _, p := range sourcePaths { _ = os.Remove(p) } }() if err != nil { return err } return r.persistMergedEPG(sourcePaths, buildIDMap(tvgIDs)) } func (r *Refresher) refreshPlaylist(ctx context.Context, epgURL string) (tvgIDs []string, playlistEPG string, err error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.cfg.TargetURL, nil) if err != nil { return nil, "", err } resp, err := r.client.Do(req) if err != nil { return nil, "", err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, "", fmt.Errorf("upstream status %s", resp.Status) } tmp := filepath.Join(r.cfg.CacheDir, playlistFilename+".tmp") f, err := os.Create(tmp) if err != nil { return nil, "", err } cleanup := func() { _ = f.Close() _ = os.Remove(tmp) } tvgIDs, playlistEPG, err = processPlaylist(resp.Body, f, epgURL) if err != nil { cleanup() return nil, "", err } if err := f.Sync(); err != nil { cleanup() return nil, "", err } if err := f.Close(); err != nil { _ = os.Remove(tmp) return nil, "", err } if err := os.Rename(tmp, filepath.Join(r.cfg.CacheDir, playlistFilename)); err != nil { _ = os.Remove(tmp) return nil, "", err } return tvgIDs, playlistEPG, nil } func (r *Refresher) fetchEPGSources(ctx context.Context, urls []string) (paths []string, err error) { for i, url := range urls { path := filepath.Join(r.cfg.CacheDir, fmt.Sprintf("epg-src-%d.xml.tmp", i)) if ferr := fetchEPGSource(ctx, r.client, url, path); ferr != nil { slog.Warn("epg source fetch failed", "url", url, "err", ferr) _ = os.Remove(path) continue } paths = append(paths, path) } if len(paths) == 0 { return nil, errors.New("no EPG sources fetched successfully") } return paths, nil } func (r *Refresher) persistMergedEPG(sourcePaths []string, idMap map[string]string) error { sources := make([]io.ReadSeeker, 0, len(sourcePaths)) closers := make([]io.Closer, 0, len(sourcePaths)) defer func() { for _, c := range closers { _ = c.Close() } }() for _, p := range sourcePaths { f, err := os.Open(p) if err != nil { return err } sources = append(sources, f) closers = append(closers, f) } tmp := filepath.Join(r.cfg.CacheDir, epgFilename+".tmp") f, err := os.Create(tmp) if err != nil { return err } cleanup := func() { _ = f.Close() _ = os.Remove(tmp) } gw := gzip.NewWriter(f) if mergeErr := mergeEPG(sources, idMap, gw); mergeErr != nil { _ = gw.Close() cleanup() return mergeErr } if err := gw.Close(); err != nil { cleanup() return err } if err := f.Sync(); err != nil { cleanup() return err } if err := f.Close(); err != nil { _ = os.Remove(tmp) return err } return os.Rename(tmp, filepath.Join(r.cfg.CacheDir, epgFilename)) }