mirror of
https://github.com/flatpak/flatpak.git
synced 2026-03-27 19:33:06 -04:00
This just does a GET, which is not quite right, but will work. This is needed for the authenticator.
269 lines
8.0 KiB
Python
Executable File
269 lines
8.0 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
from urllib.parse import parse_qs
|
|
import http.server as http_server
|
|
|
|
# In-memory registry state, keyed by repository name.  The request handler
# stores per-repository 'blobs', 'manifests' and 'images' entries in here.
repositories = {}

# Detached icon payloads (raw bytes), keyed by the file name they are served
# under.
icons = {}


def get_index():
    """Return the registry index as a JSON document (a ``str``).

    The document lists every known repository in name order, each with its
    'Images' list and an always-empty 'Lists' list.
    """
    results = [
        {
            'Name': repo_name,
            'Images': repositories[repo_name]['images'],
            'Lists': [],
        }
        for repo_name in sorted(repositories)
    ]

    index = {
        'Registry': '/',
        'Results': results,
    }
    return json.dumps(index, indent=4)
|
|
|
|
def cache_icon(data_uri):
    """Store the PNG carried by *data_uri* and return the path it is served at.

    *data_uri* must start with ``data:image/png;base64,``; the remainder is
    base64-decoded and kept in the module-level ``icons`` dict under
    ``<sha256 hex digest>.png``.

    Returns the string ``/icons/<sha256 hex digest>.png``.
    """
    prefix = 'data:image/png;base64,'
    assert data_uri.startswith(prefix)

    payload = base64.b64decode(data_uri[len(prefix):])
    # The file name is derived from the content, so identical icons share
    # one entry.
    filename = hashlib.sha256(payload).hexdigest() + '.png'
    icons[filename] = payload

    return '/icons/' + filename
|
|
|
|
# Counter bumped by modified(); together with the start time it forms the ETag.
serial = 0
# Whole seconds since the epoch at import time.
server_start_time = int(time.time())


def get_etag():
    """Return the current ETag string, ``<server_start_time>-<serial>``."""
    return '{}-{}'.format(server_start_time, serial)


def modified():
    """Record a change to the registry state by incrementing ``serial``."""
    global serial
    serial = serial + 1
|
|
|
|
def parse_http_date(date):
    """Convert an HTTP date string to seconds since the epoch (UTC).

    Returns an ``int`` timestamp, or ``None`` when *date* cannot be parsed.
    """
    # Imported here because the module does not import these names at file
    # level; without them every call raised NameError.
    from calendar import timegm
    from email.utils import parsedate

    parsed = parsedate(date)
    if parsed is None:
        return None
    return timegm(parsed)
