#!/usr/bin/env python3
# encoding: utf-8
Python Utils for releasing the Checker Framework
This contains no main method only utility functions
Created by Jonathan Burke 11/21/2012
Copyright (c) 2012 University of Washington
import urllib.request
import urllib.error
import urllib.parse
import re
import subprocess
import os
import os.path
import shutil
from release_vars import execute
# =========================================================================================
# Parse Args Utils # TODO: Perhaps use argparse module
def read_command_line_option(argv, argument):
"""Returns True if the given command line arguments contain the specified
argument, False otherwise."""
for index in range(1, len(argv)):
if argv[index] == argument:
return True
return False
# =========================================================================================
# Command utils
def execute_write_to_file(
command_args, output_file_path, halt_if_fail=True, working_dir=None
"""Execute the given command, capturing the output to the given file."""
print("Executing: %s" % (command_args))
import shlex
args = shlex.split(command_args) if isinstance(command_args, str) else command_args
output_file = open(output_file_path, "w+")
process = subprocess.Popen(
args, stdout=output_file, stderr=output_file, cwd=working_dir
if process.returncode != 0 and halt_if_fail:
raise Exception(
"Error %s while executing %s" % (process.returncode, command_args)
def check_command(command):
"""Executes the UNIX \"which\" command to determine whether the given command
is installed and on the PATH."""
p = execute(["which", command], False)
if p:
raise AssertionError("command not found: %s" % command)
def prompt_yes_no(msg, default=False):
"""Prints the given message and continually prompts the user until they
answer yes or no. Returns true if the answer was yes, false otherwise."""
default_str = "no"
if default:
default_str = "yes"
result = prompt_w_default(msg, default_str, "^(Yes|yes|No|no)$")
if result == "yes" or result == "Yes":
return True
return False
def prompt_yn(msg):
"""Prints the given message and continually prompts the user until they
answer y or n. Returns true if the answer was y, false otherwise."""
y_or_n = "z"
while y_or_n != "y" and y_or_n != "n":
print(msg + " [y|n]")
y_or_n = input().lower()
return y_or_n == "y"
def prompt_to_continue():
"Prompts the user to continue, until they enter yes."
while not prompt_yes_no("Continue?"):
def prompt_w_default(msg, default, valid_regex=None):
"""Only accepts answers that match valid_regex.
If default is None, requires an answer."""
answer = None
while answer is None:
answer = input(msg + " (%s): " % default)
if answer is None or answer == "":
answer = default
answer = answer.strip()
if valid_regex is not None:
m = re.match(valid_regex, answer)
if m is None:
answer = None
print("Invalid answer. Validating regex: " + valid_regex)
answer = default
return answer
def check_tools(tools):
"""Given an array specifying a set of tools, verify that the tools are
installed and on the PATH."""
print("\nChecking to make sure the following programs are installed:")
print(", ".join(tools))
"Note: If you are NOT working on then you "
+ "likely need to change the variables that are set in\n"
+ 'Search for "Set environment variables".'
list(map(check_command, tools))
def continue_or_exit(msg):
"Prompts the user whether to continue executing the script."
continue_script = prompt_w_default(
msg + " Continue ('no' will exit the script)?", "yes", "^(Yes|yes|No|no)$"
if continue_script == "no" or continue_script == "No":
raise Exception("User elected NOT to continue at prompt: " + msg)
# =========================================================================================
# Version Utils
# From, but doesn't strip trailing zeroes
def version_number_to_array(version_num):
"""Given a version number, return an array of the elements, as integers."""
return [int(x) for x in version_num.split(".")]
def increment_version(version_num):
Returns the next incremental version after the argument.
# Drop the fourth and subsequent parts if present
version_array = version_number_to_array(version_num)[:3]
version_array[-1] = version_array[-1] + 1
return ".".join(str(x) for x in version_array)
def test_increment_version():
"""Run test cases to ensure that increment_version works correctly."""
assert increment_version("1.0.3") == "1.0.4"
assert increment_version("1.0.9") == "1.0.10"
assert increment_version("1.1.9") == "1.1.10"
assert increment_version("1.3.0") == "1.3.1"
assert increment_version("1.3.1") == "1.3.2"
assert increment_version("1.9.9") == "1.9.10"
assert increment_version("3.6.22") == "3.6.23"
assert increment_version("3.22.6") == "3.22.7"
assert increment_version("") == "1.0.4"
assert increment_version("") == "1.0.10"
assert increment_version("") == "1.1.10"
assert increment_version("") == "1.3.1"
assert increment_version("") == "1.3.2"
assert increment_version("") == "1.9.10"
assert increment_version("") == "3.6.23"
assert increment_version("") == "3.22.7"
def current_distribution_by_website(site):
Reads the checker framework version from the checker framework website and
returns the version of the current release.
print("Looking up checker-framework-version from %s\n" % site)
ver_re = re.compile(
r"<!-- checker-framework-zip-version -->checker-framework-(.*)\.zip"
text = urllib.request.urlopen(url=site).read().decode("utf-8")
result =
# =========================================================================================
# Git Utils
def git_bare_repo_exists_at_path(
): # Bare git repos have no .git directory but they have a refs directory
"Returns whether a bare git repository exists at the given filesystem path."
if os.path.isdir(repo_root + "/refs"):
return True
return False
def git_repo_exists_at_path(repo_root):
"""Returns whether a (bare or non-bare) git repository exists at the given
filesystem path."""
return os.path.isdir(repo_root + "/.git") or git_bare_repo_exists_at_path(repo_root)
def push_changes_prompt_if_fail(repo_root):
"""Attempt to push changes, including tags, that were committed to the
repository at the given filesystem path. In case of failure, ask the user
if they would like to try again. Loop until pushing changes succeeds or the
user answers opts to not try again."""
while True:
cmd = "(cd %s && git push --tags)" % repo_root
result = os.system(cmd)
cmd = "(cd %s && git push origin master)" % repo_root
result = os.system(cmd)
if result == 0:
"Could not push from: "
+ repo_root
+ "; result="
+ str(result)
+ " for command: "
+ cmd
+ "` in "
+ os.getcwd()
if not prompt_yn(
"Try again (responding 'n' will skip this push command but will not exit the script) ?"
def push_changes(repo_root):
"""Pushes changes, including tags, that were committed to the repository at
the given filesystem path."""
execute("git push --tags", working_dir=repo_root)
execute("git push origin master", working_dir=repo_root)
def update_repo(path, bareflag):
"""Pull the latest changes to the given repo and update. The bareflag
parameter indicates whether the updated repo must be a bare git repo."""
if bareflag:
execute("git fetch origin master:master", working_dir=path)
execute("git pull", working_dir=path)
def commit_tag_and_push(version, path, tag_prefix):
"""Commit the changes made for this release, add a tag for this release, and
push these changes."""
execute('git commit -a -m "new release %s"' % (version), working_dir=path)
execute("git tag %s%s" % (tag_prefix, version), working_dir=path)
def clone_from_scratch_or_update(src_repo, dst_repo, clone_from_scratch, bareflag):
"""If the clone_from_scratch flag is True, clone the given git or
Mercurial repo from scratch into the filesystem path specified by dst_repo,
deleting it first if the repo is present on the filesystem.
Otherwise, if a repo exists at the filesystem path given by dst_repo, pull
the latest changes to it and update it. If the repo does not exist, clone it
from scratch. The bareflag parameter indicates whether the cloned/updated
repo must be a bare git repo."""
if clone_from_scratch:
delete_and_clone(src_repo, dst_repo, bareflag)
if os.path.exists(dst_repo):
update_repo(dst_repo, bareflag)
clone(src_repo, dst_repo, bareflag)
def delete_and_clone(src_repo, dst_repo, bareflag):
"""Clone the given git or Mercurial repo from scratch into the filesystem
path specified by dst_repo. If a repo exists at the filesystem path given
by dst_repo, delete it first. The bareflag parameter indicates whether
the cloned repo must be a bare git repo."""
clone(src_repo, dst_repo, bareflag)
def clone(src_repo, dst_repo, bareflag):
"""Clone the given git or Mercurial repo from scratch into the filesystem
path specified by dst_repo. The bareflag parameter indicates whether the
cloned repo must be a bare git repo."""
flags = ""
if bareflag:
flags = "--bare"
execute("git clone --quiet %s %s %s" % (flags, src_repo, dst_repo))
def is_repo_cleaned_and_updated(repo):
"""IMPORTANT: this function is not known to be fully reliable in ensuring
that a repo is fully clean of all changes, such as committed tags. To be
certain of success throughout the release_build and release_push process,
the best option is to clone repositories from scratch.
Returns whether the repository at the given filesystem path is clean (i.e.
there are no committed changes and no untracked files in the working tree)
and up-to-date with respect to the repository it was cloned from."""
# The idiom "not execute(..., capture_output=True)" evaluates to True when the captured output is empty.
if git_bare_repo_exists_at_path(repo):
execute("git fetch origin", working_dir=repo)
is_updated = not execute(
"git diff master..FETCH_HEAD", working_dir=repo, capture_output=True
return is_updated
# Could add "--untracked-files=no" to this command
is_clean = not execute(
"git status --porcelain", working_dir=repo, capture_output=True
execute("git fetch origin", working_dir=repo)
is_updated = not execute(
"git diff origin/master..master", working_dir=repo, capture_output=True
return is_clean and is_updated
def check_repos(repos, fail_on_error, is_intermediate_repo_list):
"""Fail if the repository is not clean and up to date."""
for repo in repos:
if git_repo_exists_at_path(repo):
if not is_repo_cleaned_and_updated(repo):
if is_intermediate_repo_list:
"\nWARNING: Intermediate repository "
+ repo
+ " is not up to date with respect to the live repository.\n"
+ "A separate warning will not be issued for a build repository that is cloned off of the intermediate repository."
if fail_on_error:
raise Exception("repo %s is not cleaned and updated!" % repo)
if not prompt_yn(
"%s is not clean and up to date! Continue (answering 'n' will exit the script)?"
% repo
raise Exception(
"%s is not clean and up to date! Halting!" % repo
def get_tag_line(lines, revision, tag_prefixes):
"""Get the revision hash for the tag matching the given project revision in
the given lines containing revision hashes. Uses the given array of tag
prefix strings if provided. For example, given an array of tag prefixes
[\"checker-framework-\", \"checkers-\"] and project revision \"2.0.0\", the
tags named \"checker-framework-2.0.0\" and \"checkers-2.0.0\" are sought."""
for line in lines:
for prefix in tag_prefixes:
full_tag = prefix + revision
if line.startswith(full_tag):
return line
return None
def get_commit_for_tag(revision, repo_file_path, tag_prefixes):
"""Get the commit hash for the tag matching the given project revision of
the Git repository at the given filesystem path. Uses the given array of
tag prefix strings if provided. For example, given an array of tag prefixes
[\"checker-framework-\", \"checkers-\"] and project revision \"2.0.0\", the
tags named \"checker-framework-2.0.0\" and \"checkers-2.0.0\" are sought."""
# assume the first is the most recent
tags = execute(
"git rev-list " + tag_prefixes[0] + revision,
lines = tags.splitlines()
commit = lines[0]
if commit is None:
msg = "Could not find revision %s in repo %s using tags %s " % (
raise Exception(msg)
return commit
# =========================================================================================
# File Utils
def wget_file(source_url, destination_dir):
"""Download a file from the source URL to the given destination directory.
Useful since download_binary does not seem to work on source files."""
print("DEST DIR: " + destination_dir)
execute("wget %s" % source_url, True, False, destination_dir)
def download_binary(source_url, destination):
"""Download a file from the given URL and save its contents to the
destination filename."""
http_response = urllib.request.urlopen(url=source_url)
content_length = http_response.headers["content-length"]
if content_length is None:
raise Exception("No content-length when downloading: " + source_url)
dest_file = open(destination, "wb")
def read_first_line(file_path):
"Return the first line in the given file. Assumes the file exists."
infile = open(file_path, "r")
first_line = infile.readline()
return first_line
def ensure_group_access(path):
"Give group access to all files and directories under the specified path"
# Errs for any file not owned by this user.
# But, the point is to set group writeability of any *new* files.
execute("chmod -f -R g+rw %s" % path, halt_if_fail=False)
def ensure_user_access(path):
"Give the user access to all files and directories under the specified path"
execute("chmod -f -R u+rwx %s" % path, halt_if_fail=True)
def set_umask():
'Equivalent to executing "umask g+rw" from the command line.'
os.umask(os.umask(0) & 0b001111)
def delete(file_to_delete):
"Delete the specified file."
def delete_if_exists(file_to_delete):
"Check if the specified file exists, and if so, delete it."
if os.path.exists(file_to_delete):
def delete_path(path):
"Delete all files and directories under the specified path."
def delete_path_if_exists(path):
"""Check if the specified path exists, and if so, delete all files and
directories under it."""
if os.path.exists(path):
def are_in_file(file_path, strs_to_find):
"""Returns true if every string in the given strs_to_find array is found in
at least one line in the given file. In particular, returns true if
strs_to_find is empty. Note that the strs_to_find parameter is mutated."""
infile = open(file_path)
for line in infile:
if len(strs_to_find) == 0:
return True
index = 0
while index < len(strs_to_find):
if strs_to_find[index] in line:
del strs_to_find[index]
index = index + 1
return len(strs_to_find) == 0
def insert_before_line(to_insert, file_path, line):
"""Insert the given line to the given file before the given 0-indexed line
mid_line = line - 1
with open(file_path) as infile:
content = infile.readlines()
output = open(file_path, "w")
for i in range(0, mid_line):
for i in range(mid_line, len(content)):
def create_empty_file(file_path):
"Creates an empty file with the given filename."
dest_file = open(file_path, "wb")
# =========================================================================================
# Misc. Utils
def print_step(step):
"Print a step in the release_build or release_push script."
dashStr = ""
for dummy in range(0, len(step)):
dashStr += "-"
def get_announcement_email(version):
"""Return the template for the e-mail announcing a new release of the
Checker Framework."""
return """
Subject: Release %s of the Checker Framework
We have released a new version of the Checker Framework.
The Checker Framework lets you create and/or run pluggable type checkers, in order to detect and prevent bugs in your code.
You can find documentation and download links at:
Changes for Checker Framework version %s:
<<Insert latest Checker Framework changelog entry, omitting the first line with the release version and date, and with hard line breaks removed>>
""" % (
# =========================================================================================
# Testing
def test_release_utils():
"Test that critical methods in this file work as expected."
# Tests run every time this file is loaded