Skip to content

Commit a40bee5

Browse files
committed
Fixes for correct archiving with low RAM
1 parent 4352ad8 commit a40bee5

File tree

5 files changed

+84
-51
lines changed

5 files changed

+84
-51
lines changed

archive.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
#!/bin/bash
# Build the news archive: export documents and clusters from MongoDB,
# clean and merge them with the previous archive, drop duplicates, and
# pack the result into data/nyan_archive.tar.gz.
# All intermediate files live under data/.
set -euo pipefail

echo "Exporting documents"
python3 -m scripts.mongo_to_jsonl --output-path data/raw_new_docs.jsonl

echo "Cleaning documents"
python3 -m scripts.clean_docs --input-path data/raw_new_docs.jsonl --output-path data/new_docs.jsonl
python3 -m scripts.clean_docs --input-path data/raw_old_docs.jsonl --output-path data/old_docs.jsonl

# Old archive first, then the freshly exported docs appended after it.
cp data/old_docs.jsonl data/all_docs.jsonl
cat data/new_docs.jsonl >> data/all_docs.jsonl

echo "Filtering documents"
python3 -m scripts.filter_documents data/all_docs.jsonl data/documents.jsonl

echo "Exporting clusters"
python3 -m scripts.clusters_to_jsonl --output-path data/new_clusters.jsonl
cp data/old_clusters.jsonl data/all_clusters.jsonl
cat data/new_clusters.jsonl >> data/all_clusters.jsonl

echo "Filtering clusters"
python3 -m scripts.filter_posted_clusers data/all_clusters.jsonl data/clusters.jsonl data/documents.jsonl

echo "Packing"
cp channels.json data/channels.json
# -f: without it, `set -e` aborts the script on the very first run,
# when data/nyan_archive.tar.gz does not exist yet.
rm -f data/nyan_archive.tar.gz
cd data && tar -czvf nyan_archive.tar.gz clusters.jsonl documents.jsonl channels.json

