# Source code for doc_curation.pdf

"""
Curate and process pdf files.
"""
import errno
import logging
import os
import shutil
import subprocess
from pathlib import Path

from pikepdf import Pdf

from curation_utils import list_helper

# Setting MAX_IMAGE_PIXELS to None switches off Pillow's image-size limit for the whole process.
# This is done to thwart: DecompressionBombError: Image size (268216326 pixels) exceeds limit of
# 178956970 pixels, could be decompression bomb DOS attack.
# NOTE(review): with the limit off, very large images are loaded without any check.
from PIL import Image
Image.MAX_IMAGE_PIXELS = None


# Remove all handlers associated with the root logger object, so that the
# basicConfig call below actually takes effect (basicConfig is a no-op when
# the root logger already has handlers).
for handler in logging.root.handlers[:]:
  logging.root.removeHandler(handler)
# Root logger: DEBUG level, records formatted as "LEVEL:time:module:lineno message".
logging.basicConfig(
  level=logging.DEBUG,
  format="%(levelname)s:%(asctime)s:%(module)s:%(lineno)d %(message)s")


def _get_ocr_dir(pdf_path, small_pdf_pages=None):
  """Return the path of the "splits" directory that sits next to a PDF.

  :param pdf_path: Path of the PDF file.
  :param small_pdf_pages: Pages per segment. When given (any value other
    than None), it is embedded in the directory name.
  :return: ``<pdf dir>/<pdf stem>_splits`` or
    ``<pdf dir>/<pdf stem>_<small_pdf_pages>_splits``.
  """
  parent_dir = os.path.dirname(pdf_path)
  stem = Path(pdf_path).stem
  dir_name = f"{stem}_splits" if small_pdf_pages is None else f"{stem}_{small_pdf_pages}_splits"
  return os.path.join(parent_dir, dir_name)


def split_into_small_pdfs(pdf_path, output_directory=None, start_page=1, end_page=None, small_pdf_pages=25):
  """Split a PDF into consecutive segment files of at most ``small_pdf_pages`` pages.

  Segment files are named ``<stem>_<first>-<last>.pdf`` (page numbers
  zero-padded to 4 digits). A segment whose file already exists is not
  rewritten (a warning is logged), but its path is still returned.

  :param pdf_path: Path of the PDF to split.
  :param output_directory: Directory for the segment files. Defaults to the
    directory given by ``_get_ocr_dir(pdf_path, small_pdf_pages)``.
  :param start_page: First page to include (1-based).
  :param end_page: Last page to include (1-based, inclusive). Defaults to the
    last page of the PDF.
  :param small_pdf_pages: Maximum number of pages per segment.
  :return: List of segment file paths, in page order.
  """
  logging.info("Splitting %s into segments of %d", pdf_path, small_pdf_pages)
  pdf_name_stem = Path(pdf_path).stem
  if output_directory is None:
    output_directory = _get_ocr_dir(pdf_path, small_pdf_pages)
  dest_pdfs = []
  # noinspection PyArgumentList
  with Pdf.open(pdf_path) as pdf:
    if end_page is None:
      end_page = len(pdf.pages)
    page_numbers = range(start_page, end_page + 1)
    page_sets = list_helper.divide_chunks(list_in=page_numbers, n=small_pdf_pages)
    for page_set in page_sets:
      dest_pdf_path = os.path.join(output_directory, "%s_%04d-%04d.pdf" % (pdf_name_stem, page_set[0], page_set[-1]))
      if os.path.exists(dest_pdf_path):
        logging.warning("%s exists", dest_pdf_path)
      else:
        # Page numbers are 1-based, pikepdf's page list is 0-based.
        segment_pages = [pdf.pages[i - 1] for i in page_set]
        os.makedirs(os.path.dirname(dest_pdf_path), exist_ok=True)
        # Close each segment document as soon as it is saved instead of
        # leaving that to garbage collection.
        # noinspection PyArgumentList
        with Pdf.new() as dest_pdf:
          dest_pdf.pages.extend(segment_pages)
          dest_pdf.save(filename_or_stream=dest_pdf_path)
      dest_pdfs.append(dest_pdf_path)
  return dest_pdfs
