fetch-text-by-line.ts 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import fs from 'node:fs';
  2. import { Readable } from 'node:stream';
  3. import type { FileHandle } from 'node:fs/promises';
  4. import { TextLineStream } from './text-line-transform-stream';
  5. import type { ReadableStream } from 'node:stream/web';
  6. import { TextDecoderStream } from 'node:stream/web';
  7. import { processLine } from './process-line';
  8. import { $fetch } from './make-fetch-happen';
  9. import type { NodeFetchResponse } from './make-fetch-happen';
  10. import type { UndiciResponseData } from './fetch-retry';
  /**
   * Opens a web `ReadableStream` over the raw bytes of a file.
   *
   * - A string is treated as a path. It is read with `fs.createReadStream` (no
   *   encoding, so chunks are Buffers) and bridged to a web stream with
   *   `Readable.toWeb`.
   * - A `FileHandle` is read through its own `readableWebStream()`. This function
   *   never calls `close()` on the handle; the caller keeps ownership of it.
   */
  function getReadableStream(file: string | FileHandle): ReadableStream {
    if (typeof file !== 'string') {
      return file.readableWebStream();
    }
    const nodeStream = fs.createReadStream(file);
    return Readable.toWeb(nodeStream);
  }
  /**
   * Streams a text file line by line.
   *
   * Raw bytes come from `getReadableStream`, are decoded by `TextDecoderStream`
   * (UTF-8 unless told otherwise), and are split into lines by `TextLineStream`.
   *
   * TODO: use FileHandle.readLines()
   */
  export const readFileByLine: ((file: string | FileHandle) => AsyncIterable<string>) = (file: string | FileHandle) => {
    const byteStream = getReadableStream(file);
    const textStream = byteStream.pipeThrough(new TextDecoderStream());
    return textStream.pipeThrough(new TextLineStream());
  };
  /**
   * Returns the body of a fetch-like response, refusing responses that cannot be read.
   *
   * @param resp - a fetch `Response`, a `NodeFetchResponse`, or an `UndiciResponseData`.
   * @returns the response's non-null `body`.
   * @throws Error('Failed to fetch remote text') when `body` is falsy.
   * @throws Error('Body has already been consumed.') when the response exposes
   *   `bodyUsed` and it is truthy.
   */
  function ensureResponseBody<T extends Response | NodeFetchResponse | UndiciResponseData>(resp: T): NonNullable<T['body']> {
    const { body } = resp;
    if (!body) {
      throw new Error('Failed to fetch remote text');
    }
    // Only some response shapes carry `bodyUsed`; the others skip this check.
    const alreadyConsumed = 'bodyUsed' in resp && resp.bodyUsed;
    if (alreadyConsumed) {
      throw new Error('Body has already been consumed.');
    }
    return body;
  }
  /**
   * Converts a fetch-like response into an async iterable of text lines.
   *
   * The body returned by `ensureResponseBody` is normalised to a web byte stream:
   * - it is used as-is when it has a `getReader` property (a WHATWG stream);
   * - it is replaced by its own `body` property when it has one;
   * - otherwise it is wrapped into a Node.js `Readable` via `wrap` and bridged
   *   with `Readable.toWeb`.
   * The bytes are then decoded by `TextDecoderStream` and split by `TextLineStream`.
   *
   * @throws whatever `ensureResponseBody` throws for a missing or consumed body.
   */
  export const createReadlineInterfaceFromResponse: ((resp: Response | NodeFetchResponse | UndiciResponseData) => AsyncIterable<string>) = (resp) => {
    const payload = ensureResponseBody(resp);
    // NOTE(review): the `as any` on the last branch widens this whole conditional
    // to `any`, so the first two branches are not checked against the annotation.
    const byteStream: ReadableStream<Uint8Array> =
      'getReader' in payload ? payload
        : 'body' in payload ? payload.body
          : Readable.toWeb(new Readable().wrap(payload)) as any;
    const textStream = byteStream.pipeThrough(new TextDecoderStream());
    return textStream.pipeThrough(new TextLineStream());
  };
  /**
   * Fetches `url` with `$fetch` and resolves to an async iterable over the
   * response body's text lines (see `createReadlineInterfaceFromResponse`).
   */
  export function fetchRemoteTextByLine(url: string) {
    const pendingResponse = $fetch(url);
    return pendingResponse.then((resp) => createReadlineInterfaceFromResponse(resp));
  }
  /**
   * Reads `file` line by line and keeps the lines for which `processLine`
   * returns a truthy value.
   *
   * @param file - a path or an open `FileHandle`, forwarded to `readFileByLine`.
   * @returns the kept lines, in the order they appear in the file.
   */
  export async function readFileIntoProcessedArray(file: string | FileHandle) {
    const kept: string[] = [];
    for await (const line of readFileByLine(file)) {
      if (!processLine(line)) {
        continue;
      }
      kept.push(line);
    }
    return kept;
  }