fetch-text-by-line.ts 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import fs from 'node:fs';
  2. import { Readable } from 'node:stream';
  3. import type { FileHandle } from 'node:fs/promises';
  4. import { TextLineStream } from './text-line-transform-stream';
  5. import type { ReadableStream } from 'node:stream/web';
  6. import { TextDecoderStream } from 'node:stream/web';
  7. import { processLine } from './process-line';
  8. import { $fetch } from './make-fetch-happen';
  9. import type { NodeFetchResponse } from './make-fetch-happen';
  /**
   * Turns a file reference into a web `ReadableStream`.
   *
   * - A `FileHandle` is exposed through its own `readableWebStream()`.
   * - A string is treated as a path: it is opened with `fs.createReadStream`
   *   and the resulting Node stream is converted with `Readable.toWeb`.
   *
   * No `encoding` option is passed to `createReadStream`, so text decoding is
   * left to whoever consumes the returned stream.
   *
   * @param file Path to read, or an already-open file handle.
   * @returns A web stream over the file's contents.
   */
  const getReadableStream = (file: string | FileHandle): ReadableStream => {
    if (typeof file !== 'string') {
      return file.readableWebStream();
    }

    // Alternative that was considered but is not used here:
    //   fs.openAsBlob(file).then(blob => blob.stream())
    const nodeStream = fs.createReadStream(file);
    return Readable.toWeb(nodeStream);
  };
  /**
   * Reads a file as a stream of text lines.
   *
   * The byte stream from `getReadableStream` is decoded by a
   * `TextDecoderStream` and then passed through `TextLineStream`; the result
   * is typed as an `AsyncIterable<string>` for use with `for await`.
   *
   * TODO: use FileHandle.readLines()
   *
   * @param file Path to read, or an already-open file handle.
   * @returns An async iterable of the strings emitted by `TextLineStream`.
   */
  export const readFileByLine: ((file: string | FileHandle) => AsyncIterable<string>) = (file: string | FileHandle) => {
    const bytes = getReadableStream(file);
    const text = bytes.pipeThrough(new TextDecoderStream());
    return text.pipeThrough(new TextLineStream());
  };
  /**
   * Returns the body of a fetch response, refusing responses that cannot be read.
   *
   * @param resp The response whose body is wanted.
   * @returns `resp.body`, typed as non-nullable.
   * @throws Error `'Failed to fetch remote text'` when `resp.body` is falsy.
   * @throws Error `'Body has already been consumed.'` when `resp.bodyUsed` is truthy.
   */
  const ensureResponseBody = <T extends Response | NodeFetchResponse>(resp: T): NonNullable<T['body']> => {
    // The missing-body check takes precedence over the consumed-body check.
    if (resp.body) {
      if (!resp.bodyUsed) {
        return resp.body;
      }
      throw new Error('Body has already been consumed.');
    }
    throw new Error('Failed to fetch remote text');
  };
  /**
   * Turns a fetch response into a stream of text lines.
   *
   * The body is first validated by `ensureResponseBody`. A body that has a
   * `getReader` member is used directly as a web stream; any other body is
   * wrapped in a Node `Readable` and converted with `Readable.toWeb`. The
   * bytes are then decoded by a `TextDecoderStream` and passed through
   * `TextLineStream`.
   *
   * @param resp A `Response` or `NodeFetchResponse` with an unread body.
   * @returns An async iterable of the strings emitted by `TextLineStream`.
   * @throws Error Whatever `ensureResponseBody` throws for an unusable body.
   */
  export const createReadlineInterfaceFromResponse: ((resp: Response | NodeFetchResponse) => AsyncIterable<string>) = (resp) => {
    const body = ensureResponseBody(resp);

    // Normalise both body flavours to one web stream of bytes.
    const bytes: ReadableStream<Uint8Array> = 'getReader' in body
      ? body
      : Readable.toWeb(new Readable().wrap(body)) as any;

    const text = bytes.pipeThrough(new TextDecoderStream());
    return text.pipeThrough(new TextLineStream());
  };
  /**
   * Fetches `url` with `$fetch` and exposes the response as text lines.
   *
   * @param url Address of the remote text resource.
   * @returns The result of chaining `createReadlineInterfaceFromResponse`
   *   onto the `$fetch` call.
   */
  export function fetchRemoteTextByLine(url: string) {
    const pendingResponse = $fetch(url);
    return pendingResponse.then(createReadlineInterfaceFromResponse);
  }
  /**
   * Reads a file line by line and collects the lines that `processLine` accepts.
   *
   * @param file Path to read, or an already-open file handle.
   * @returns The accepted lines, in file order.
   */
  export async function readFileIntoProcessedArray(file: string | FileHandle) {
    const kept: string[] = [];

    for await (const rawLine of readFileByLine(file)) {
      // `processLine` is used only as a truthiness filter here.
      // NOTE(review): the raw line is stored, not `processLine`'s return value — confirm this is intended.
      if (!processLine(rawLine)) {
        continue;
      }
      kept.push(rawLine);
    }

    return kept;
  }