CI: refactor Squash and Merge with simplified branch merging (#726)

* Refactor squash and merge script for improved simplicity

Simplified the squash_and_merge.py script by replacing redundant utility functions and consolidating logic. Enhanced usability by aligning command-line arguments and leveraging streamlined git operations to improve maintainability and reliability.

* Fix argument names in squash PR script

Renamed CLI arguments from '--base' and '--source' to '--target' and '--base' to align with expected input format. This ensures the script runs correctly with proper argument mapping.

* Fix incorrect base branch argument in squash script

    Updated the `--base` argument to use `source_branch` instead of `branch` to ensure the squash script processes the correct base branch. Also adjusted the command to include `branch` as a separate argument for clarity and correctness.

* Reset to a clean state after squash error.

Add a `git reset --hard` command to ensure the repository returns to a clean state after encountering errors during the squash and merge process. This prevents lingering changes from affecting subsequent operations.

* Improve error handling in squash_and_merge_prs.py

Capture and display both stdout and stderr in error cases to provide more informative feedback. Adjust the PR comment to include available output for better debugging.

* Refactor PR squash process to enhance error handling.

Modify subprocess handling to use `result.returncode` for error checks instead of relying on exceptions. Consolidate error output retrieval and logging for better clarity, while maintaining the workflow for resetting changes on failure.

* Fix incorrect return in PR processing loop

Replaced `return` with `continue` to ensure all PRs in the loop are processed before exiting. This prevents premature termination of the function and ensures accurate success count reporting.

* Simplify subprocess output handling in squash_and_merge.py

Replaced labeled print statements with direct output of stdout and stderr. This change ensures cleaner logs and remains consistent with the function's purpose of output handling during subprocess execution.

* Update subprocess.run calls to use capture_output parameter

Replaced `stdout` and `stderr` with the `capture_output` parameter for cleaner and more concise subprocess handling. Also removed extraneous whitespace for improved code readability.

* testing moving the squash script given that it's called iteratively and switching branch might miss it

* format

---------

Co-authored-by: Jason Wen <haibin.wen3@gmail.com>
This commit is contained in:
DevTekVE
2025-03-29 22:54:49 +01:00
committed by GitHub
parent 4268d7a19c
commit 6b3f75bbf0
3 changed files with 59 additions and 361 deletions

View File

@@ -36,7 +36,7 @@ jobs:
run: |
git config --global user.name 'github-actions[bot]'
git config --global user.email 'github-actions[bot]@users.noreply.github.com'
- name: Set up SSH
uses: webfactory/ssh-agent@v0.9.0
with:
@@ -63,10 +63,10 @@ jobs:
echo "Source branch ${{ inputs.source_branch || env.DEFAULT_SOURCE_BRANCH }} does not exist!"
exit 1
fi
# Make sure we have the latest source branch
git fetch origin ${{ inputs.source_branch || env.DEFAULT_SOURCE_BRANCH }}
# Check if target branch exists
if ! git ls-remote --heads origin ${{ inputs.target_branch || env.DEFAULT_TARGET_BRANCH }} | grep -q "${{ inputs.target_branch || env.DEFAULT_TARGET_BRANCH }}"; then
echo "Target branch ${{ inputs.target_branch || env.DEFAULT_TARGET_BRANCH }} does not exist, creating it from ${{ inputs.source_branch || env.DEFAULT_SOURCE_BRANCH }}"
@@ -110,17 +110,19 @@ jobs:
}
}
}' -F label="is:pr is:open label:${PR_LABEL} sort:created-asc")
echo "PR_LIST=${PR_LIST}" >> $GITHUB_OUTPUT
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Process PRs
run: |
cp ${{ github.workspace }}/release/ci/squash_and_merge.py /tmp/squash_and_merge.py && \
chmod +x /tmp/squash_and_merge.py && \
python3 ${{ github.workspace }}/release/ci/squash_and_merge_prs.py \
--pr-data '${{ steps.get-prs.outputs.PR_LIST }}' \
--target-branch ${{ inputs.target_branch || env.DEFAULT_TARGET_BRANCH }} \
--squash-script-path '${{ github.workspace }}/release/ci/squash_and_merge.py'
--squash-script-path '/tmp/squash_and_merge.py'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -1,360 +1,53 @@
#!/usr/bin/env python3
import argparse
import subprocess
import sys
import shutil
import signal
import contextlib
import tempfile
import os
import argparse
def run_command(command: str) -> tuple[int, str, str]:
"""Run a shell command and return exit code, stdout, and stderr."""
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
return process.returncode, stdout.strip(), stderr.strip()
def is_gh_available() -> bool:
"""Check if GitHub CLI is available."""
return shutil.which('gh') is not None
def get_current_branch() -> str | None:
"""Get the name of the current git branch."""
code, output, error = run_command("git rev-parse --abbrev-ref HEAD")
if code != 0:
print(f"Error getting current branch: {error}")
return None
return output
def backup_branch(branch_name: str) -> bool:
"""Create a backup of the current branch."""
backup_name = f"{branch_name}-backup-$(date +%Y%m%d_%H%M%S)"
code, _, error = run_command(f"git branch {backup_name}")
if code != 0:
print(f"Error creating backup branch: {error}")
return False
print(f"Created backup branch: {backup_name}")
return True
def get_commit_messages(source_branch: str, target_branch: str) -> list[str] | None:
"""Get all commit messages between source and target branches."""
code, output, error = run_command(f"git log {target_branch}..{source_branch} --format=%B")
if code != 0:
print(f"Error getting commit messages: {error}")
return None
return [msg.strip() for msg in output.splitlines() if msg and not msg.startswith('Merge')]
def get_pr_info(branch_name: str) -> str | None:
"""Get PR title using GitHub CLI."""
if not is_gh_available():
print("Warning: GitHub CLI not found. Install it to auto-fetch PR titles:")
print(" https://cli.github.com/")
return None
# Try to get PR info using gh cli
code, output, error = run_command(f"gh pr view --json title --jq .title {branch_name}")
if code != 0:
print(f"No open PR found for branch '{branch_name}'")
return None
return output
def create_squash_message(pr_title: str | None, commit_messages: list[str], source_branch: str) -> str:
"""Create a squash commit message from PR title and commit messages."""
parts = []
# Add PR title if provided
if pr_title:
parts.append(pr_title)
else:
parts.append(f"Squashed changes from {source_branch}")
parts.append("") # Empty line after title
# Add original commits section
if commit_messages:
parts.append("Original commits:")
parts.append("") # Empty line before list
parts.extend(f"* {msg}" for msg in commit_messages)
return '\n'.join(parts)
def prompt_for_title() -> str:
"""Prompt user for a commit title."""
return input("Enter commit title (or press Enter to use default): ").strip()
@contextlib.contextmanager
def workspace_manager(original_branch: str):
"""Context manager to handle workspace state and cleanup."""
stash_created = False
stash_restored = False
temp_branch: str | None = None
def cleanup_handler(signum=None, frame=None):
"""Clean up workspace state."""
nonlocal temp_branch, stash_created, stash_restored
try:
if signum and stash_restored:
# If we're handling Ctrl+C but stash was already restored,
# just clean up branches and exit
current = get_current_branch()
if current and current != original_branch:
run_command(f"git checkout {original_branch}")
if temp_branch:
run_command(f"git branch -D {temp_branch}")
print("\nOperation interrupted, but changes were already restored.")
sys.exit(3)
# First, switch back to original branch
current = get_current_branch()
if current and current != original_branch:
run_command(f"git checkout {original_branch}")
# Then clean up temp branch
if temp_branch:
run_command(f"git branch -D {temp_branch}")
# Finally, restore stash if needed - AFTER switching branches
if stash_created and not stash_restored:
print("Restoring your uncommitted changes...")
code, stash_list, _ = run_command("git stash list")
if code == 0 and "Automatic stash by squash script" in stash_list:
run_command("git stash pop")
stash_restored = True
stash_created = False
if signum:
print("\nOperation interrupted. Cleaned up and restored original state.")
sys.exit(4)
except Exception as e:
print(f"Error during cleanup: {e}")
if signum:
sys.exit(5)
try:
# Set up signal handlers
signal.signal(signal.SIGINT, cleanup_handler)
signal.signal(signal.SIGTERM, cleanup_handler)
# Check for changes (including untracked files)
code, output, _ = run_command("git status --porcelain")
if output:
print("Stashing uncommitted changes...")
run_command("git stash push -u -m 'Automatic stash by squash script'")
stash_created = True
yield lambda x: setattr(x, 'temp_branch', temp_branch)
except Exception as e:
print(f"\nError occurred: {str(e)}")
cleanup_handler()
raise
finally:
cleanup_handler()
def create_commit_with_message(message: str) -> bool:
"""Create a commit with the given message using a temporary file."""
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(message)
temp_path = f.name
# Use the temporary file for the commit message
code, _, error = run_command(f"git commit -F {temp_path}")
os.unlink(temp_path) # Clean up the temp file
if code != 0:
print(f"Error creating commit: {error}")
return False
return True
except Exception as e:
print(f"Error handling commit message: {e}")
if os.path.exists(temp_path):
os.unlink(temp_path)
return False
def squash_and_merge(source_branch: str, target_branch: str, manual_title: str | None, backup: bool = False, push: bool = False) -> bool:
def run_git_command(command, check=True):
"""
Squash the source branch and merge into target branch.
Runs a git command and returns the trimmed stdout output.
Exits the script if the command fails.
"""
# Get original branch right away
original_branch = get_current_branch()
if not original_branch:
return False
class State:
temp_branch: str | None = None
state = State()
with workspace_manager(original_branch) as set_temp_branch:
# Validate source branch exists
code, _, error = run_command(f"git rev-parse --verify {source_branch}")
if code != 0:
print(f"Error: Source branch {source_branch} not found")
return False
if source_branch == target_branch:
print(f"Error: Source and target branches cannot be the same ({source_branch})")
return False
# Ensure target branch exists
code, _, error = run_command(f"git rev-parse --verify {target_branch}")
if code != 0:
print(f"Error: Target branch {target_branch} not found")
return False
# Find merge base
code, merge_base, error = run_command(f"git merge-base {target_branch} {source_branch}")
if code != 0:
print(f"Error finding merge base: {error}")
return False
# Create backup unless explicitly skipped
if backup and not backup_branch(source_branch):
return False
# Get commit messages
commit_messages = get_commit_messages(source_branch, target_branch)
if commit_messages is None:
return False
# Get title (priority: manual title > PR title > prompt user)
title = manual_title
if not title:
title = get_pr_info(source_branch)
if not title:
title = prompt_for_title()
try:
# Create and switch to temporary branch
temp_branch = f"temp-squash-{source_branch}"
state.temp_branch = temp_branch
set_temp_branch(state)
print(f"\nCreating temporary branch {temp_branch}...")
code, _, error = run_command(f"git checkout -b {temp_branch} {source_branch}")
if code != 0:
print(f"Error creating temp branch: {error}")
return False
print("Preparing squash by resetting temporary branch to merge base...")
code, _, error = run_command(f"git reset --soft {merge_base}")
if code != 0:
print(f"Error resetting for squash: {error}")
return False
# Create commit with message
print("Creating squash commit...")
squash_message = create_squash_message(title, commit_messages, source_branch)
if not create_commit_with_message(squash_message):
return False
# Switch to target and try merge
print(f"\nSwitching to target branch {target_branch}...")
code, _, error = run_command(f"git checkout {target_branch}")
if code != 0:
print(f"Error checking out target branch: {error}")
return False
print(f"Attempting to merge changes from {temp_branch}...")
code, _, error = run_command(f"git rebase {temp_branch}")
if code != 0:
print(f"\nMerge failed with error: {error}")
print("\nThe squash was successful, and your changes are preserved in the temporary branch.")
print("To complete the merge manually, follow these steps:")
print(f"\n1. Your squashed changes are in branch: '{temp_branch}'")
print(f"2. The target branch is: '{target_branch}'")
print("\nTo resolve the conflicts:")
print(f" git checkout {target_branch}")
print(f" git merge {temp_branch}")
print(" # resolve conflicts in your editor")
print(" git add <resolved-files>")
print(" git commit")
print(f" git push origin {target_branch} # when ready to push")
print("\nTo clean up after successful merge:")
print(f" git branch -D {temp_branch}")
# Make sure to abort the merge
print("\nAborting current merge attempt...")
run_command("git merge --abort")
# Return to original branch, but keep temp branch
print(f"Returning to {original_branch}...")
run_command(f"git checkout {original_branch}")
return False
# Clean up temp branch on success
run_command(f"git branch -D {temp_branch}")
# Push if requested
if push:
code, _, error = run_command(f"git push origin {target_branch}")
if code != 0:
print(f"Error pushing to {target_branch}: {error}")
return False
print(f"Successfully pushed to {target_branch}")
else:
print(f"Changes squashed and merged into {target_branch} locally")
print(f"To push the changes: git push origin {target_branch}")
# Return to original branch
code, _, error = run_command(f"git checkout {original_branch}")
if code != 0:
print(f"Warning: Failed to return to original branch: {error}")
return False
return True
except Exception as e:
print(f"Error during squash process: {e}")
return False
print(f"Running: {' '.join(command)}")
result = subprocess.run(command, capture_output=True, text=True)
if check and result.returncode != 0:
print(result.stdout.strip())
print(result.stderr.strip())
sys.exit(result.returncode)
return result.stdout.strip()
def main():
parser = argparse.ArgumentParser(
description='Squash branch and merge into target branch'
)
parser.add_argument('--target', '-t', required=True,
help='Target branch to merge changes into')
parser.add_argument('--source', '-s',
help='Source branch to squash (default: current branch)')
parser.add_argument('--title', '-m',
help='Optional manual title (overrides PR title)')
parser.add_argument('--backup', action='store_true',
help='Creates a backup branch for the source branch')
parser.add_argument('--push', action='store_true',
help='Push changes to remote after squashing')
parser = argparse.ArgumentParser(description="Merge multiple branches with squash merges.")
parser.add_argument("--base", required=True, help="The base branch name from which the target branch will be created.")
parser.add_argument("--target", required=True, help="The target branch name to merge into.")
parser.add_argument("--title", required=False, help="Title for the commit")
args, unknown = parser.parse_known_args()
parser.add_argument("branches", nargs="+", help="List of branch names to merge into the target branch.")
args = parser.parse_args()
# Determine source branch early
source_branch = args.source
if not source_branch:
source_branch = get_current_branch()
if not source_branch:
sys.exit(1)
# Checkout the base branch to ensure a common starting point.
run_git_command(["git", "checkout", args.base])
if not squash_and_merge(source_branch, args.target, args.title, args.backup, args.push):
sys.exit(2)
# Check if the target branch exists. If not, create it from the base branch.
branch_list = run_git_command(["git", "branch"], check=False)
branch_names = [line.strip("* ").strip() for line in branch_list.splitlines()]
if args.target in branch_names:
run_git_command(["git", "checkout", args.target])
else:
run_git_command(["git", "checkout", "-b", args.target])
# Iterate over each branch, merging it with a squash merge.
for branch in args.branches:
print(f"Merging branch '{branch}' with a squash merge.")
# Merge the branch without creating a merge commit.
run_git_command(["git", "merge", "--squash", branch])
# Commit the squashed changes with an appropriate message.
commit_message = args.title or f"Squashed merge of branch '{branch}'"
run_git_command(["git", "commit", "-m", commit_message])
print(f"All branches have been merged with squashed commits into '{args.target}'.")
if __name__ == "__main__":

