This script is used to download and decrypt a file that has been encrypted and distributed across a peer-to-peer network, specifically IPFS via Cloudflare gateway.
It first sets a filename, "A-001.Algha_Porthos.2022 年 3 月学军植树节.20220311.fd943c82.7z", and a debug mode, which is set to False.
It then defines a "Seed" class, which contains a key, cid, and checksum that are used to identify and access the encrypted file on the network.
It also defines a "KeyDerivation" class, which is used to generate a series of keys that are used to decrypt the file. This class uses the sha3_512 algorithm, which is a cryptographic hashing function that is considered to be more secure than its predecessor, sha512.
The script then defines a "decrypt" function, which uses the AESGCM algorithm to decrypt the data using the key. It also creates a session object to handle the request to the peer-to-peer network, and a "fetch" function, which retrieves the data from the network using the cid and checksum.
The "extract" function is then defined, which uses the "fetch" function to retrieve the data, compares the checksum of the retrieved data to the original checksum, and uses the "decrypt" function to decrypt the data using the key.
In the
if __name__ == "__main__"
block, the script uses the "extract" function to download and decrypt the seed file, and uses the "loads" function from the json library to load the data as a Python object. It then creates an instance of the "KeyDerivation" class using the data from the seed file, and depending on the value of the "debug" variable, either prints the data or writes it to the file specified in the "filename" variable.
import os
from base64 import urlsafe_b64decode
from hashlib import sha3_256, sha3_512
from json import loads
from random import random
from time import sleep
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from requests.sessions import Session
filename: str = r"A-001.Algha_Porthos.2022 年 3 月学军植树节.20220311.fd943c82.7z"
debug: bool = False
class Seed:
key: bytes = urlsafe_b64decode(
b"abRV-FWfMHiYroCY51hpc_X6Ph1VlPztDFvLn6pFA4kgDWMJVtSucWiLxOYlhS2fbA0Q113AGZfwKVOaS2SpkA=="
)
cid: str = "bafkreigoxzlhpiwk2r2qsqhbc44ivu43lyx6lu3khmk5eejvsyfk4wgv54"
checksum: str = "3HPXRCUjWlT0ozUMN5EnZ9yw27cWAg4jeifR2sLaQNQ="
class KeyDerivation:
__init_discard__: int = 1048576
__call_discard__: int = 65536
def __discard__(self, round: int) -> None:
for _ in range(round):
self.__next__ = sha3_512(self.__next__).digest()
self.__generator__.update(self.__next__)
self.__generator__.update(self.__generator__.digest())
def __init__(self, iv: bytes, next: bytes) -> None:
assert len(iv) == 256, "iv must be 2048 bits"
assert len(next) == 64, "next must be 512 bits"
self.__generator__ = sha3_512(iv)
self.__next__ = next
self.__discard__(self.__init_discard__)
def __call__(self) -> bytes:
self.__discard__(self.__call_discard__)
return self.__generator__.digest()
def decrypt(data: bytes, key: bytes) -> bytes:
assert len(key) == 64, "key must be 512 bits"
return AESGCM(key=key[:32]).decrypt(
nonce=key[-12:], data=data, associated_data=key[32:-12]
)
session = Session()
def fetch(cid: str) -> bytes:
while True:
try:
resp = session.get(
f"https://cloudflare-ipfs.com/ipfs/{cid}",
params={"nonce": random().hex()},
timeout=180.0,
)
resp.raise_for_status()
return resp.content
except Exception as err:
print("error", err)
sleep(2.0)
def extract(key: bytes, cid: str, checksum: str) -> bytes:
print("cid", cid)
checksum = urlsafe_b64decode(checksum)
checked = False
while not checked:
data = fetch(cid)
checked = checksum == (sha3_256(data).digest())
print("len", len(data))
return decrypt(data=data, key=key)
if __name__ == "__main__":
seed = loads(extract(key=Seed.key, cid=Seed.cid, checksum=Seed.checksum))
keyDerivation = KeyDerivation(
iv=urlsafe_b64decode(seed["crypto"]["iv"]),
next=urlsafe_b64decode(seed["crypto"]["next"]),
)
print("debug", debug)
if debug:
for obj in seed["store"]:
key = keyDerivation()
extract(key=key, cid=obj["cid"], checksum=obj["checksum"])
else:
import sys
if getattr(sys, "frozen", False):
current = os.path.dirname(sys.executable)
else:
current = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current, filename), "wb") as fout:
for obj in seed["store"]:
key = keyDerivation()
fout.write(extract(key=key, cid=obj["cid"], checksum=obj["checksum"]))