mirror of
https://github.com/MichMich/MagicMirror.git
synced 2026-04-25 23:32:10 +00:00
In PR #4072 GitHub Bot complained that `newsfeedfetcher.js` used the old `.pipe()` method to connect streams (HTTP body → iconv decoding → RSS parser). `.pipe()` has a weakness: errors in one stream are **not** automatically forwarded to downstream streams. An I/O or decoding error would silently disappear.

## Solution

Replaced `.pipe()` with `await stream.promises.pipeline()`. The `pipeline()` API is designed to propagate errors correctly through the entire chain and to clean up all streams on failure. Errors now reliably land in the `catch` block and call `fetchFailedCallback` exactly once. The redundant `parser.on("error")` handler was removed, as it would have caught the same error again and called the callback a second time.

## Why not the bot's suggested fix?

The GitHub Bot suggested the older callback-based `stream.pipeline(callback)` variant:

```js
stream.pipeline(nodeStream, iconv.decodeStream(...), parser, (error) => {
	if (!error) return;
	// error handling...
});
```

This has two drawbacks compared to my approach:

1. It uses the older callback style — `stream.promises.pipeline()` with `async/await` is the modern, more readable API.
2. The bot's suggestion kept both the `parser.on("error")` handler **and** the `catch` block, which would not have fixed the double-callback problem.

----

Related to #4073
163 lines
5.0 KiB
JavaScript
163 lines
5.0 KiB
JavaScript
// Node.js built-ins: hashing for item identifiers, stream plumbing for the feed body.
const crypto = require("node:crypto");
const stream = require("node:stream");

// Third-party: feed parser, charset decoding, HTML-to-plain-text conversion.
const FeedMe = require("feedme");
const iconv = require("iconv-lite");
const { htmlToText } = require("html-to-text");

// Project modules: logger and the shared HTTP fetcher.
const Log = require("logger");
const HTTPFetcher = require("#http_fetcher");

/**
 * NewsfeedFetcher - Fetches and parses RSS/Atom feed data
 * Uses HTTPFetcher for HTTP handling with intelligent error handling
 *
 * Parsed articles are collected in `items`; consumers are notified through the
 * callbacks registered with `onReceive` and `onError`.
 * @class
 */
class NewsfeedFetcher {

	/**
	 * Creates a new NewsfeedFetcher instance
	 * @param {string} url - The URL of the news feed to fetch
	 * @param {number} reloadInterval - Time in ms between fetches (values below 1000 are raised to 1000)
	 * @param {string} encoding - Encoding of the feed (e.g., 'UTF-8')
	 * @param {boolean} logFeedWarnings - If true log warnings when there is an error parsing a news article
	 * @param {boolean} useCorsProxy - If true cors proxy is used for article url's
	 */
	constructor (url, reloadInterval, encoding, logFeedWarnings, useCorsProxy) {
		this.url = url;
		this.encoding = encoding;
		this.logFeedWarnings = logFeedWarnings;
		this.useCorsProxy = useCorsProxy;
		this.items = [];

		// No-op defaults so events arriving before onReceive/onError are registered are harmless.
		this.fetchFailedCallback = () => {};
		this.itemsReceivedCallback = () => {};

		// Use HTTPFetcher for HTTP handling (Composition)
		this.httpFetcher = new HTTPFetcher(url, {
			reloadInterval: Math.max(reloadInterval, 1000),
			headers: {
				"Cache-Control": "max-age=0, no-cache, no-store, must-revalidate",
				Pragma: "no-cache"
			}
		});

		// Wire up HTTPFetcher events.
		// `void` marks the promise as intentionally not awaited: stream failures are
		// reported through fetchFailedCallback inside #handleResponse.
		this.httpFetcher.on("response", (response) => void this.#handleResponse(response));
		this.httpFetcher.on("error", (errorInfo) => this.fetchFailedCallback(this, errorInfo));
	}

	/**
	 * Creates a parse error info object
	 * @param {string} message - Error message
	 * @param {Error} error - Original error
	 * @returns {object} Error info object
	 */
	#createParseError (message, error) {
		return {
			message,
			status: null,
			errorType: "PARSE_ERROR",
			translationKey: "MODULE_ERROR_UNSPECIFIED",
			retryAfter: this.httpFetcher.reloadInterval,
			retryCount: 0,
			url: this.url,
			originalError: error
		};
	}