|
|
|
|
class RequestHandler(http_server.BaseHTTPRequestHandler):
    """Request handler for the in-memory test registry.

    Routes served:

    * ``GET``/``HEAD`` ``/v2/<repo_name>/blobs/<digest>``
    * ``GET``/``HEAD`` ``/v2/<repo_name>/manifests/<ref>``
    * ``GET``/``HEAD`` ``/index/static`` and ``/index/dynamic``
    * ``GET``/``HEAD`` ``/icons/<filename>``
    * ``POST`` ``/testing/<repo_name>/<tag>?d=<directory>[&detach-icons]``
    * ``DELETE`` ``/testing/<repo_name>/<ref>``

    All state lives in the module-level ``repositories`` and ``icons`` dicts.
    """

    def check_route(self, route):
        """Match the request path against *route* and capture its parameters.

        *route* is a '/'-separated pattern; a segment starting with ``@``
        matches any path segment and is captured under the name that follows
        the ``@``.  The query string (everything after the first ``?``) is
        ignored for matching.

        On a match, sets ``self.matches`` (captured segments) and
        ``self.query`` (``parse_qs`` result, blank values kept; ``{}`` when
        there is no query string) and returns ``True``.  On a mismatch both
        attributes are left untouched and ``False`` is returned.
        """
        parts = self.path.split('?', 1)
        path = parts[0].split('/')

        route_path = route.split('/')
        print((route_path, path))
        if len(route_path) != len(path):
            return False

        matches = {}
        # Segment 0 (the text before the first '/') is never compared.
        for pattern, actual in zip(route_path[1:], path[1:]):
            if pattern.startswith('@'):
                matches[pattern[1:]] = actual
            elif pattern != actual:
                return False

        self.matches = matches
        if len(parts) == 1:
            self.query = {}
        else:
            self.query = parse_qs(parts[1], keep_blank_values=True)

        return True

    def _send_empty(self, status):
        """Send a response consisting of the status line and no extra headers."""
        self.send_response(status)
        self.end_headers()

    def _respond(self, send_body):
        """Shared implementation of GET and HEAD.

        Resolves the route, then sends status and headers; the body is
        written only when *send_body* is true.  Unknown repositories, blobs,
        manifests, icons or routes, and unreadable blob files, yield 404.
        """
        response = 200
        response_string = None
        response_content_type = "application/octet-stream"
        response_file = None

        add_headers = {}

        if self.check_route('/v2/@repo_name/blobs/@digest'):
            repo = repositories.get(self.matches['repo_name'], {})
            response_file = repo.get('blobs', {}).get(self.matches['digest'])
            if response_file is None:
                response = 404
        elif self.check_route('/v2/@repo_name/manifests/@ref'):
            repo = repositories.get(self.matches['repo_name'], {})
            response_file = repo.get('manifests', {}).get(self.matches['ref'])
            if response_file is None:
                response = 404
        elif self.check_route('/index/static') or self.check_route('/index/dynamic'):
            etag = get_etag()
            if self.headers.get("If-None-Match") == etag:
                response = 304
            else:
                response_string = get_index()
            # The validator is sent with both the full and the 304 response.
            add_headers['Etag'] = etag
        elif self.check_route('/icons/@filename'):
            response_string = icons.get(self.matches['filename'])
            if response_string is None:
                response = 404
            else:
                response_content_type = 'image/png'
        else:
            response = 404

        # Read the file before any header goes out, so that a missing file
        # can still be reported as 404 instead of a truncated 200.
        if response == 200 and response_file:
            try:
                with open(response_file, 'rb') as f:
                    response_string = f.read()
            except OSError:
                response = 404

        if response == 200 and not isinstance(response_string, bytes):
            response_string = response_string.encode('utf-8')

        self.send_response(response)
        for k, v in add_headers.items():
            self.send_header(k, v)

        if response == 200:
            self.send_header("Content-Type", response_content_type)
            self.send_header("Content-Length", str(len(response_string)))

        if response == 200 or response == 304:
            self.send_header('Cache-Control', 'no-cache')

        self.end_headers()

        if response == 200 and send_body:
            self.wfile.write(response_string)

    def do_GET(self):
        """Serve blobs, manifests, the index and cached icons."""
        self._respond(send_body=True)

    def do_HEAD(self):
        """Same as GET, but only the status line and headers are sent."""
        self._respond(send_body=False)

    @staticmethod
    def _load_image(d, tag, detach_icons):
        """Read the image stored in directory *d* without touching registry state.

        Reads ``index.json``, the first manifest it lists, and that
        manifest's config from ``<d>/blobs/<algorithm>/<hash>``.  When
        *detach_icons* is true, 64 and 128 pixel appstream icon data URIs
        found in the manifest annotations (or else in the config labels) are
        replaced by the path returned from ``cache_icon``.

        Returns ``(image, manifest_path, blob_paths)`` where *image* is the
        index entry for the image and *blob_paths* maps ``sha256:<hash>`` to
        a file path for every file in ``<d>/blobs/sha256`` except the
        manifest.

        Raises OSError, ValueError or LookupError for an unreadable or
        malformed directory; an AssertionError from ``cache_icon``
        propagates unchanged.
        """
        with open(os.path.join(d, 'index.json')) as f:
            index = json.load(f)

        manifest_digest = index['manifests'][0]['digest']
        manifest_path = os.path.join(d, 'blobs', *manifest_digest.split(':'))
        with open(manifest_path) as f:
            manifest = json.load(f)

        config_digest = manifest['config']['digest']
        config_path = os.path.join(d, 'blobs', *config_digest.split(':'))
        with open(config_path) as f:
            config = json.load(f)

        blob_paths = {}
        for dig in os.listdir(os.path.join(d, 'blobs', 'sha256')):
            digest = 'sha256:' + dig
            if digest != manifest_digest:
                blob_paths[digest] = os.path.join(d, 'blobs', 'sha256', dig)

        if detach_icons:
            for size in (64, 128):
                annotation = 'org.freedesktop.appstream.icon-{}'.format(size)
                icon = manifest.get('annotations', {}).get(annotation)
                if icon:
                    manifest['annotations'][annotation] = cache_icon(icon)
                else:
                    icon = config.get('config', {}).get('Labels', {}).get(annotation)
                    if icon:
                        config['config']['Labels'][annotation] = cache_icon(icon)

        image = {
            "Tags": [tag],
            "Digest": manifest_digest,
            "MediaType": "application/vnd.oci.image.manifest.v1+json",
            "OS": config['os'],
            "Architecture": config['architecture'],
            "Annotations": manifest.get('annotations', {}),
            "Labels": config.get('config', {}).get('Labels', {}),
        }
        return image, manifest_path, blob_paths

    def do_POST(self):
        """Register the image found in directory ``d`` under repo/tag.

        Responds 404 for any other route, 400 when the ``d`` query parameter
        is missing or the directory cannot be read as an image layout, and
        200 once the image has replaced any previous image carrying the same
        tag.
        """
        if not self.check_route('/testing/@repo_name/@tag'):
            self._send_empty(404)
            return

        repo_name = self.matches['repo_name']
        tag = self.matches['tag']
        detach_icons = 'detach-icons' in self.query

        if 'd' not in self.query:
            self._send_empty(400)
            return
        d = self.query['d'][0]

        # Read everything first so a bad directory leaves the registry as
        # it was.
        try:
            image, manifest_path, blob_paths = self._load_image(d, tag, detach_icons)
        except (OSError, ValueError, LookupError):
            self._send_empty(400)
            return

        repo = repositories.setdefault(repo_name, {})
        blobs = repo.setdefault('blobs', {})
        manifests = repo.setdefault('manifests', {})
        images = repo.setdefault('images', [])

        # Delete old versions.  This happens before the new manifest is
        # registered so that re-posting an unchanged image (same digest)
        # does not drop the entry that was just added, and the list is
        # rebuilt in place rather than mutated while being iterated.
        replaced = [i for i in images if tag in i['Tags']]
        images[:] = [i for i in images if tag not in i['Tags']]
        for old in replaced:
            manifests.pop(old['Digest'], None)

        manifests[image['Digest']] = manifest_path
        manifests[tag] = manifest_path
        blobs.update(blob_paths)
        images.append(image)

        modified()
        self._send_empty(200)

    def do_DELETE(self):
        """Remove the image identified by digest or tag from a repository.

        Responds 404 for any other route or when no image in the repository
        has the given digest or tag; otherwise removes the image together
        with its manifest entries and responds 200.
        """
        if not self.check_route('/testing/@repo_name/@ref'):
            self._send_empty(404)
            return

        ref = self.matches['ref']

        # Look up without setdefault so that a failed delete does not create
        # an empty repository.
        repo = repositories.get(self.matches['repo_name'], {})
        manifests = repo.get('manifests', {})
        images = repo.get('images', [])

        image = next(
            (i for i in images if i['Digest'] == ref or ref in i['Tags']),
            None,
        )
        if image is None:
            self._send_empty(404)
            return

        images.remove(image)
        manifests.pop(image['Digest'], None)
        for t in image['Tags']:
            manifests.pop(t, None)

        modified()
        self._send_empty(200)
