# Headline Analysis

This notebook collects headlines from all news sources and builds summary files.

In [1]:
import json
import csv
import re
from pathlib import Path
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
import xml.etree.ElementTree as ET

try:
    import yaml
except ModuleNotFoundError:
    yaml = None

# Pick a starting point for the repository search: this file's resolved path
# when __file__ is defined, otherwise the current working directory (the
# NameError branch covers environments where __file__ does not exist).
try:
    current = Path(__file__).resolve()
except NameError:
    current = Path.cwd()
REPO_DIR = current
# Walk upwards until a directory containing both 'data' and 'analysis' is found.
while not ((REPO_DIR / 'data').exists() and (REPO_DIR / 'analysis').exists()):
    if REPO_DIR.parent == REPO_DIR:
        # A path that is its own parent is the filesystem root: give up.
        raise FileNotFoundError('Repository root not found')
    REPO_DIR = REPO_DIR.parent

# Input directory scanned for news sources, and the output locations used below.
NEWS_DIR = REPO_DIR / 'data/news'
PROJECT_DIR = REPO_DIR / 'analysis/headline_analysis'
ARCHIVE_DIR = PROJECT_DIR / 'archive'
# Create the output directories at import time so the functions below can write into them.
PROJECT_DIR.mkdir(parents=True, exist_ok=True)
ARCHIVE_DIR.mkdir(exist_ok=True)


## Helper Functions
These utilities write JSON files (plus timestamped archive copies) and read the YAML front matter of Markdown files.

In [2]:
def save_json(obj, path: Path) -> None:
    """Serialise *obj* to *path* as indented UTF-8 JSON.

    Non-ASCII characters are written verbatim (not escaped) and the file
    always ends with a single trailing newline.
    """
    # Build the full payload first so a serialisation error never leaves a
    # truncated file behind.
    payload = json.dumps(obj, indent=2, ensure_ascii=False) + "\n"
    with path.open('w', encoding='utf-8') as handle:
        handle.write(payload)


def archive_json(obj, archive_dir: Path, stem: str) -> None:
    """Write *obj* to ``<archive_dir>/<stem>-<UTC hour tag>.json``.

    The tag has hour resolution (``YYYY-MM-DD-HH``), so repeated calls within
    the same UTC hour overwrite the same archive file.
    """
    now_utc = datetime.now(timezone.utc)
    tag = now_utc.strftime('%Y-%m-%d-%H')
    # Only the last path component is created; the parent must already exist.
    archive_dir.mkdir(exist_ok=True)
    destination = archive_dir / f'{stem}-{tag}.json'
    save_json(obj, destination)


def load_front_matter(path: Path) -> dict:
    """Return the YAML front matter of a markdown file as a dict.

    The front matter is the text between a leading ``---`` line and the next
    ``---`` marker. When PyYAML is available it is parsed with
    ``yaml.safe_load``; otherwise a minimal ``key: value`` line parser is used
    (values are kept as raw strings, no quoting or nesting support).

    Returns an empty dict when the file has no front matter, or when the
    front matter does not describe a mapping (e.g. it is empty, a list or a
    scalar), so callers can always use ``.get`` on the result.
    """
    text = path.read_text(encoding='utf-8')
    # Without re.M, '^' only matches at the very start of the file.
    match = re.search(r'^---\n(.*?)\n---', text, re.S)
    if not match:
        return {}
    front = match.group(1)
    if yaml:
        loaded = yaml.safe_load(front)
        # safe_load yields None for an empty document and may yield a list or
        # scalar; only a mapping is meaningful front matter.
        return loaded if isinstance(loaded, dict) else {}
    meta = {}
    for line in front.splitlines():
        if ':' in line:
            # Split on the first colon only so values may contain colons.
            key, val = line.split(':', 1)
            meta[key.strip()] = val.strip()
    return meta


## Source Collection
Find all news source directories and read their metadata.

