summaryrefslogtreecommitdiff
path: root/library
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2022-09-04 18:25:12 +0200
committerChristian Pointner <equinox@spreadspace.org>2022-09-04 18:25:20 +0200
commit822b5e0e2eff46fbd007024ad1323abe27c2ff70 (patch)
tree3dfb358e3bbfe59815c881989f3e06be20108a9a /library
parentubuntu/installer: improved handling of latest image (diff)
raspios: major refactoring for image fetching, TODO: customization
Diffstat (limited to 'library')
-rw-r--r--library/decompress.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/library/decompress.py b/library/decompress.py
new file mode 100644
index 00000000..e07eb949
--- /dev/null
+++ b/library/decompress.py
@@ -0,0 +1,226 @@
+#!/usr/bin/python
+# this is based on https://github.com/socratesx/Ansible-Decompress
+
+import zipfile
+import os
+import posixpath
+import shutil
+import xopen
+from ansible.module_utils.basic import AnsibleModule
+ANSIBLE_METADATA = {
+ 'metadata_version': '1.1',
+ 'status': ['preview'],
+ 'supported_by': 'community'
+}
+
+DOCUMENTATION = '''
+---
+module: decompress
+
+short_description: This is a simple module for decompressing unarchived files
+
+version_added: "2.4"
+
+description:
+ - "The module gets either a gz,bz2 or zip compressed file and just uncompress its content. Its purpose is to cover the case where a single file is just compressed such as a bootimage.iso.gz. but not archived. In case the file is archived, e.g. bootimage.tar.gz then the core module unarchive can handle it."
+
+options:
+ src:
+ description:
+ - This is the absolute path of the compressed file
+ required: true
+ dest:
+ description:
+ - The destination of the uncompressed file. This can be an absolute file path where the content will be saved as the specified filename, a directory where the filename will match the src filename without the (.gz|.bz2|.xz|.zip) extention or undefined. In the last case the file will be uncompressed in the same directory of the src with the same filename without the compress extention.
+ required: false
+ force:
+ description:
+ - Set this option to True to overwrite the extracted file in case it exists on destination.
+ required: false
+ default: false
+ update:
+ description:
+ - Set this to True to overwrite the file only if it has different size/
+ required: false
+ default: true
+
+extends_documentation_fragment:
+ - files
+
+author:
+ - Socrates Chouridis
+ - Christian Pointner
+'''
+
+EXAMPLES = '''
+# This will decompress the bootimage.iso.gz and move it to ~/isos/bootimage.iso
+- name: Decompress a gzipped iso image downloaded from the Internet
+ decompress:
+ src: '/tmp/bootimage.iso.gz'
+ dest: '~/isos/'
+
+# Setting the extracted file name explicitly
+- name: Decompress a bz2 iso image downloaded from the Internet
+ decompress:
+ src: '/tmp/bootimage.iso.bz2'
+ dest: '~/isos/newname_image.iso'
+
+# Extract multiple images using with_items
+- name: decompress multiple images
+ decompress:
+ src: "{{ item }}"
+ dest: '/my-images/'
+ force: true
+ with_items: "{{ compressed_isos_list }}"
+
+# Setting just the src will use the same name ommiting the extention, the result will be /tmp/bootimage.iso
+- name: Decompress in the same folder using the same name
+ decompress:
+ src: '/tmp/bootimage.iso.zip'
+
+
+'''
+
+RETURN = ''':
+message:
+ description: An information message regarding the decompression result.
+ returned: 'always'
+ type: 'str'
+files:
+ description: A list containing the files in the compressed file.
+ returned: success
+ type: list
+'''
+
+
+def decompress_file(data={}):
+ try:
+ destination = data['dest']
+ force = data['force']
+ update = data['update']
+ original_file_path = str(posixpath.abspath(data['src'])) # Original File Absolute Path
+ original_file_dir = os.path.dirname(original_file_path)
+ path_list = os.path.splitext(original_file_path) # List
+ ext = path_list[1]
+
+ if not destination:
+ extracted_filename = path_list[0]
+ elif not os.path.dirname(destination):
+ extracted_filename = os.path.join(original_file_dir, destination)
+ elif os.path.isdir(destination):
+ extracted_filename = os.path.join(destination, os.path.basename(path_list[0]))
+
+ if ext in [".gz", ".bz2", ".xz"]:
+ result = use_xopen(original_file_path, extracted_filename, force)
+ elif ext == ".zip":
+ result = use_unzip(original_file_path, os.path.dirname(extracted_filename), force, update)
+ else:
+ message = "The file type " + "\"" + ext + "\"" + " is not supported by this module. Supported File Formats: .gz, .bz2, .xz, .zip"
+ return True, False, message, [original_file_path]
+
+ return result[0], result[1], result[2], result[3]
+
+ except Exception as e:
+ message = "An error occured. Decompress Failed: " + str(e)
+ return True, False, message, []
+
+
+def use_xopen(src, dst, force):
+ dst_exists = False
+ if os.path.exists(dst):
+ dst_exists = True
+
+ with xopen.xopen(src, 'rb') as f_in, open(dst, 'wb') as f_out:
+ if not dst_exists:
+ shutil.copyfileobj(f_in, f_out)
+ message = "File Extracted Successfully: " + dst
+ return False, True, message, [dst]
+ else:
+ if force:
+ shutil.copyfileobj(f_in, f_out)
+ message = "File Extracted Successfully and replaced file (Force = True): " + dst
+ return False, True, message, [dst]
+ else:
+ message = "File Exists: skipping extraction (Use Force=True to Overwrite): " + dst
+ return False, False, message, [dst]
+
+
+def use_unzip(src, dst, force=False, update=True):
+ filelist = []
+ for root, dirs, files in os.walk(dst):
+ for d in dirs:
+ filelist.append(os.path.relpath(os.path.join(root, d), dst))
+ for f in files:
+ filelist.append(os.path.relpath(os.path.join(root, f), dst))
+
+ extracted_files = []
+ excluded_files = []
+ with zipfile.ZipFile(src, 'r') as zip_file:
+ for f in zip_file.namelist():
+ if not os.path.relpath(f) in filelist:
+ zip_file.extract(f, dst)
+ extracted_files.append(f)
+ else:
+ if force:
+ zip_file.extract(f, dst)
+ extracted_files.append(f)
+ else:
+ info = zip_file.getinfo(f)
+ if info.file_size != os.stat(dst + "/" + f).st_size:
+ if update:
+ zip_file.extract(f, dst)
+ extracted_files.append(f)
+ else:
+ excluded_files.append(f)
+ else:
+ excluded_files.append(f)
+ if not excluded_files:
+ message = " All files were extracted successfully"
+ for file in extracted_files:
+ if not force and update and file in filelist:
+ message = "Files with the same name on destination were replaced as they had different size (Update=True)"
+ elif force:
+ message = "All files were extracted successfully replacing any files on destination with the same name (Force=True)"
+ return False, True, message, [os.path.join(dst, file) for file in extracted_files]
+ else:
+ if not extracted_files:
+ message = "All Files Exist on Destination, Skipping Extraction... Use Force=True to Overwrite"
+ return False, False, message, [os.path.join(dst, file) for file in filelist]
+ else:
+ message = "Some Files Skipped Extraction as they Existed on destination. Use Force=True to Overwrite "
+ return False, True, message, [os.path.join(dst, file) for file in filelist]
+
+
+def run_module():
+ module_args = dict(
+ src=dict(type='str', required=True),
+ dest=dict(type='str', required=False),
+ force=dict(type='bool', required=False, default=False),
+ update=dict(type='bool', required=False, default=True),
+ )
+
+ result = dict(
+ changed=False,
+ message='',
+ failed=False,
+ files=[],
+ )
+
+ module = AnsibleModule(argument_spec=module_args)
+
+ (error, is_changed, message, files) = decompress_file(module.params)
+
+ result['changed'] = is_changed
+ result['message'] = message
+ result['failed'] = error
+ result['files'] = files
+
+ module.exit_json(**result)
+
+
+def main():
+ run_module()
+
+
+if __name__ == '__main__':
+ main()