fetch-text-by-line.ts 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import fs from 'fs';
  2. import { Readable } from 'stream';
  3. import type { BunFile } from 'bun';
  4. import { fetchWithRetry, defaultRequestInit } from './fetch-retry';
  5. import type { FileHandle } from 'fs/promises';
  6. import { TextLineStream } from './text-line-transform-stream';
  7. import { PolyfillTextDecoderStream } from './text-decoder-stream';
  8. import { TextDecoderStream as NodeTextDecoderStream } from 'stream/web';
  9. import { processLine } from './process-line';
  // Opt-in switch: any non-empty ENABLE_TEXT_LINE_STREAM value in the environment makes the
  // exported readers use the TransformStream pipeline instead of the async-generator splitter.
  const enableTextLineStream = Boolean(process.env.ENABLE_TEXT_LINE_STREAM);
  /**
   * Splits a UTF-8 byte stream into lines and yields them one at a time.
   *
   * Both `\n` and `\r\n` terminate a line; the terminator is not part of the yielded string.
   * A lone `\r` that is not followed by `\n` is left untouched. Empty lines are yielded as `''`.
   * A final non-empty line without a terminator is yielded as well.
   *
   * @param stream Source of raw bytes. The stream is locked for the duration of the iteration.
   * @returns An async iterable of decoded lines.
   */
  async function *createTextLineAsyncIterableFromStreamSource(stream: ReadableStream<Uint8Array>): AsyncIterable<string> {
    // A streaming decoder keeps the tail of an incomplete multi-byte sequence between calls,
    // so it must be private to this iteration and cannot be shared at module level.
    const decoder = new TextDecoder('utf-8');
    const reader = stream.getReader();

    // Text received so far that has not yet been terminated by '\n'.
    let pending = '';
    let exhausted = false;

    try {
      while (true) {
        const res = await reader.read();
        if (res.done) {
          exhausted = true;
          break;
        }

        // Only the newly appended text can contain a not-yet-seen '\n'.
        const scanFrom = pending.length;
        // `stream: true` defers bytes of a character that is split across two chunks.
        pending += decoder.decode(res.value, { stream: true });

        let lineStart = 0;
        let newlineAt = pending.indexOf('\n', scanFrom);
        while (newlineAt !== -1) {
          // Drop the '\r' of a CRLF pair. Checking here (instead of replacing per chunk)
          // also covers a '\r' and '\n' that arrived in different chunks.
          const lineEnd = newlineAt > lineStart && pending.charCodeAt(newlineAt - 1) === 13
            ? newlineAt - 1
            : newlineAt;
          yield pending.slice(lineStart, lineEnd);
          lineStart = newlineAt + 1;
          newlineAt = pending.indexOf('\n', lineStart);
        }
        if (lineStart > 0) {
          pending = pending.slice(lineStart);
        }
      }

      // Flush the decoder: a truncated trailing sequence becomes U+FFFD instead of vanishing.
      pending += decoder.decode();
      if (pending) {
        yield pending;
      }
    } finally {
      if (!exhausted) {
        // The consumer stopped early or reading failed: let the source free its resources.
        // A rejection here would only repeat (or mask) the original failure, so it is ignored.
        await reader.cancel().catch(() => { /* best effort */ });
      }
      reader.releaseLock();
    }
  }
  /**
   * Opens a filesystem path as a web ReadableStream.
   * The runtime is probed once, when the module is evaluated: Bun's own file API is used
   * when the `Bun` global exists, otherwise a Node.js read stream is converted.
   */
  const openPathAsWebStream = typeof Bun === 'undefined'
    // No encoding is passed to createReadStream, so the stream carries raw bytes.
    ? (path: string): ReadableStream => Readable.toWeb(fs.createReadStream(path))
    : (path: string): ReadableStream => Bun.file(path).stream();

  /**
   * Turns any supported file reference into a web ReadableStream.
   *
   * @param file A path, a BunFile, or a Node.js FileHandle.
   * @returns The stream obtained from the path or from the object's own stream factory.
   */
  const getReadableStream = (file: string | BunFile | FileHandle): ReadableStream => {
    if (typeof file === 'string') {
      return openPathAsWebStream(file);
    }
    // `writer` is the discriminator: objects having it are treated as BunFile,
    // everything else as a FileHandle.
    return 'writer' in file ? file.stream() : file.readableWebStream();
  };
  // Use the 'stream/web' implementation when the import produced one, otherwise the local polyfill.
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- On Bun, NodeTextDecoderStream is undefined
  const TextDecoderStream = NodeTextDecoderStream == null ? PolyfillTextDecoderStream : NodeTextDecoderStream;
  // TODO: use FileHandle.readLine()
  /**
   * Reads a file line by line.
   *
   * The file is opened as a byte stream when this function is called. Depending on
   * `enableTextLineStream`, lines come either from the TextDecoderStream + TextLineStream
   * pipeline or from the async-generator splitter.
   *
   * @param file A path, a BunFile, or a Node.js FileHandle.
   * @returns An async iterable yielding the file's lines.
   */
  export const readFileByLine: ((file: string | BunFile | FileHandle) => AsyncIterable<string>) = (file) => {
    const byteStream = getReadableStream(file);
    if (enableTextLineStream) {
      return byteStream.pipeThrough(new TextDecoderStream()).pipeThrough(new TextLineStream());
    }
    return createTextLineAsyncIterableFromStreamSource(byteStream);
  };
  /**
   * Returns the body stream of a response after checking that it can still be read.
   *
   * @param resp The response to inspect.
   * @returns The response's body stream.
   * @throws Error 'Failed to fetch remote text' when the response has no body.
   * @throws Error 'Body has already been consumed.' when `bodyUsed` is set.
   */
  const ensureResponseBody = (resp: Response) => {
    const body = resp.body;
    // The missing-body check comes first, so it wins when both conditions hold.
    if (!body) {
      throw new Error('Failed to fetch remote text');
    }
    if (resp.bodyUsed) {
      throw new Error('Body has already been consumed.');
    }
    return body;
  };
  /**
   * Exposes the body of a fetch response as an async iterable of lines.
   *
   * The body is validated via `ensureResponseBody` when this function is called, so a
   * missing or already-consumed body throws immediately rather than on first iteration.
   *
   * @param resp The response whose body should be read.
   * @returns An async iterable yielding the body's lines.
   */
  export const createReadlineInterfaceFromResponse: ((resp: Response) => AsyncIterable<string>) = (resp) => {
    const byteStream = ensureResponseBody(resp);
    if (enableTextLineStream) {
      return byteStream.pipeThrough(new TextDecoderStream()).pipeThrough(new TextLineStream());
    }
    return createTextLineAsyncIterableFromStreamSource(byteStream);
  };
  /**
   * Fetches a remote resource with `fetchWithRetry` (using `defaultRequestInit`) and
   * exposes the response body line by line.
   *
   * @param url The resource to fetch.
   * @returns A promise-like resolving to an async iterable of the response's lines.
   */
  export function fetchRemoteTextByLine(url: string | URL) {
    const pendingResponse = fetchWithRetry(url, defaultRequestInit);
    return pendingResponse.then((resp) => createReadlineInterfaceFromResponse(resp));
  }
  /**
   * Reads a file line by line and collects every line for which `processLine` returns
   * a truthy value.
   *
   * NOTE(review): the original line text is stored, not the value returned by
   * `processLine` — confirm against `processLine`'s contract that this is intended.
   *
   * @param file A path, a BunFile, or a Node.js FileHandle.
   * @returns The accepted lines, in file order.
   */
  export async function readFileIntoProcessedArray(file: string | BunFile | FileHandle) {
    const kept: string[] = [];
    for await (const rawLine of readFileByLine(file)) {
      if (!processLine(rawLine)) {
        continue;
      }
      kept.push(rawLine);
    }
    return kept;
  }