# Adapted from https://github.com/theeko74/pdfc/blob/master/pdf_compressor.py
def compress_with_gs(input_file_path, output_file_path, power=3):
  """Compress a PDF and remove its text via the Ghostscript command line interface (``gs``).

  :param input_file_path: Path (``str`` or ``os.PathLike``) of the PDF to compress.
  :param output_file_path: Path the compressed PDF is written to.
  :param power: 0,1,2,3,4 - selects the Ghostscript ``-dPDFSETTINGS`` preset:
    /default, /prepress, /printer, /ebook, /screen respectively.
  :return: The fraction by which the file shrank
    (``1 - final_size / initial_size``). None when the input is not an
    existing file with a ``pdf`` extension, or when Ghostscript is not
    installed (the input is then copied unchanged to ``output_file_path``).
  :raises KeyError: if ``power`` is not one of the supported values.
  """
  quality = {
    0: '/default',
    1: '/prepress',
    2: '/printer',
    3: '/ebook',
    4: '/screen'
  }

  # Basic controls
  # Check if valid path
  if not os.path.isfile(input_file_path):
    logging.fatal("Error: invalid path for input PDF file")
    return

  # Normalise so that pathlib.Path inputs work with the string operations below.
  input_path = os.fspath(input_file_path)

  # Check if file is a PDF by extension
  if input_path.rsplit('.', 1)[-1].lower() != 'pdf':
    logging.fatal("Error: input file is not a PDF")
    return

  logging.info("Compress PDF...")
  initial_size = os.path.getsize(input_path)
  try:
    exit_code = subprocess.call(['gs', '-sDEVICE=pdfwrite', '-dCompatibilityLevel=1.4',
                                 '-dPDFSETTINGS={}'.format(quality[power]),
                                 '-dFILTERTEXT',
                                 '-dNOPAUSE', '-dQUIET', '-dBATCH',
                                 '-sOutputFile={}'.format(output_file_path),
                                 input_path]
                                )
  except OSError as e:
    if e.errno == errno.ENOENT:
      # handle file not found error.
      logging.error("ghostscript not found. Proceeding without compression.")
      shutil.copyfile(input_path, output_file_path)
      return
    else:
      # Something else went wrong while trying to run the command
      raise
  if exit_code != 0:
    # Do not hide a Ghostscript failure; the size report below may be meaningless.
    logging.error("ghostscript exited with status %s", exit_code)
  final_size = os.path.getsize(output_file_path)
  # An empty input has nothing to compress; avoid dividing by zero.
  ratio = 1 - (final_size / initial_size) if initial_size else 0.0
  logging.info("Compression by {0:.0%}.".format(ratio))
  logging.info("Final file size is {0:.1f}MB".format(final_size / 1000000))
  return ratio
def detext_via_ps(input_file_path, output_file_path):
  """Convert a PDF to PostScript with ``pdf2ps`` and back to PDF with ``ps2pdf``.

  Judging by the function name, the round trip is meant to drop the text
  layer. The intermediate ``.ps`` file is written next to the input (same
  name, ``.ps`` extension) and is left in place.

  :param input_file_path: Path of the source PDF.
  :param output_file_path: Path of the PDF to produce. Its directory is created if needed.
  """
  output_dir = os.path.dirname(output_file_path)
  if output_dir:
    # dirname is "" for a bare file name, and os.makedirs("") would fail.
    os.makedirs(output_dir, exist_ok=True)
  # Swap only the final extension. A plain str.replace(".pdf", ".ps") would also
  # rewrite ".pdf" inside directory names, and would return the input path itself
  # for names such as "X.PDF", making pdf2ps write over its own input.
  ps_path = os.path.splitext(input_file_path)[0] + ".ps"
  subprocess.call(["pdf2ps", input_file_path, ps_path])
  subprocess.call(["ps2pdf", ps_path, output_file_path])