In [3]:
def collect_sources(news_dir: Path, out_dir: Path) -> list:
    """Find every news source under *news_dir* and record its metadata.

    A source is any ``index.md`` (searched recursively) whose front matter
    has non-empty ``title``, ``category`` and ``source`` fields. The result
    is written to ``sources.json`` in *out_dir*, archived under
    ``out_dir/archive``, and returned as a list of dicts with the keys
    ``title``, ``category``, ``source`` and ``path`` (the directory holding
    the ``index.md``).
    """
    required = ('title', 'category', 'source')
    found = []
    for index_file in news_dir.rglob('index.md'):
        front = load_front_matter(index_file)
        values = [front.get(field) for field in required]
        if not all(values):
            # Skip sources with any missing or empty required field.
            continue
        record = {field: str(value) for field, value in zip(required, values)}
        record['path'] = str(index_file.parent)
        found.append(record)
    save_json(found, out_dir / 'sources.json')
    archive_json(found, out_dir / 'archive', 'sources')
    return found

## Feed Parsers
Functions that convert feed files into headline records.

In [4]:
def parse_pubdate(date_str: str) -> datetime | None:
    """Parse a date string into a UTC datetime."""
    if not date_str:
        return None
    try:
        dt = parsedate_to_datetime(date_str)
    except Exception:
        dt = None
    if not dt:
        try:
            dt = datetime.strptime(date_str, '%Y-%m-%d-%H-%M-%S %z')
        except Exception:
            return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)

def format_pubdate(dt: datetime | None) -> str:
    """Return a timestamp string for a publication date."""
    return dt.strftime('%Y-%m-%d-%H-%M-%S +0000') if dt else ''

def parse_feed(path: Path) -> list[tuple[datetime | None, str, str]]:
    """Read an RSS or JSON feed file."""
    entries = []
    if path.suffix == '.json':
        with open(path, 'r', encoding='utf-8') as f:
            try:
                data = json.load(f)
            except Exception:
                return entries
        for item in data.get('entries', []):
            title = item.get('title')
            link = item.get('link')
            pub = parse_pubdate(item.get('published'))
            if title and link:
                entries.append((pub, title.strip(), link.strip()))
    else:
        try:
            tree = ET.parse(path)
            root = tree.getroot()
        except ET.ParseError:
            return entries
        for item in root.iter():
            if item.tag.lower().endswith(('item', 'entry')):
                title = None
                link = None
                pub = None
                for child in item:
                    tag = child.tag.lower()
                    if tag.endswith('title'):
                        title = (child.text or '').strip()
                    if tag.endswith('link'):
                        link = (child.text or '').strip() or child.attrib.get('href')
                    if tag.endswith(('pubdate', 'published', 'updated')):
                        pub = parse_pubdate((child.text or '').strip())
                if title and link:
                    entries.append((pub, title, link))
    return entries


## Headline Collection
Gather headlines from each source and filter by time.

In [5]:
def collect_headlines(sources: list, out_dir: Path) -> list:
    """Gather the headlines of every source into one list, newest first.

    For each source the first existing file among ``latest.json``,
    ``latest.rss`` and ``latest.xml`` in its directory is parsed; sources
    with none of them are skipped. The combined list is sorted by
    publication date in descending order (undated headlines last), written
    to ``headlines.json`` in *out_dir* and archived, with dates serialised
    as strings.

    Returns the sorted list with ``pubdate`` still as datetime objects
    (or ``None``).
    """
    records = []
    for source in sources:
        folder = Path(source['path'])
        candidates = (folder / f'latest{ext}' for ext in ('.json', '.rss', '.xml'))
        feed_file = next((fp for fp in candidates if fp.exists()), None)
        if feed_file is None:
            continue
        records.extend(
            {
                'headline': title,
                'link': link,
                'source': source['source'],
                'pubdate': published,
            }
            for published, title, link in parse_feed(feed_file)
        )
    # Undated headlines sort as the earliest possible moment.
    floor = datetime.min.replace(tzinfo=timezone.utc)
    records.sort(key=lambda rec: rec['pubdate'] or floor, reverse=True)
    as_text = [{**rec, 'pubdate': format_pubdate(rec['pubdate'])} for rec in records]
    save_json(as_text, out_dir / 'headlines.json')
    archive_json(as_text, out_dir / 'archive', 'headlines')
    return records


