From b592a0e2297394aeb361203fe6eea5b1d4bb7d81 Mon Sep 17 00:00:00 2001 From: Domenico Testa Date: Wed, 6 May 2026 11:12:31 +0200 Subject: [PATCH] feat: two-pass merge with first-source-wins --- epg.go | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++++ epg_test.go | 119 +++++++++++++++++++++++++++ 2 files changed, 345 insertions(+) create mode 100644 epg.go create mode 100644 epg_test.go diff --git a/epg.go b/epg.go new file mode 100644 index 0000000..9243381 --- /dev/null +++ b/epg.go @@ -0,0 +1,226 @@ +package main + +import ( + "compress/gzip" + "context" + "encoding/xml" + "fmt" + "io" + "net/http" + "os" + "strings" +) + +// fetchEPGSource downloads url into dest as plain XML, transparently +// gunzipping if the URL ends in .gz or the response Content-Type indicates +// gzip. Streaming — the body never lives fully in memory. +func fetchEPGSource(ctx context.Context, client *http.Client, url, dest string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("epg fetch %s: %s", url, resp.Status) + } + + src := io.Reader(resp.Body) + if responseLooksGzipped(url, resp) { + gr, err := gzip.NewReader(resp.Body) + if err != nil { + return fmt.Errorf("epg fetch %s: gzip: %w", url, err) + } + defer gr.Close() + src = gr + } + + f, err := os.Create(dest) + if err != nil { + return err + } + defer f.Close() + if _, err := io.Copy(f, src); err != nil { + return err + } + return f.Sync() +} + +func responseLooksGzipped(url string, resp *http.Response) bool { + base := strings.ToLower(strings.SplitN(url, "?", 2)[0]) + if strings.HasSuffix(base, ".gz") { + return true + } + if strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "gzip") { + return true + } + return false +} + +// mergeEPG writes a merged XMLTV stream to dst, drawing from the given +// sources. Channels not covered by tvgIDMap are dropped; matched channels +// have their ids rewritten to the canonical (playlist) form. When multiple +// sources cover the same channel, the first one wins and programmes from +// other sources for that channel are dropped. +// +// Each source is read twice (channels, then programmes), which is why the +// argument is io.ReadSeeker rather than io.Reader. +func mergeEPG(sources []io.ReadSeeker, tvgIDMap map[string]string, dst io.Writer) error { + if _, err := io.WriteString(dst, xml.Header); err != nil { + return err + } + + enc := xml.NewEncoder(dst) + tv := xml.StartElement{ + Name: xml.Name{Local: "tv"}, + Attr: []xml.Attr{{Name: xml.Name{Local: "generator-info-name"}, Value: "pz8-relay"}}, + } + if err := enc.EncodeToken(tv); err != nil { + return err + } + + ownership := make(map[string]int) + + for i, src := range sources { + if _, err := src.Seek(0, io.SeekStart); err != nil { + return err + } + if err := emitChannels(src, enc, tvgIDMap, ownership, i); err != nil { + return fmt.Errorf("source %d channels: %w", i, err) + } + } + + for i, src := range sources { + if _, err := src.Seek(0, io.SeekStart); err != nil { + return err + } + if err := emitProgrammes(src, enc, tvgIDMap, ownership, i); err != nil { + return fmt.Errorf("source %d programmes: %w", i, err) + } + } + + if err := enc.EncodeToken(tv.End()); err != nil { + return err + } + return enc.Close() +} + +func emitChannels(r io.Reader, enc *xml.Encoder, tvgIDMap map[string]string, ownership map[string]int, sourceIdx int) error { + dec := xml.NewDecoder(r) + for { + tok, err := dec.Token() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + start, ok := tok.(xml.StartElement) + if !ok || start.Name.Local != "channel" { + continue + } + idIdx := findAttr(start.Attr, "id") + if idIdx < 0 { + if err := dec.Skip(); err != nil { + return err + } + continue + } + canonical, matched := tvgIDMap[normalizeChannelID(start.Attr[idIdx].Value)] + if !matched { + if err := dec.Skip(); err != nil { + return err + } + continue + } + if _, taken := ownership[canonical]; taken { + if err := dec.Skip(); err != nil { + return err + } + continue + } + ownership[canonical] = sourceIdx + start.Attr[idIdx].Value = canonical + if err := copyElement(dec, enc, start); err != nil { + return err + } + } +} + +func emitProgrammes(r io.Reader, enc *xml.Encoder, tvgIDMap map[string]string, ownership map[string]int, sourceIdx int) error { + dec := xml.NewDecoder(r) + for { + tok, err := dec.Token() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + start, ok := tok.(xml.StartElement) + if !ok || start.Name.Local != "programme" { + continue + } + chIdx := findAttr(start.Attr, "channel") + if chIdx < 0 { + if err := dec.Skip(); err != nil { + return err + } + continue + } + canonical, matched := tvgIDMap[normalizeChannelID(start.Attr[chIdx].Value)] + if !matched { + if err := dec.Skip(); err != nil { + return err + } + continue + } + if owner, ok := ownership[canonical]; !ok || owner != sourceIdx { + if err := dec.Skip(); err != nil { + return err + } + continue + } + start.Attr[chIdx].Value = canonical + if err := copyElement(dec, enc, start); err != nil { + return err + } + } +} + +func findAttr(attrs []xml.Attr, name string) int { + for i, a := range attrs { + if a.Name.Local == name { + return i + } + } + return -1 +} + +// copyElement emits start and every token up to its matching end element, +// using depth tracking to handle nested elements. +func copyElement(dec *xml.Decoder, enc *xml.Encoder, start xml.StartElement) error { + if err := enc.EncodeToken(start); err != nil { + return err + } + depth := 1 + for depth > 0 { + tok, err := dec.Token() + if err != nil { + return err + } + if err := enc.EncodeToken(tok); err != nil { + return err + } + switch tok.(type) { + case xml.StartElement: + depth++ + case xml.EndElement: + depth-- + } + } + return nil +} diff --git a/epg_test.go b/epg_test.go new file mode 100644 index 0000000..aeb2198 --- /dev/null +++ b/epg_test.go @@ -0,0 +1,119 @@ +package main + +import ( + "bytes" + "io" + "strings" + "testing" +) + +// Fixtures modelled on real open-epg output: ids of the form "Italia 1 HD.it", +// single-line channel/programme elements, "YYYYMMDDHHMMSS +0000" times. + +const epgSource1 = ` + +Italia 1 HD.it +Rai 1 HD.it +Unmatched +Show A +Show B +Dropped programme +` + +const epgSource2 = ` + +From src2 - duplicate +TV8 HD +Src2 dup show +TV8 show +` + +func runMerge(t *testing.T, tvgIDs []string, sources ...string) string { + t.Helper() + rs := make([]io.ReadSeeker, len(sources)) + for i, s := range sources { + rs[i] = bytes.NewReader([]byte(s)) + } + var buf bytes.Buffer + if err := mergeEPG(rs, buildIDMap(tvgIDs), &buf); err != nil { + t.Fatalf("mergeEPG: %v", err) + } + return buf.String() +} + +func TestMergeEPG_SingleSourceFiltersAndRewrites(t *testing.T) { + out := runMerge(t, []string{"Italia 1", "Rai 1"}, epgSource1) + + mustContain(t, out, ``) + mustContain(t, out, ``) + mustNotContain(t, out, `Unmatched HD.it`) + mustNotContain(t, out, `Dropped programme`) + + mustContain(t, out, `channel="Italia 1"`) + mustContain(t, out, `channel="Rai 1"`) + mustContain(t, out, `Show A`) + mustContain(t, out, `Show B`) +} + +func TestMergeEPG_TwoSourcesFirstWinsAndProgrammesScoped(t *testing.T) { + out := runMerge(t, + []string{"Italia 1", "Rai 1", "TV8"}, + epgSource1, epgSource2, + ) + + // All three matched playlist channels should be present, exactly once. + for _, want := range []string{ + ``, + ``, + ``, + } { + if c := strings.Count(out, want); c != 1 { + t.Errorf("%q appears %d times in output, want 1", want, c) + } + } + + // Source 1 owns "Italia 1" — its show appears, src2's must NOT. + mustContain(t, out, `Show A`) + mustNotContain(t, out, `Src2 dup show`) + mustNotContain(t, out, `From src2 - duplicate`) // dedup at channel level too + + // Source 2 owns "TV8" — its programme should appear. + mustContain(t, out, `TV8 show`) + + // "Unmatched" is not in the playlist tvg-id set. + mustNotContain(t, out, `Unmatched`) + mustNotContain(t, out, `Dropped programme`) +} + +func TestMergeEPG_EmptyOutputForUnmatchedPlaylist(t *testing.T) { + out := runMerge(t, []string{"Channel That Does Not Exist"}, epgSource1) + + mustNotContain(t, out, "") +} + +func TestMergeEPG_NoSourcesProducesValidEmptyTV(t *testing.T) { + var buf bytes.Buffer + if err := mergeEPG(nil, buildIDMap([]string{"Italia 1"}), &buf); err != nil { + t.Fatalf("mergeEPG: %v", err) + } + out := buf.String() + mustContain(t, out, "") +} + +func mustContain(t *testing.T, hay, needle string) { + t.Helper() + if !strings.Contains(hay, needle) { + t.Errorf("expected output to contain %q\n--- output ---\n%s", needle, hay) + } +} + +func mustNotContain(t *testing.T, hay, needle string) { + t.Helper() + if strings.Contains(hay, needle) { + t.Errorf("expected output NOT to contain %q\n--- output ---\n%s", needle, hay) + } +}