fetch-text-by-line.ts 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import fs from 'fs';
  2. import { Readable } from 'stream';
  3. import { fetchWithRetry, defaultRequestInit } from './fetch-retry';
  4. import type { FileHandle } from 'fs/promises';
  5. import { TextLineStream } from './text-line-transform-stream';
  6. import type { ReadableStream } from 'stream/web';
  7. import { TextDecoderStream } from 'stream/web';
  8. import { processLine } from './process-line';
  // Feature switch read once at module load: any non-empty value of the
  // ENABLE_TEXT_LINE_STREAM environment variable (including "0" or "false")
  // selects the TextDecoderStream + TextLineStream pipeline in the exported readers.
  const enableTextLineStream = Boolean(process.env.ENABLE_TEXT_LINE_STREAM);
  // Module-level UTF-8 decoder instance.
  const decoder = new TextDecoder('utf-8');
  /**
   * Splits a stream of UTF-8 bytes into lines of text.
   *
   * Lines are terminated by `\n`; a `\r` directly in front of the `\n` is
   * treated as part of the terminator and dropped. Empty lines are yielded as
   * empty strings. Text left over after the last `\n` is yielded as a final
   * line only when it is non-empty.
   *
   * Decoding is done in streaming mode, so multi-byte characters and `\r\n`
   * pairs that happen to be split across two chunks are handled correctly.
   *
   * @param stream - Byte stream to read; it is locked for the duration of the iteration.
   * @returns An async iterable producing one string per line, without line terminators.
   */
  async function *createTextLineAsyncIterableFromStreamSource(stream: ReadableStream<Uint8Array>): AsyncIterable<string> {
    // A streaming decoder keeps state between chunks, so every iteration gets
    // its own instance instead of sharing one across concurrent readers.
    const streamingDecoder = new TextDecoder('utf-8');
    const reader = stream.getReader();
    // Text received so far that has not been terminated by a newline yet.
    let pending = '';

    try {
      while (true) {
        const res = await reader.read();
        if (res.done) {
          break;
        }

        // Everything before this offset was already scanned and holds no '\n'.
        const scanFrom = pending.length;
        // `stream: true` buffers an incomplete trailing byte sequence until the next chunk.
        pending += streamingDecoder.decode(res.value, { stream: true });

        let lineStart = 0;
        let newlineAt = pending.indexOf('\n', scanFrom);
        while (newlineAt !== -1) {
          // Char code 13 is '\r'; only a '\r' belonging to the current line is stripped.
          const hasCarriageReturn = newlineAt > lineStart && pending.charCodeAt(newlineAt - 1) === 13;
          yield pending.slice(lineStart, hasCarriageReturn ? newlineAt - 1 : newlineAt);
          lineStart = newlineAt + 1;
          newlineAt = pending.indexOf('\n', lineStart);
        }

        if (lineStart > 0) {
          pending = pending.slice(lineStart);
        }
      }

      // Flush the decoder: an incomplete trailing byte sequence becomes U+FFFD.
      pending += streamingDecoder.decode();
      if (pending) {
        yield pending;
      }
    } finally {
      // Runs on completion, on error, and when the consumer stops iterating early.
      reader.releaseLock();
    }
  }
  /**
   * Opens a web `ReadableStream` over a file.
   *
   * @param file - Either a path, which is opened with `fs.createReadStream` and
   *   converted with `Readable.toWeb`, or an already opened `FileHandle`, whose
   *   own `readableWebStream()` is used.
   * @returns The web stream for the file. No encoding is passed when opening a
   *   path, so decoding is left to the consumer.
   */
  const getReadableStream = (file: string | FileHandle): ReadableStream => (
    typeof file === 'string'
      ? Readable.toWeb(fs.createReadStream(file))
      : file.readableWebStream()
  );
  // TODO: use FileHandle.readLines()
  /**
   * Reads a file (given as a path or an open `FileHandle`) as an async iterable of lines.
   *
   * The implementation is picked once, at module load, from `enableTextLineStream`:
   * either the `TextDecoderStream` + `TextLineStream` pipeline, or the
   * generator-based line splitter defined in this module.
   */
  export const readFileByLine: ((file: string | FileHandle) => AsyncIterable<string>) = enableTextLineStream
    ? (file) => {
      const bytes = getReadableStream(file);
      const text = bytes.pipeThrough(new TextDecoderStream());
      return text.pipeThrough(new TextLineStream());
    }
    : (file) => {
      const bytes = getReadableStream(file);
      return createTextLineAsyncIterableFromStreamSource(bytes);
    };
  /**
   * Returns the body stream of a response after checking that it can be read.
   *
   * @param resp - The response whose body is wanted.
   * @returns The response's body stream.
   * @throws Error when the response has no body, or when the body was already consumed.
   */
  const ensureResponseBody = (resp: Response) => {
    const { body } = resp;

    // A missing body is reported first, before looking at the consumed flag.
    if (!body) {
      throw new Error('Failed to fetch remote text');
    }

    const alreadyRead = resp.bodyUsed;
    if (alreadyRead) {
      throw new Error('Body has already been consumed.');
    }

    return body;
  };
  /**
   * Turns a fetch `Response` into an async iterable of its text lines.
   *
   * The body is validated with `ensureResponseBody` first, so a response without
   * a body or with an already consumed body makes the call throw. Which line
   * splitter is used is decided once, at module load, from `enableTextLineStream`.
   */
  export const createReadlineInterfaceFromResponse: ((resp: Response) => AsyncIterable<string>) = enableTextLineStream
    ? (resp) => {
      const body = ensureResponseBody(resp);
      return body.pipeThrough(new TextDecoderStream()).pipeThrough(new TextLineStream());
    }
    : (resp) => {
      const body = ensureResponseBody(resp);
      return createTextLineAsyncIterableFromStreamSource(body);
    };
  /**
   * Fetches a remote resource and exposes its body as an async iterable of lines.
   *
   * The request is made through `fetchWithRetry` with `defaultRequestInit`; the
   * resulting response is handed to `createReadlineInterfaceFromResponse`.
   *
   * @param url - Address of the remote text.
   * @returns The result of chaining the line reader onto the fetch.
   */
  export function fetchRemoteTextByLine(url: string | URL) {
    const pendingResponse = fetchWithRetry(url, defaultRequestInit);
    return pendingResponse.then((resp) => createReadlineInterfaceFromResponse(resp));
  }
  /**
   * Reads a file line by line and collects the lines accepted by `processLine`.
   *
   * A line is kept when `processLine(line)` returns a truthy value; the value
   * stored is the line as read from the file, not the return value of `processLine`.
   *
   * @param file - Path or open `FileHandle` of the file to read.
   * @returns The kept lines, in file order.
   */
  export async function readFileIntoProcessedArray(file: string | FileHandle) {
    const kept: string[] = [];

    for await (const line of readFileByLine(file)) {
      if (!processLine(line)) {
        continue;
      }
      kept.push(line);
    }

    return kept;
  }