def filter_headlines(headlines: list, delta: timedelta, name: str, out_dir: Path) -> list:
    """Keep the headlines published within the last *delta* and save them.

    Headlines without a publication date are dropped. The selection is
    written to ``<name>.json`` in *out_dir* and archived under
    ``out_dir/archive`` with dates serialised as strings.

    Returns the selected headline dicts, unchanged (dates still datetimes).
    """
    threshold = datetime.now(timezone.utc) - delta
    recent = []
    for item in headlines:
        published = item['pubdate']
        if published and published >= threshold:
            recent.append(item)
    as_text = [{**item, 'pubdate': format_pubdate(item['pubdate'])} for item in recent]
    save_json(as_text, out_dir / f'{name}.json')
    archive_json(as_text, out_dir / 'archive', name)
    return recent

## Summary Building
Create a CSV with per-source headline counts for the last hour, 24 hours, and 7 days.

In [6]:
def archive_csv(rows: list, archive_dir: Path, stem: str) -> None:
    """Write *rows* to ``<archive_dir>/<stem>-<UTC date>.csv``.

    Each row is a dict with the keys ``source``, ``1h``, ``24h`` and ``7d``.
    The tag has day resolution (``YYYY-MM-DD``), so calls on the same UTC
    day overwrite the same file.
    """
    columns = ['source', '1h', '24h', '7d']
    tag = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    archive_dir.mkdir(exist_ok=True)
    target = archive_dir / f'{stem}-{tag}.csv'
    # newline='' lets the csv module control line endings itself.
    with target.open('w', newline='', encoding='utf-8') as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        for row in rows:
            out.writerow(row)


