浏览代码

Chore: revert stream back to asyncgenerator

SukkaW 2 年之前
父节点
当前提交
da7c6764c5
共有 5 个文件被更改,包括 82 次插入15 次删除
  1. 3 3
      Build/index.ts
  2. 16 4
      Build/lib/create-file.ts
  3. 62 7
      Build/lib/fetch-remote-text-by-line.ts
  4. 二进制
      bun.lockb
  5. 1 1
      package.json

+ 3 - 3
Build/index.ts

@@ -9,7 +9,7 @@ import { buildTelegramCIDR } from './build-telegram-cidr';
 import { buildChnCidr } from './build-chn-cidr';
 import { buildSpeedtestDomainSet } from './build-speedtest-domainset';
 import { buildInternalCDNDomains } from './build-internal-cdn-rules';
-import { buildInternalChnDomains } from './build-internal-chn-domains';
+// import { buildInternalChnDomains } from './build-internal-chn-domains';
 import { buildDomesticRuleset } from './build-domestic-ruleset';
 import { buildStreamService } from './build-stream-service';
 import { buildRedirectModule } from './build-redirect-module';
@@ -64,7 +64,7 @@ import { buildPublicHtml } from './build-public';
     //   buildInternalReverseChnCIDRWorker.postMessage('build');
     // });
 
-    const buildInternalChnDomainsPromise = buildInternalChnDomains();
+    // const buildInternalChnDomainsPromise = buildInternalChnDomains();
     const buildDomesticRulesetPromise = downloadPreviousBuildPromise.then(() => buildDomesticRuleset());
 
     const buildRedirectModulePromise = downloadPreviousBuildPromise.then(() => buildRedirectModule());
@@ -84,7 +84,7 @@ import { buildPublicHtml } from './build-public';
       buildSpeedtestDomainSetPromise,
       buildInternalCDNDomainsPromise,
       // buildInternalReverseChnCIDRPromise,
-      buildInternalChnDomainsPromise,
+      // buildInternalChnDomainsPromise,
       buildDomesticRulesetPromise,
       buildRedirectModulePromise,
       buildStreamServicePromise

+ 16 - 4
Build/lib/create-file.ts

@@ -48,13 +48,25 @@ export async function compareAndWriteFile(linesA: string[], filePath: string) {
     return;
   }
 
-  const writer = file.writer();
+  console.log(`Writing ${filePath}...`);
 
-  for (let i = 0; i < linesALen; i++) {
-    writer.write(`${linesA[i]}\n`);
+  const start = Bun.nanoseconds();
+
+  if (linesALen < 10000) {
+    await Bun.write(file, `${linesA.join('\n')}\n`);
+  } else {
+    const writer = file.writer();
+
+    for (let i = 0; i < linesALen; i++) {
+      writer.write(linesA[i]);
+      writer.write('\n');
+    }
+
+    writer.flush();
+    await writer.end();
   }
 
-  return writer.end();
+  console.log(`Done writing ${filePath} in ${(Bun.nanoseconds() - start) / 1e6}ms`);
 }
 
 export const withBannerArray = (title: string, description: string[], date: Date, content: string[]) => {

+ 62 - 7
Build/lib/fetch-remote-text-by-line.ts

@@ -1,16 +1,54 @@
 import type { BunFile } from 'bun';
 import { fetchWithRetry, defaultRequestInit } from './fetch-retry';
-import { TextLineStream } from './text-line-transform-stream';
-import { PolyfillTextDecoderStream } from './text-decoder-stream';
+// import { TextLineStream } from './text-line-transform-stream';
+// import { PolyfillTextDecoderStream } from './text-decoder-stream';
 
-export function readFileByLine(file: string | BunFile) {
+// export function readFileByLine(file: string | BunFile) {
+//   if (typeof file === 'string') {
+//     file = Bun.file(file);
+//   }
+//   return file.stream().pipeThrough(new PolyfillTextDecoderStream()).pipeThrough(new TextLineStream());
+// }
+
+// export function createReadlineInterfaceFromResponse(resp: Response) {
+//   if (!resp.body) {
+//     throw new Error('Failed to fetch remote text');
+//   }
+//   if (resp.bodyUsed) {
+//     throw new Error('Body has already been consumed.');
+//   }
+
+//   return (resp.body as ReadableStream<Uint8Array>).pipeThrough(new PolyfillTextDecoderStream()).pipeThrough(new TextLineStream());
+// }
+
+const decoder = new TextDecoder('utf-8');
+
+export async function *readFileByLine(file: string | BunFile): AsyncGenerator<string> {
   if (typeof file === 'string') {
     file = Bun.file(file);
   }
-  return file.stream().pipeThrough(new PolyfillTextDecoderStream()).pipeThrough(new TextLineStream());
+
+  let buf = '';
+
+  for await (const chunk of file.stream()) {
+    const chunkStr = decoder.decode(chunk).replaceAll('\r\n', '\n');
+    for (let i = 0, len = chunkStr.length; i < len; i++) {
+      const char = chunkStr[i];
+      if (char === '\n') {
+        yield buf;
+        buf = '';
+      } else {
+        buf += char;
+      }
+    }
+  }
+
+  if (buf) {
+    yield buf;
+  }
 }
 
-export function createReadlineInterfaceFromResponse(resp: Response) {
+export async function *createReadlineInterfaceFromResponse(resp: Response): AsyncGenerator<string> {
   if (!resp.body) {
     throw new Error('Failed to fetch remote text');
   }
@@ -18,9 +56,26 @@ export function createReadlineInterfaceFromResponse(resp: Response) {
     throw new Error('Body has already been consumed.');
   }
 
-  return (resp.body as ReadableStream<Uint8Array>).pipeThrough(new PolyfillTextDecoderStream()).pipeThrough(new TextLineStream());
+  let buf = '';
+
+  for await (const chunk of resp.body) {
+    const chunkStr = decoder.decode(chunk).replaceAll('\r\n', '\n');
+    for (let i = 0, len = chunkStr.length; i < len; i++) {
+      const char = chunkStr[i];
+      if (char === '\n') {
+        yield buf;
+        buf = '';
+      } else {
+        buf += char;
+      }
+    }
+  }
+
+  if (buf) {
+    yield buf;
+  }
 }
 
 export function fetchRemoteTextAndCreateReadlineInterface(url: string | URL) {
-  return fetchWithRetry(url, defaultRequestInit).then(res => createReadlineInterfaceFromResponse(res));
+  return fetchWithRetry(url, defaultRequestInit).then(res => createReadlineInterfaceFromResponse(res as Response));
 }

二进制
bun.lockb


+ 1 - 1
package.json

@@ -15,7 +15,7 @@
   "license": "ISC",
   "dependencies": {
     "@cliqz/adblocker": "^1.26.12",
-    "@sukka/listdir": "^0.2.0",
+    "@sukka/listdir": "^0.3.1",
     "async-retry": "^1.3.3",
     "async-sema": "^3.1.1",
     "ci-info": "^4.0.0",