	/**
	 * Converts one parsed feed entry into a news item and appends it to `items`.
	 * Entries without both a title and a publication date are skipped (and
	 * logged when `logFeedWarnings` is enabled).
	 * @param {object} item - Feed entry as emitted by the parser's "item" event
	 */
	#processItem (item) {
		const title = item.title;
		let description = item.description || item.summary || item.content || "";
		const pubdate = item.pubdate || item.published || item.updated || item["dc:date"] || item["a10:updated"];
		const url = item.url || item.link || "";

		if (title && pubdate) {
			// Convert HTML entities, codes and tag
			description = htmlToText(description, {
				wordwrap: false,
				selectors: [
					{ selector: "a", options: { ignoreHref: true, noAnchorUrl: true } },
					{ selector: "br", format: "inlineSurround", options: { prefix: " " } },
					{ selector: "img", format: "skip" }
				]
			});

			this.items.push({
				title,
				description,
				pubdate,
				url,
				useCorsProxy: this.useCorsProxy,
				// Hex SHA-256 digest over publication date, title and URL.
				hash: crypto.createHash("sha256").update(`${pubdate} :: ${title} :: ${url}`).digest("hex")
			});
		} else if (this.logFeedWarnings) {
			Log.warn("Can't parse feed item:", item);
			Log.warn(`Title: ${title}`);
			Log.warn(`Description: ${description}`);
			Log.warn(`Pubdate: ${pubdate}`);
		}
	}

	/**
	 * Applies a feed-provided ttl: the reload interval is only ever raised,
	 * and the ttl is capped at 24 hours (86400000 ms).
	 * @param {number} minutes - ttl value as emitted by the parser's "ttl" event
	 */
	#applyTtl (minutes) {
		const ttlms = Math.min(minutes * 60 * 1000, 86400000);
		if (ttlms > this.httpFetcher.reloadInterval) {
			this.httpFetcher.reloadInterval = ttlms;
			Log.info(`reloadInterval set to ttl=${ttlms} for url ${this.url}`);
		}
	}

	/**
	 * Handles successful HTTP response
	 *
	 * Resets `items`, then streams the body through charset decoding into the
	 * feed parser. Any failure in that chain is logged and reported through
	 * `fetchFailedCallback` as a PARSE_ERROR info object.
	 * @param {Response} response - The fetch Response object
	 * @returns {Promise<void>} Settles once the pipeline has finished or failed
	 */
	async #handleResponse (response) {
		this.items = [];
		const parser = new FeedMe();

		parser.on("item", (item) => this.#processItem(item));
		parser.on("end", () => this.broadcastItems());
		parser.on("ttl", (minutes) => this.#applyTtl(minutes));

		try {
			// The body may already be a Node stream; otherwise adapt the web stream.
			const nodeStream = response.body instanceof stream.Readable
				? response.body
				: stream.Readable.fromWeb(response.body);
			await stream.promises.pipeline(nodeStream, iconv.decodeStream(this.encoding), parser);
		} catch (error) {
			Log.error(`${this.url} - Stream processing failed: ${error.message}`);
			this.fetchFailedCallback(this, this.#createParseError(`Stream processing failed: ${error.message}`, error));
		}
	}

	/**
	 * Update the reload interval, but only if we need to increase the speed.
	 * Values of 1000 ms or less, and values not below the current interval, are ignored.
	 * @param {number} interval - Interval for the update in milliseconds.
	 */
	setReloadInterval (interval) {
		if (interval > 1000 && interval < this.httpFetcher.reloadInterval) {
			this.httpFetcher.reloadInterval = interval;
		}
	}

	/**
	 * Starts periodic fetching by delegating to the underlying HTTPFetcher.
	 */
	startFetch () {
		this.httpFetcher.startPeriodicFetch();
	}

	/**
	 * Notifies the receive callback about the current items.
	 * Does nothing (apart from logging) while no items are available.
	 */
	broadcastItems () {
		if (this.items.length <= 0) {
			Log.info("No items to broadcast yet.");
			return;
		}
		Log.info(`Broadcasting ${this.items.length} items.`);
		this.itemsReceivedCallback(this);
	}

	/** @param {function(NewsfeedFetcher): void} callback - Called when items are received */
	onReceive (callback) {
		this.itemsReceivedCallback = callback;
	}

	/** @param {function(NewsfeedFetcher, object): void} callback - Called on fetch error */
	onError (callback) {
		this.fetchFailedCallback = callback;
	}
}

// CommonJS export: the class itself is the module's public interface.
module.exports = NewsfeedFetcher;