def build_summary(headlines: list, out_dir: Path) -> None:
    """Count recent headlines per source and write them to ``summary.csv``.

    For every source with at least one dated headline, three cumulative
    counts are produced: headlines from the last hour, the last 24 hours and
    the last 7 days. Rows are ordered by those counts (descending, in that
    priority) and then by source name. The same rows are also archived under
    ``out_dir/archive``.
    """
    reference = datetime.now(timezone.utc)
    cutoffs = (
        reference - timedelta(hours=1),
        reference - timedelta(hours=24),
        reference - timedelta(days=7),
    )
    tally = {}
    for item in headlines:
        name = item['source']
        published = item['pubdate']
        if not published:
            continue
        # A source gets a row as soon as it has any dated headline, even if
        # that headline is older than every window.
        bucket = tally.setdefault(name, [0, 0, 0])
        for slot, cutoff in enumerate(cutoffs):
            if published >= cutoff:
                bucket[slot] += 1
    columns = ['source', '1h', '24h', '7d']
    rows = [dict(zip(columns, (name, *bucket))) for name, bucket in tally.items()]
    rows.sort(key=lambda r: (-r['1h'], -r['24h'], -r['7d'], r['source']))
    with (out_dir / 'summary.csv').open('w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)
    archive_csv(rows, out_dir / 'archive', 'summary')

## Run Everything

In [7]:
def main() -> None:
    """Run the pipeline: collect sources and headlines, then write all outputs.

    Produces the source and headline lists, one filtered headline file per
    time window (1 hour, 24 hours, 7 days) and the per-source summary CSV,
    all inside ``PROJECT_DIR``.
    """
    sources = collect_sources(NEWS_DIR, PROJECT_DIR)
    headlines = collect_headlines(sources, PROJECT_DIR)
    windows = (
        (timedelta(hours=1), 'all1h'),
        (timedelta(hours=24), 'all24h'),
        (timedelta(days=7), 'all7d'),
    )
    for span, label in windows:
        filter_headlines(headlines, span, label, PROJECT_DIR)
    build_summary(headlines, PROJECT_DIR)


if __name__ == '__main__':
    main()

In [8]:
from collections import Counter
import pandas as pd
import re

def load_stop_words() -> set[str]:
    """Return the set of words to ignore when scoring headlines.

    Words are read one per line from ``../news-topics/exclude.txt`` relative
    to ``PROJECT_DIR``; blank lines are skipped. Entries are lower-cased
    because headline words are lower-cased before they are compared against
    this set, so a capitalised entry could otherwise never match.

    Raises:
        FileNotFoundError: if the exclusion file does not exist.
    """
    path = PROJECT_DIR / '../news-topics/exclude.txt'
    # Read as UTF-8 explicitly, like every other text file in this module,
    # instead of relying on the platform default encoding.
    with open(path, encoding='utf-8') as f:
        return {w.strip().lower() for w in f if w.strip()}


def rank_headlines(df: pd.DataFrame, stop_words: set[str], limit: int = 10) -> list[dict]:
    """Return up to *limit* headlines chosen greedily by word-frequency score.

    Every headline is split into lower-cased ASCII words of two or more
    letters. A word's weight is how often it occurs across all headlines
    (stop words weigh nothing); a headline's score is the sum of the weights
    of its words. The best-scoring headline is picked, the weights of all
    its words are removed so later picks favour other topics, and the
    process repeats on the remaining headlines.

    Ties are broken in favour of the row that comes first in *df*. Rows are
    addressed by position, so duplicate index labels in *df* are harmless.

    Args:
        df: headlines with ``headline``, ``pubdate``, ``source`` and ``link``
            columns.
        stop_words: lower-case words excluded from scoring.
        limit: maximum number of headlines to return (default 10).

    Returns:
        A list of dicts with ``score``, ``pubdate`` (formatted string),
        ``source``, ``headline`` and ``link``, best first. Empty when *df*
        has no ``headline`` column, no rows, or *limit* is not positive.
    """
    if 'headline' not in df.columns or df.empty or limit <= 0:
        return []
    word_re = re.compile(r'[A-Za-z]+')
    # Tokenise each headline once; a non-string headline contributes no words.
    tokens = [
        [w for w in word_re.findall(text.lower()) if len(w) > 1]
        if isinstance(text, str) else []
        for text in df['headline'].tolist()
    ]
    weights = dict(Counter(
        w for words in tokens for w in words if w not in stop_words
    ))
    candidates = list(range(len(tokens)))
    top_rows = []
    while candidates and len(top_rows) < limit:
        scores = {
            pos: sum(weights.get(w, 0) for w in tokens[pos])
            for pos in candidates
        }
        # max() keeps the first maximal element, i.e. the earliest row.
        best = max(candidates, key=scores.__getitem__)
        row = df.iloc[best]
        top_rows.append({
            'score': int(scores[best]),
            'pubdate': format_pubdate(row['pubdate']),
            'source': row['source'],
            'headline': row['headline'],
            'link': row['link'],
        })
        # Words already covered by a chosen headline no longer add score.
        for w in tokens[best]:
            weights.pop(w, None)
        candidates.remove(best)
    return top_rows


def build_top_list(data_path: Path, out_path: Path) -> None:
    """Rank the headlines stored in *data_path* and write the winners to *out_path*.

    *data_path* is a JSON list of headline records whose ``pubdate`` is a
    serialised date string. An empty input produces an empty JSON list at
    *out_path* and nothing is archived; otherwise the ranked list is written
    to *out_path* and also archived in ``ARCHIVE_DIR`` under the stem of
    *out_path*.
    """
    entries = json.loads(data_path.read_text(encoding='utf-8'))
    if entries:
        # Turn the serialised dates back into datetimes before ranking.
        frame = pd.DataFrame(
            [{**entry, 'pubdate': parse_pubdate(entry['pubdate'])} for entry in entries]
        )
        ranked = rank_headlines(frame, load_stop_words())
        save_json(ranked, out_path)
        archive_json(ranked, ARCHIVE_DIR, out_path.stem)
    else:
        save_json([], out_path)


# Build one ranked list per time window from the filtered headline files
# that filter_headlines() writes (all1h / all24h / all7d). These calls run
# whenever this cell or module is executed, so the input files must exist.
build_top_list(PROJECT_DIR / 'all1h.json', PROJECT_DIR / 'top1h.json')
build_top_list(PROJECT_DIR / 'all24h.json', PROJECT_DIR / 'top24h.json')
build_top_list(PROJECT_DIR / 'all7d.json', PROJECT_DIR / 'top7d.json')