def dump_images(input_file_path, output_path):
  """Render the pages of a PDF as JPEG files inside ``output_path``.

  Nothing is done when ``output_path`` already contains ``*.jpg`` files.

  :param input_file_path: Path of the PDF to render.
  :param output_path: Directory the images are written to (created if missing).
    Image file names are prefixed with the PDF's base name.
  :return: None.
  """
  # Look for earlier output in the directory we actually write to.
  existing_images = [str(image_path) for image_path in Path(output_path).glob("*.jpg")]
  if len(existing_images) > 0:
    logging.info("%d images already exist! So not dumping afresh.", len(existing_images))
    return
  # Imported lazily: only needed when images really have to be produced.
  from pdf2image import convert_from_path
  os.makedirs(output_path, exist_ok=True)
  logging.info("Splitting to images: %s to %s", input_file_path, output_path)
  convert_from_path(input_file_path, fmt="jpeg", output_folder=output_path,
                    output_file=os.path.splitext(os.path.basename(input_file_path))[0])
def images_to_pdf(image_dir, output_path):
  """Combine the ``.jpg`` files of a directory into one PDF using ``img2pdf``.

  :param image_dir: Directory to read images from. Only entries whose name
    ends with ``.jpg`` and that are not directories are used, in sorted
    file-name order.
  :param output_path: Path of the PDF file to write.
  :return: None.
  """
  import img2pdf
  image_paths = []
  for fname in sorted(os.listdir(image_dir)):
    if not fname.endswith(".jpg"):
      continue
    path = os.path.join(image_dir, fname)
    if os.path.isdir(path):
      continue
    image_paths.append(path)
  # Convert before opening the output, so that a failed conversion does not
  # leave an empty or truncated file at output_path.
  pdf_bytes = img2pdf.convert(image_paths)
  with open(output_path, "wb") as f:
    f.write(pdf_bytes)
def detext_via_jpg(input_file_path, output_file_path):
  """Rebuild a PDF from JPEG images of its pages.

  The images are dumped into the directory given by
  ``_get_ocr_dir(input_file_path, 1)`` (created if missing) and then
  assembled into ``output_file_path``.

  :param input_file_path: Path of the source PDF.
  :param output_file_path: Path of the image-only PDF to write.
  """
  jpg_dir = _get_ocr_dir(input_file_path, 1)
  os.makedirs(jpg_dir, exist_ok=True)
  dump_images(input_file_path=input_file_path, output_path=jpg_dir)
  images_to_pdf(image_dir=jpg_dir, output_path=output_file_path)
def detext_with_pdfimages(input_file_path, output_file_path):
  """
  Extract the images of a PDF with ``pdfimages`` and merge them into a new
  PDF with the ``convert`` command (presumably ImageMagick's).

  Sometimes does not work satisfactorily - just outputs 2 pages of many.

  The working directory given by ``_get_ocr_dir(input_file_path, 1)`` is
  created if needed and deleted, with everything in it, at the end.

  :param input_file_path: Path of the source PDF.
  :param output_file_path: Path of the PDF to write.
  :return: None.
  """
  scratch_dir = _get_ocr_dir(input_file_path, 1)
  os.makedirs(scratch_dir, exist_ok=True)
  extract_cmd = ["pdfimages", "-j", input_file_path, f"{scratch_dir}/page"]
  # The "*" pattern is passed to convert literally; no shell is involved.
  merge_cmd = ["convert", f"{scratch_dir}/*", output_file_path]
  subprocess.call(extract_cmd)
  subprocess.call(merge_cmd)
  shutil.rmtree(scratch_dir)