scripts/clusters_to_jsonl.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
def main(
1010
output_path,
11-
mongo_config
11+
mongo_config,
12+
clid_start,
13+
clid_end,
14+
batch_size
1215
):
1316
with open(mongo_config) as r:
1417
config = json.load(r)
@@ -18,19 +21,36 @@ def main(
1821
clusters_collection_name = config["clusters_collection_name"]
1922
collection = client[database_name][clusters_collection_name]
2023

21-
clusters = list(collection.find({}))
22-
clusters.sort(key=lambda x: x["annotation_doc"]["pub_time"])
24+
if not clid_start:
25+
first_cluster = collection.find_one(sort=[("clid", 1)])
26+
clid_start = first_cluster["clid"]
27+
if not clid_end:
28+
last_cluster = collection.find_one(sort=[("clid", -1)])
29+
clid_end = last_cluster["clid"] + 1
30+
print(f"Start clid: {clid_start}")
31+
print(f"End clid: {clid_end}")
32+
33+
current_clid_start = clid_start
2334
with open(output_path, "w") as w:
24-
for cluster in clusters:
25-
cluster.pop("_id")
26-
cluster["annotation_doc"].pop("embedding", None)
27-
cluster["annotation_doc"].pop("embedded_images", None)
28-
w.write(json.dumps(cluster, ensure_ascii=False) + "\n")
35+
while current_clid_start < clid_end:
36+
print(clid_end - current_clid_start)
37+
current_clid_end = current_clid_start + batch_size
38+
clusters = list(collection.find({"clid": {"$gte": current_clid_start, "$lt": current_clid_end}}))
39+
clusters.sort(key=lambda x: x["annotation_doc"]["pub_time"])
40+
for cluster in clusters:
41+
cluster.pop("_id")
42+
cluster["annotation_doc"].pop("embedding", None)
43+
cluster["annotation_doc"].pop("embedded_images", None)
44+
w.write(json.dumps(cluster, ensure_ascii=False) + "\n")
45+
current_clid_start = current_clid_end
2946

3047

3148
if __name__ == "__main__":
3249
parser = argparse.ArgumentParser()
3350
parser.add_argument("--output-path", type=str, default="data/clusters.jsonl")
3451
parser.add_argument("--mongo-config", type=str, default="configs/mongo_config.json")
52+
parser.add_argument("--clid-start", type=int, default=None)
53+
parser.add_argument("--clid-end", type=int, default=None)
54+
parser.add_argument("--batch-size", type=int, default=1000)
3555
args = parser.parse_args()
3656
main(**vars(args))

scripts/filter_documents.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,32 @@
import sys
import json
from collections import Counter
from tqdm import tqdm


input_path = sys.argv[1]
output_path = sys.argv[2]

# Pass 1: record the line indices of duplicate URLs. Only a slim
# (pub_time, url, idx) projection of each document is kept in memory,
# so the full corpus never has to be loaded at once.
skip_idx = set()
with open(input_path) as r:
    slim_records = []
    for line_no, line in enumerate(tqdm(r)):
        full_record = json.loads(line)
        slim = {key: value for key, value in full_record.items() if key in ("pub_time", "url")}
        slim["idx"] = line_no
        slim_records.append(slim)
    # Oldest publication first, so the earliest copy of each URL survives.
    slim_records.sort(key=lambda rec: rec["pub_time"])
    used_urls = set()
    for rec in tqdm(slim_records):
        url = rec["url"]
        if url not in used_urls:
            used_urls.add(url)
        else:
            skip_idx.add(rec["idx"])
print("Found {} duplicates".format(len(skip_idx)))


# Pass 2: stream the input again and emit every line that was not
# marked as a duplicate in pass 1.
with open(input_path) as r, open(output_path, "w") as w:
    for line_no, line in enumerate(tqdm(r)):
        if line_no not in skip_idx:
            doc = json.loads(line)
            w.write(json.dumps(doc, ensure_ascii=False).strip() + "\n")

scripts/filter_posted_clusers.py

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,51 +9,34 @@
99
docs_path = sys.argv[3]
1010

1111
with open(input_path) as r, open(output_path, "w") as w, open(docs_path, "r") as df:
12-
docs = [json.loads(line) for line in df]
13-
url2doc = {doc["url"]: doc for doc in docs}
14-
clusters = [json.loads(line) for line in r]
15-
16-
filtered_clusters = []
12+
urls = {json.loads(line)["url"] for line in df}
1713
seen = set()
18-
for cluster in clusters:
14+
for line in r:
15+
cluster = json.loads(line)
1916
if isinstance(cluster["annotation_doc"], str):
2017
url = cluster["annotation_doc"]
21-
if url not in url2doc:
18+
if url not in urls:
2219
continue
23-
cluster["annotation_doc"] = url2doc[url]
24-
2520
url = cluster["first_doc"]
26-
if url not in url2doc:
21+
if url not in urls:
2722
continue
28-
cluster["first_doc"] = url2doc[url]
29-
30-
cluster["docs"] = [url2doc[url] for url in cluster["docs"] if url in url2doc]
31-
32-
url = cluster["annotation_doc"]["url"]
33-
if url not in url2doc or url in seen:
34-
continue
35-
seen.add(url)
36-
filtered_clusters.append(cluster)
37-
clusters = filtered_clusters
23+
cluster["docs"] = [url for url in cluster["docs"] if url in urls]
24+
else:
25+
annot_doc = cluster["annotation_doc"]
26+
if annot_doc["url"] not in urls:
27+
continue
28+
cluster["annotation_doc"] = annot_doc["url"]
3829

39-
clusters.sort(key=lambda x: x["first_doc"]["pub_time"])
40-
for cluster in tqdm(clusters):
41-
fixed_docs = []
42-
for doc in cluster["docs"]:
43-
new_doc = url2doc.get(doc["url"])
44-
if not new_doc:
30+
first_doc = cluster["first_doc"]
31+
if first_doc["url"] not in urls:
4532
continue
46-
fixed_docs.append(doc["url"])
47-
cluster["docs"] = fixed_docs
33+
cluster["first_doc"] = first_doc["url"]
4834

49-
annot_doc = cluster["annotation_doc"]
50-
if annot_doc["url"] not in url2doc:
51-
continue
52-
cluster["annotation_doc"] = annot_doc["url"]
35+
fixed_docs = [doc["url"] for doc in cluster["docs"] if doc["url"] in urls]
36+
cluster["docs"] = fixed_docs
5337

54-
first_doc = cluster["first_doc"]
55-
if first_doc["url"] not in url2doc:
38+
url = cluster["annotation_doc"]
39+
if url in seen:
5640
continue
57-
cluster["first_doc"] = first_doc["url"]
58-
41+
seen.add(url)
5942
w.write(json.dumps(cluster, ensure_ascii=False).strip() + "\n")

scripts/mongo_to_jsonl.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
def main(
1010
output_path,
1111
mongo_config,
12-
annotated
12+
annotated,
13+
ts_start
1314
):
1415
with open(mongo_config) as r:
1516
config = json.load(r)
@@ -22,8 +23,9 @@ def main(
2223
documents_collection_name = config["documents_collection_name"]
2324
collection = client[database_name][documents_collection_name]
2425

25-
first_doc = collection.find_one(sort=[("pub_time", 1)])
26-
ts_start = first_doc["pub_time"]
26+
if not ts_start:
27+
first_doc = collection.find_one(sort=[("pub_time", 1)])
28+
ts_start = first_doc["pub_time"]
2729
print(f"Start timestamp: {ts_start}")
2830
ts_end = get_current_ts()
2931
print(f"End timestamp: {ts_end}")
@@ -46,6 +48,7 @@ def main(
4648
parser = argparse.ArgumentParser()
4749
parser.add_argument("--output-path", type=str, default="data/docs.jsonl")
4850
parser.add_argument("--annotated", action="store_true")
51+
parser.add_argument("--ts-start", type=int, default=None)
4952
parser.add_argument("--mongo-config", type=str, default="configs/mongo_config.json")
5053
args = parser.parse_args()
5154
main(**vars(args))

0 commit comments

Comments
 (0)