fetch-text-by-line.ts 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import type { BunFile } from 'bun';
  2. import { fetchWithRetry, defaultRequestInit } from './fetch-retry';
  3. import { TextLineStream } from './text-line-transform-stream';
  4. import { PolyfillTextDecoderStream } from './text-decoder-stream';
  5. // function createTextLineStreamFromStreamSource(stream: ReadableStream<Uint8Array>) {
  6. // return stream
  7. // .pipeThrough(new PolyfillTextDecoderStream())
  8. // .pipeThrough(new TextLineStream());
  9. // }
  10. const decoder = new TextDecoder('utf-8');
  11. async function *createTextLineAsyncGeneratorFromStreamSource(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  12. let buf = '';
  13. for await (const chunk of stream) {
  14. const chunkStr = decoder.decode(chunk).replaceAll('\r\n', '\n');
  15. for (let i = 0, len = chunkStr.length; i < len; i++) {
  16. const char = chunkStr[i];
  17. if (char === '\n') {
  18. yield buf;
  19. buf = '';
  20. } else {
  21. buf += char;
  22. }
  23. }
  24. }
  25. if (buf) {
  26. yield buf;
  27. }
  28. }
  29. export function readFileByLine(file: string | URL | BunFile) {
  30. if (typeof file === 'string') {
  31. file = Bun.file(file);
  32. } else if (!('writer' in file)) {
  33. file = Bun.file(file);
  34. }
  35. return createTextLineAsyncGeneratorFromStreamSource(file.stream());
  36. }
  37. export function createReadlineInterfaceFromResponse(this: void, resp: Response) {
  38. if (!resp.body) {
  39. throw new Error('Failed to fetch remote text');
  40. }
  41. if (resp.bodyUsed) {
  42. throw new Error('Body has already been consumed.');
  43. }
  44. return createTextLineAsyncGeneratorFromStreamSource(resp.body);
  45. }
  46. export function fetchRemoteTextByLine(url: string | URL) {
  47. return fetchWithRetry(url, defaultRequestInit).then(createReadlineInterfaceFromResponse);
  48. }