fetch-text-by-line.ts 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import fs from 'fs';
  2. import { Readable } from 'stream';
  3. import { fetchWithRetry, defaultRequestInit } from './fetch-retry';
  4. import type { FileHandle } from 'fs/promises';
  5. import { TextLineStream } from './text-line-transform-stream';
  6. import type { ReadableStream } from 'stream/web';
  7. import { TextDecoderStream } from 'stream/web';
  8. import { processLine } from './process-line';
  /**
   * Opens `file` as a WHATWG `ReadableStream` of raw bytes.
   *
   * - An already-open `FileHandle` is exposed through `FileHandle.readableWebStream()`.
   * - A path is opened with `fs.createReadStream` and adapted via `Readable.toWeb`.
   *   No `encoding` option is passed on purpose: chunks must stay bytes so a
   *   downstream `TextDecoderStream` can decode them.
   *
   * @param file - a filesystem path or an open `FileHandle`
   * @returns a byte stream over the file's contents
   */
  const getReadableStream = (file: string | FileHandle): ReadableStream => {
    if (typeof file !== 'string') {
      return file.readableWebStream();
    }
    const nodeStream = fs.createReadStream(file);
    return Readable.toWeb(nodeStream);
  };
  // TODO: consider FileHandle.readLines() (Node's line-reading API on FileHandle).
  /**
   * Streams `file` line by line.
   *
   * The byte stream from `getReadableStream` is decoded with a `TextDecoderStream`
   * (UTF-8 by default) and then split into lines by the project's `TextLineStream`.
   * Whether line terminators are stripped is decided by `TextLineStream`. Check
   * `./text-line-transform-stream` if that matters to a caller.
   *
   * @param file - a filesystem path or an open `FileHandle`
   * @returns an async iterable yielding one string per line
   */
  export const readFileByLine: ((file: string | FileHandle) => AsyncIterable<string>) = (file: string | FileHandle) => {
    const decoded = getReadableStream(file).pipeThrough(new TextDecoderStream());
    return decoded.pipeThrough(new TextLineStream());
  };
  /**
   * Returns the body stream of `resp` after checking that it exists and has not
   * been read yet.
   *
   * @param resp - a fetch `Response`
   * @returns the response's (non-null) body stream
   * @throws Error `'Failed to fetch remote text'` when the response has no body
   * @throws Error `'Body has already been consumed.'` when `resp.bodyUsed` is true
   */
  const ensureResponseBody = (resp: Response) => {
    const { body, bodyUsed } = resp;
    if (!body) {
      throw new Error('Failed to fetch remote text');
    }
    if (bodyUsed) {
      throw new Error('Body has already been consumed.');
    }
    return body;
  };
  /**
   * Turns a fetch `Response` into an async iterable of text lines.
   *
   * The body is validated by `ensureResponseBody`, which throws if it is missing
   * or already consumed. It is then decoded with a `TextDecoderStream` and split by
   * the project's `TextLineStream`.
   *
   * @param resp - a fetch `Response` whose body has not been read
   * @returns an async iterable yielding one string per line
   */
  export const createReadlineInterfaceFromResponse: ((resp: Response) => AsyncIterable<string>) = (resp) => {
    const textStream = ensureResponseBody(resp).pipeThrough(new TextDecoderStream());
    return textStream.pipeThrough(new TextLineStream());
  };
  /**
   * Fetches `url` with `fetchWithRetry` and the shared `defaultRequestInit`, then
   * resolves to an async iterable over the response body's lines.
   *
   * NOTE(review): this function does not check `resp.ok`. Whether non-2xx
   * responses are rejected depends on `fetchWithRetry`, so confirm in `./fetch-retry`.
   *
   * @param url - the resource to fetch
   * @returns a promise of an async iterable yielding one string per line
   */
  export function fetchRemoteTextByLine(url: string | URL) {
    const pendingResponse = fetchWithRetry(url, defaultRequestInit);
    return pendingResponse.then((resp) => createReadlineInterfaceFromResponse(resp));
  }
  /**
   * Reads `file` line by line and collects, in file order, every line for which
   * `processLine` returns a truthy value.
   *
   * The accumulator and return type are annotated explicitly. Otherwise
   * `const results = []` is typed `any[]` whenever `noImplicitAny` is disabled,
   * and this exported function would leak `Promise<any[]>`.
   *
   * NOTE(review): the raw `line` is stored, not `processLine(line)`'s return value.
   * If `processLine` returns a normalized string, callers of a "processed" array may
   * expect that value instead. Confirm against `./process-line` before changing.
   *
   * @param file - a filesystem path or an open `FileHandle`
   * @returns the kept lines, in the order they appear in the file
   */
  export async function readFileIntoProcessedArray(file: string | FileHandle): Promise<string[]> {
    const results: string[] = [];
    for await (const line of readFileByLine(file)) {
      if (processLine(line)) {
        results.push(line);
      }
    }
    return results;
  }