feat: two-pass merge with first-source-wins
This commit is contained in:
@@ -0,0 +1,226 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
|
"encoding/xml"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fetchEPGSource downloads url into dest as plain XML, transparently
|
||||||
|
// gunzipping if the URL ends in .gz or the response Content-Type indicates
|
||||||
|
// gzip. Streaming — the body never lives fully in memory.
|
||||||
|
func fetchEPGSource(ctx context.Context, client *http.Client, url, dest string) error {
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("epg fetch %s: %s", url, resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
src := io.Reader(resp.Body)
|
||||||
|
if responseLooksGzipped(url, resp) {
|
||||||
|
gr, err := gzip.NewReader(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("epg fetch %s: gzip: %w", url, err)
|
||||||
|
}
|
||||||
|
defer gr.Close()
|
||||||
|
src = gr
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Create(dest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
if _, err := io.Copy(f, src); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return f.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
|
func responseLooksGzipped(url string, resp *http.Response) bool {
|
||||||
|
base := strings.ToLower(strings.SplitN(url, "?", 2)[0])
|
||||||
|
if strings.HasSuffix(base, ".gz") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "gzip") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeEPG writes a merged XMLTV stream to dst, drawing from the given
|
||||||
|
// sources. Channels not covered by tvgIDMap are dropped; matched channels
|
||||||
|
// have their ids rewritten to the canonical (playlist) form. When multiple
|
||||||
|
// sources cover the same channel, the first one wins and programmes from
|
||||||
|
// other sources for that channel are dropped.
|
||||||
|
//
|
||||||
|
// Each source is read twice (channels, then programmes), which is why the
|
||||||
|
// argument is io.ReadSeeker rather than io.Reader.
|
||||||
|
func mergeEPG(sources []io.ReadSeeker, tvgIDMap map[string]string, dst io.Writer) error {
|
||||||
|
if _, err := io.WriteString(dst, xml.Header); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
enc := xml.NewEncoder(dst)
|
||||||
|
tv := xml.StartElement{
|
||||||
|
Name: xml.Name{Local: "tv"},
|
||||||
|
Attr: []xml.Attr{{Name: xml.Name{Local: "generator-info-name"}, Value: "pz8-relay"}},
|
||||||
|
}
|
||||||
|
if err := enc.EncodeToken(tv); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ownership := make(map[string]int)
|
||||||
|
|
||||||
|
for i, src := range sources {
|
||||||
|
if _, err := src.Seek(0, io.SeekStart); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := emitChannels(src, enc, tvgIDMap, ownership, i); err != nil {
|
||||||
|
return fmt.Errorf("source %d channels: %w", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, src := range sources {
|
||||||
|
if _, err := src.Seek(0, io.SeekStart); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := emitProgrammes(src, enc, tvgIDMap, ownership, i); err != nil {
|
||||||
|
return fmt.Errorf("source %d programmes: %w", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := enc.EncodeToken(tv.End()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return enc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func emitChannels(r io.Reader, enc *xml.Encoder, tvgIDMap map[string]string, ownership map[string]int, sourceIdx int) error {
|
||||||
|
dec := xml.NewDecoder(r)
|
||||||
|
for {
|
||||||
|
tok, err := dec.Token()
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
start, ok := tok.(xml.StartElement)
|
||||||
|
if !ok || start.Name.Local != "channel" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idIdx := findAttr(start.Attr, "id")
|
||||||
|
if idIdx < 0 {
|
||||||
|
if err := dec.Skip(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
canonical, matched := tvgIDMap[normalizeChannelID(start.Attr[idIdx].Value)]
|
||||||
|
if !matched {
|
||||||
|
if err := dec.Skip(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, taken := ownership[canonical]; taken {
|
||||||
|
if err := dec.Skip(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ownership[canonical] = sourceIdx
|
||||||
|
start.Attr[idIdx].Value = canonical
|
||||||
|
if err := copyElement(dec, enc, start); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func emitProgrammes(r io.Reader, enc *xml.Encoder, tvgIDMap map[string]string, ownership map[string]int, sourceIdx int) error {
|
||||||
|
dec := xml.NewDecoder(r)
|
||||||
|
for {
|
||||||
|
tok, err := dec.Token()
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
start, ok := tok.(xml.StartElement)
|
||||||
|
if !ok || start.Name.Local != "programme" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
chIdx := findAttr(start.Attr, "channel")
|
||||||
|
if chIdx < 0 {
|
||||||
|
if err := dec.Skip(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
canonical, matched := tvgIDMap[normalizeChannelID(start.Attr[chIdx].Value)]
|
||||||
|
if !matched {
|
||||||
|
if err := dec.Skip(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if owner, ok := ownership[canonical]; !ok || owner != sourceIdx {
|
||||||
|
if err := dec.Skip(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
start.Attr[chIdx].Value = canonical
|
||||||
|
if err := copyElement(dec, enc, start); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func findAttr(attrs []xml.Attr, name string) int {
|
||||||
|
for i, a := range attrs {
|
||||||
|
if a.Name.Local == name {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
// copyElement emits start and every token up to its matching end element,
|
||||||
|
// using depth tracking to handle nested elements.
|
||||||
|
func copyElement(dec *xml.Decoder, enc *xml.Encoder, start xml.StartElement) error {
|
||||||
|
if err := enc.EncodeToken(start); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
depth := 1
|
||||||
|
for depth > 0 {
|
||||||
|
tok, err := dec.Token()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := enc.EncodeToken(tok); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch tok.(type) {
|
||||||
|
case xml.StartElement:
|
||||||
|
depth++
|
||||||
|
case xml.EndElement:
|
||||||
|
depth--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
+119
@@ -0,0 +1,119 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Fixtures modelled on real open-epg output: ids of the form "Italia 1 HD.it",
|
||||||
|
// single-line channel/programme elements, "YYYYMMDDHHMMSS +0000" times.
|
||||||
|
|
||||||
|
const epgSource1 = `<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<tv generator-info-name="open-epg">
|
||||||
|
<channel id="Italia 1 HD.it"><display-name>Italia 1 HD.it</display-name></channel>
|
||||||
|
<channel id="Rai 1 HD.it"><display-name>Rai 1 HD.it</display-name></channel>
|
||||||
|
<channel id="Unmatched HD.it"><display-name>Unmatched</display-name></channel>
|
||||||
|
<programme start="20260101000000 +0000" stop="20260101010000 +0000" channel="Italia 1 HD.it"><title>Show A</title></programme>
|
||||||
|
<programme start="20260101010000 +0000" stop="20260101020000 +0000" channel="Rai 1 HD.it"><title>Show B</title></programme>
|
||||||
|
<programme start="20260101020000 +0000" stop="20260101030000 +0000" channel="Unmatched HD.it"><title>Dropped programme</title></programme>
|
||||||
|
</tv>`
|
||||||
|
|
||||||
|
const epgSource2 = `<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<tv generator-info-name="open-epg-2">
|
||||||
|
<channel id="Italia 1 HD.it"><display-name>From src2 - duplicate</display-name></channel>
|
||||||
|
<channel id="TV8 HD.it"><display-name>TV8 HD</display-name></channel>
|
||||||
|
<programme start="20260102000000 +0000" stop="20260102010000 +0000" channel="Italia 1 HD.it"><title>Src2 dup show</title></programme>
|
||||||
|
<programme start="20260102010000 +0000" stop="20260102020000 +0000" channel="TV8 HD.it"><title>TV8 show</title></programme>
|
||||||
|
</tv>`
|
||||||
|
|
||||||
|
func runMerge(t *testing.T, tvgIDs []string, sources ...string) string {
|
||||||
|
t.Helper()
|
||||||
|
rs := make([]io.ReadSeeker, len(sources))
|
||||||
|
for i, s := range sources {
|
||||||
|
rs[i] = bytes.NewReader([]byte(s))
|
||||||
|
}
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if err := mergeEPG(rs, buildIDMap(tvgIDs), &buf); err != nil {
|
||||||
|
t.Fatalf("mergeEPG: %v", err)
|
||||||
|
}
|
||||||
|
return buf.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMergeEPG_SingleSourceFiltersAndRewrites(t *testing.T) {
|
||||||
|
out := runMerge(t, []string{"Italia 1", "Rai 1"}, epgSource1)
|
||||||
|
|
||||||
|
mustContain(t, out, `<channel id="Italia 1">`)
|
||||||
|
mustContain(t, out, `<channel id="Rai 1">`)
|
||||||
|
mustNotContain(t, out, `Unmatched HD.it`)
|
||||||
|
mustNotContain(t, out, `Dropped programme`)
|
||||||
|
|
||||||
|
mustContain(t, out, `channel="Italia 1"`)
|
||||||
|
mustContain(t, out, `channel="Rai 1"`)
|
||||||
|
mustContain(t, out, `<title>Show A</title>`)
|
||||||
|
mustContain(t, out, `<title>Show B</title>`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMergeEPG_TwoSourcesFirstWinsAndProgrammesScoped(t *testing.T) {
|
||||||
|
out := runMerge(t,
|
||||||
|
[]string{"Italia 1", "Rai 1", "TV8"},
|
||||||
|
epgSource1, epgSource2,
|
||||||
|
)
|
||||||
|
|
||||||
|
// All three matched playlist channels should be present, exactly once.
|
||||||
|
for _, want := range []string{
|
||||||
|
`<channel id="Italia 1">`,
|
||||||
|
`<channel id="Rai 1">`,
|
||||||
|
`<channel id="TV8">`,
|
||||||
|
} {
|
||||||
|
if c := strings.Count(out, want); c != 1 {
|
||||||
|
t.Errorf("%q appears %d times in output, want 1", want, c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Source 1 owns "Italia 1" — its show appears, src2's must NOT.
|
||||||
|
mustContain(t, out, `<title>Show A</title>`)
|
||||||
|
mustNotContain(t, out, `Src2 dup show`)
|
||||||
|
mustNotContain(t, out, `From src2 - duplicate`) // dedup at channel level too
|
||||||
|
|
||||||
|
// Source 2 owns "TV8" — its programme should appear.
|
||||||
|
mustContain(t, out, `<title>TV8 show</title>`)
|
||||||
|
|
||||||
|
// "Unmatched" is not in the playlist tvg-id set.
|
||||||
|
mustNotContain(t, out, `Unmatched`)
|
||||||
|
mustNotContain(t, out, `Dropped programme`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMergeEPG_EmptyOutputForUnmatchedPlaylist(t *testing.T) {
|
||||||
|
out := runMerge(t, []string{"Channel That Does Not Exist"}, epgSource1)
|
||||||
|
|
||||||
|
mustNotContain(t, out, "<channel ")
|
||||||
|
mustNotContain(t, out, "<programme ")
|
||||||
|
mustContain(t, out, "<tv ") // wrapper still emitted
|
||||||
|
mustContain(t, out, "</tv>")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMergeEPG_NoSourcesProducesValidEmptyTV(t *testing.T) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if err := mergeEPG(nil, buildIDMap([]string{"Italia 1"}), &buf); err != nil {
|
||||||
|
t.Fatalf("mergeEPG: %v", err)
|
||||||
|
}
|
||||||
|
out := buf.String()
|
||||||
|
mustContain(t, out, "<tv ")
|
||||||
|
mustContain(t, out, "</tv>")
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustContain(t *testing.T, hay, needle string) {
|
||||||
|
t.Helper()
|
||||||
|
if !strings.Contains(hay, needle) {
|
||||||
|
t.Errorf("expected output to contain %q\n--- output ---\n%s", needle, hay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustNotContain(t *testing.T, hay, needle string) {
|
||||||
|
t.Helper()
|
||||||
|
if strings.Contains(hay, needle) {
|
||||||
|
t.Errorf("expected output NOT to contain %q\n--- output ---\n%s", needle, hay)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user