fetch-text-by-line.ts 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import fs from 'node:fs';
  2. import { Readable } from 'node:stream';
  3. import type { FileHandle } from 'node:fs/promises';
  4. import { TextLineStream } from './text-line-transform-stream';
  5. import type { ReadableStream } from 'node:stream/web';
  6. import { TextDecoderStream } from 'node:stream/web';
  7. import { processLine } from './process-line';
  8. import { $fetch } from './make-fetch-happen';
  9. import type { NodeFetchResponse } from './make-fetch-happen';
  10. import type { UndiciResponseData } from './fetch-retry';
  /**
   * Opens `file` as a web `ReadableStream`.
   *
   * @param file - Either a filesystem path or an already-open `FileHandle`.
   * @returns For a path, a Node read stream (created without an explicit
   *   encoding) converted with `Readable.toWeb`; for a handle, whatever
   *   `readableWebStream()` returns.
   */
  function getReadableStream(file: string | FileHandle): ReadableStream {
    // An open handle can hand out a web stream directly.
    if (typeof file !== 'string') {
      return file.readableWebStream();
    }

    // Alternative kept for reference: fs.openAsBlob(file).then(blob => blob.stream())
    const nodeStream = fs.createReadStream(file/* , { encoding: 'utf-8' } */);
    return Readable.toWeb(nodeStream);
  }
  // TODO: use FileHandle.readLine()
  /**
   * Streams the contents of `file` as text, split by `TextLineStream`.
   *
   * The raw stream from `getReadableStream` is first decoded with a
   * default-constructed `TextDecoderStream`, then piped through
   * `TextLineStream` (presumably yielding one string per line — confirm
   * against `./text-line-transform-stream`).
   *
   * @param file - A filesystem path or an already-open `FileHandle`.
   * @returns An async iterable of strings.
   */
  export const readFileByLine: ((file: string | FileHandle) => AsyncIterable<string>) = (file: string | FileHandle) => {
    const decodedText = getReadableStream(file).pipeThrough(new TextDecoderStream());
    return decodedText.pipeThrough(new TextLineStream());
  };
  /**
   * Returns the body of a fetched response, rejecting responses that cannot
   * be read.
   *
   * @param resp - The response whose `body` should be extracted.
   * @returns The response's `body`, guaranteed non-nullish.
   * @throws Error('Failed to fetch remote text') when `body` is `null` or
   *   `undefined`.
   * @throws Error('Body has already been consumed.') when the response
   *   exposes a truthy `bodyUsed`.
   */
  function ensureResponseBody<T extends NodeFetchResponse | UndiciResponseData>(resp: T): NonNullable<T['body']> {
    const body = resp.body;

    // The missing-body check runs first, so it wins when both conditions hold.
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- NodeFetchResponse['body'] is nullable
    if (body == null) {
      throw new Error('Failed to fetch remote text');
    }

    // Not every accepted response shape has `bodyUsed`, so probe for it first.
    const alreadyConsumed = 'bodyUsed' in resp && resp.bodyUsed;
    if (alreadyConsumed) {
      throw new Error('Body has already been consumed.');
    }

    return body;
  }
  /**
   * Turns a fetched response into an async iterable of text produced by
   * `TextLineStream`.
   *
   * The response body (validated by `ensureResponseBody`) is normalised to a
   * web `ReadableStream` in one of three ways:
   *  - it already has `getReader`, so it is used as-is;
   *  - it has a `body` property, so that property is used;
   *  - otherwise it is wrapped in a Node `Readable` and converted with
   *    `Readable.toWeb`.
   * The result is decoded with a default-constructed `TextDecoderStream` and
   * piped through `TextLineStream`.
   *
   * @param resp - The response to read.
   * @returns An async iterable of strings.
   * @throws Whatever `ensureResponseBody` throws for an unusable body.
   */
  export const createReadlineInterfaceFromResponse: ((resp: NodeFetchResponse | UndiciResponseData) => AsyncIterable<string>) = (resp) => {
    const payload = ensureResponseBody(resp);

    const byteStream: ReadableStream<Uint8Array> = 'getReader' in payload
      ? payload
      : (
        'body' in payload
          ? payload.body
          : Readable.toWeb(new Readable().wrap(payload)) as any
      );

    const decodedText = byteStream.pipeThrough(new TextDecoderStream());
    return decodedText.pipeThrough(new TextLineStream());
  };
  /**
   * Fetches `url` with `$fetch` and exposes the response through
   * `createReadlineInterfaceFromResponse`.
   *
   * @param url - The address passed to `$fetch`.
   * @returns A promise that settles with the result of
   *   `createReadlineInterfaceFromResponse`, or rejects if the fetch or the
   *   conversion fails.
   */
  export function fetchRemoteTextByLine(url: string) {
    const pendingResponse = $fetch(url);
    return pendingResponse.then((resp) => createReadlineInterfaceFromResponse(resp));
  }
  /**
   * Reads `file` via `readFileByLine` and collects every line for which
   * `processLine` returns a truthy value.
   *
   * @param file - A filesystem path or an already-open `FileHandle`.
   * @returns The accepted lines, in the order they were read.
   */
  export async function readFileIntoProcessedArray(file: string | FileHandle) {
    const accepted: string[] = [];

    for await (const rawLine of readFileByLine(file)) {
      if (!processLine(rawLine)) {
        continue;
      }
      // NOTE(review): the original line is stored, not the value returned by
      // `processLine` — confirm this is intended.
      accepted.push(rawLine);
    }

    return accepted;
  }
  56. }