View File

@@ -80,7 +80,6 @@ def add_pr_comment(pr_number, comment):
print(f"Failed to parse comments data for PR #{pr_number}")
def validate_pr(pr):
"""Validate a PR and return (is_valid, skip_reason)"""
pr_number = pr.get('number', 'UNKNOWN')
@@ -143,26 +142,30 @@ def process_pr(pr_data, source_branch, target_branch, squash_script_path):
subprocess.run(['git', 'branch', branch, f'origin/{branch}'], check=True)
# Run squash script
subprocess.run([
result = subprocess.run([
squash_script_path,
'--target', target_branch,
'--source', branch,
'--base', source_branch,
'--title', f"{title} (PR-{pr_number})",
], check=True)
branch,
], capture_output=True, text=True)
print(f"Successfully processed PR #{pr_number}")
success_count += 1
print(result.stdout)
if result.returncode == 0:
print(f"Successfully processed PR #{pr_number}")
success_count += 1
continue
except subprocess.CalledProcessError as e:
print(f"Error processing PR #{pr_number}:")
print(f"Command failed with exit code {e.returncode}")
error_output = getattr(e, 'stderr', 'No error output available')
print(f"Error output: {error_output}")
add_pr_comment(pr_number,
f"⚠️ Error during automated `{target_branch}` squash:\n```\n{error_output}\n```")
print(f"Command failed with exit code {result.returncode}")
output = result.stdout
print(f"Error output: {output}")
add_pr_comment(pr_number, f"⚠️ Error during automated `{target_branch}` squash:\n```\n{output}\n```")
subprocess.run(['git', 'reset', '--hard'], check=True)
continue
except Exception as e:
print(f"Unexpected error processing PR #{pr_number}: {str(e)}")
subprocess.run(['git', 'reset', '--hard'], check=True)
continue
return success_count