fetch-text-by-line.ts 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
import fs from 'node:fs';
import fsp from 'node:fs/promises';
import type { FileHandle } from 'node:fs/promises';
import readline from 'node:readline';
import { Readable } from 'node:stream';
import type { ReadableStream } from 'node:stream/web';
import { TextDecoderStream } from 'node:stream/web';
import type { Response as UnidiciWebResponse } from 'undici';
import { invariant } from 'foxts/guard';
import { TextLineStream } from './text-line-transform-stream';
import { processLine, ProcessLineStream } from './process-line';
import { $$fetch } from './fetch-retry';
import type { UndiciResponseData } from './fetch-retry';
  13. export function readFileByLine(file: string): AsyncIterable<string> {
  14. return readline.createInterface({
  15. input: fs.createReadStream(file/* , { encoding: 'utf-8' } */),
  16. crlfDelay: Infinity
  17. });
  18. }
  19. const fdReadLines = (fd: FileHandle) => fd.readLines();
  20. export async function readFileByLineNew(file: string): Promise<AsyncIterable<string>> {
  21. return fsp.open(file, 'r').then(fdReadLines);
  22. }
  23. export const createReadlineInterfaceFromResponse: ((resp: UndiciResponseData | UnidiciWebResponse, processLine?: boolean) => ReadableStream<string>) = (resp, processLine = false) => {
  24. invariant(resp.body, 'Failed to fetch remote text');
  25. if ('bodyUsed' in resp && resp.bodyUsed) {
  26. throw new Error('Body has already been consumed.');
  27. }
  28. let webStream: ReadableStream<Uint8Array>;
  29. if ('pipeThrough' in resp.body) {
  30. webStream = resp.body;
  31. } else {
  32. throw new TypeError('Invalid response body!');
  33. }
  34. const resultStream = webStream
  35. .pipeThrough(new TextDecoderStream())
  36. .pipeThrough(new TextLineStream());
  37. if (processLine) {
  38. return resultStream.pipeThrough(new ProcessLineStream());
  39. }
  40. return resultStream;
  41. };
  42. export function fetchRemoteTextByLine(url: string, processLine = false): Promise<AsyncIterable<string>> {
  43. return $$fetch(url).then(resp => createReadlineInterfaceFromResponse(resp, processLine));
  44. }
  45. export async function readFileIntoProcessedArray(file: string /* | FileHandle */) {
  46. const results = [];
  47. for await (const line of readFileByLine(file)) {
  48. if (processLine(line)) {
  49. results.push(line);
  50. }
  51. }
  52. return results;
  53. }