|
|
|
|
def run(dir):
    """Start the registry on a free localhost port and serve until stopped.

    The chosen port is written to the file ``httpd-port`` in the current
    working directory, then ``Started`` is written to file descriptor 3 on a
    best-effort basis.  If *dir* is truthy the process changes into it
    afterwards, i.e. ``httpd-port`` is created relative to the directory the
    process started in.  This call blocks in ``serve_forever``.
    """
    RequestHandler.protocol_version = "HTTP/1.0"
    httpd = http_server.HTTPServer(("127.0.0.1", 0), RequestHandler)
    try:
        # Port 0 was requested, so ask the socket which port was assigned.
        port = httpd.socket.getsockname()[1]
        with open("httpd-port", 'w') as file:
            file.write("%d" % port)

        try:
            os.write(3, bytes("Started\n", 'utf-8'))
        except OSError:
            # fd 3 may not be open; ignore that, but let KeyboardInterrupt
            # and SystemExit through.
            pass

        print("Serving HTTP on port %d" % port)
        if dir:
            os.chdir(dir)
        httpd.serve_forever()
    finally:
        # Release the listening socket when serving ends for any reason.
        httpd.server_close()
|
|
|
|
if __name__ == '__main__':
    # The optional first command-line argument is passed to run(); an absent
    # or empty argument becomes None.
    dir = sys.argv[1] if len(sys.argv) >= 2 and sys.argv[1] else None

    run(dir)
|