Implement git pulling with Tokio (#408)

This commit is contained in:
Roey Darwish Dror
2020-05-15 11:11:28 +03:00
committed by GitHub
parent a981aad3df
commit f58b2a0c20
4 changed files with 175 additions and 94 deletions

View File

@@ -1,19 +1,19 @@
use crate::error::{SkipStep, TopgradeError};
use crate::error::SkipStep;
use crate::execution_context::ExecutionContext;
use crate::executor::{CommandExt, RunType};
use crate::terminal::print_separator;
use crate::utils::{which, PathExt};
use anyhow::Result;
use console::style;
use futures::future::try_join_all;
use glob::{glob_with, MatchOptions};
use log::{debug, error};
use std::collections::HashSet;
use std::io;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::thread::sleep;
use std::time::Duration;
use std::process::{Command, Output};
use tokio::process::Command as AsyncCommand;
use tokio::runtime;
#[cfg(windows)]
static PATH_PREFIX: &str = "\\\\?\\";
@@ -23,18 +23,77 @@ pub struct Git {
git: Option<PathBuf>,
}
struct PullProcess {
child: Child,
repo: String,
before_revision: Option<String>,
}
pub struct Repositories<'a> {
git: &'a Git,
repositories: HashSet<String>,
glob_match_options: MatchOptions,
}
fn check_output(output: Output) -> std::result::Result<(), String> {
if !(output.status.success()) {
let stderr = String::from_utf8(output.stderr).unwrap();
Err(stderr)
} else {
Ok(())
}
}
async fn pull_repository(repo: String, git: &PathBuf, ctx: &ExecutionContext<'_>) -> Result<()> {
let path = repo.to_string();
let before_revision = get_head_revision(git, &repo);
println!("{} {}", style("Pulling").cyan().bold(), path);
let mut command = AsyncCommand::new(git);
command.args(&["pull", "--ff-only"]).current_dir(&repo);
if let Some(extra_arguments) = ctx.config().git_arguments() {
command.args(extra_arguments.split_whitespace());
}
let pull_output = command.output().await?;
let submodule_output = AsyncCommand::new(git)
.args(&["submodule", "update", "--recursive"])
.current_dir(&repo)
.output()
.await?;
let result = check_output(pull_output).and_then(|_| check_output(submodule_output));
if let Err(message) = result {
println!("{} pulling {}", style("Failed").red().bold(), &repo);
print!("{}", message);
} else {
let after_revision = get_head_revision(&git, &repo);
match (&before_revision, &after_revision) {
(Some(before), Some(after)) if before != after => {
println!("{} {}:", style("Changed").yellow().bold(), &repo);
Command::new(&git)
.current_dir(&repo)
.args(&[
"--no-pager",
"log",
"--no-decorate",
"--oneline",
&format!("{}..{}", before, after),
])
.spawn()
.unwrap()
.wait()
.unwrap();
println!();
}
_ => {
println!("{} {}", style("Up-to-date").green().bold(), &repo);
}
}
}
Ok(())
}
fn get_head_revision(git: &Path, repo: &str) -> Option<String> {
Command::new(git)
.args(&["rev-parse", "HEAD"])
@@ -130,7 +189,7 @@ impl Git {
return Ok(());
}
let mut processes: Vec<_> = repositories
let futures: Vec<_> = repositories
.repositories
.iter()
.filter(|repo| match has_remotes(git, repo) {
@@ -144,87 +203,13 @@ impl Git {
}
_ => true, // repo has remotes or command to check for remotes has failed. proceed to pull anyway.
})
.filter_map(|repo| {
let repo = repo.clone();
let path = repo.to_string();
let before_revision = get_head_revision(git, &repo);
println!("{} {}", style("Pulling").cyan().bold(), path);
let mut command = Command::new(git);
command.args(&["pull", "--ff-only"]).current_dir(&repo);
if let Some(extra_arguments) = ctx.config().git_arguments() {
command.args(extra_arguments.split_whitespace());
}
command
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()
.map(|child| PullProcess {
child,
repo,
before_revision,
})
.ok()
})
.map(|repo| pull_repository(repo.clone(), &git, ctx))
.collect();
let mut success = true;
while !processes.is_empty() {
let mut remaining_processes = Vec::<PullProcess>::with_capacity(processes.len());
for mut p in processes {
if let Some(status) = p.child.try_wait().unwrap() {
if status.success() {
let after_revision = get_head_revision(&git, &p.repo);
let mut basic_rt = runtime::Runtime::new()?;
basic_rt.block_on(async { try_join_all(futures).await })?;
match (&p.before_revision, &after_revision) {
(Some(before), Some(after)) if before != after => {
println!("{} {}:", style("Changed").yellow().bold(), &p.repo);
Command::new(&git)
.current_dir(&p.repo)
.args(&[
"--no-pager",
"log",
"--no-decorate",
"--oneline",
&format!("{}..{}", before, after),
])
.spawn()
.unwrap()
.wait()
.unwrap();
println!();
}
_ => {
println!("{} {}", style("Up-to-date").green().bold(), &p.repo);
}
}
} else {
success = false;
println!("{} pulling {}", style("Failed").red().bold(), &p.repo);
let mut stderr = String::new();
if p.child.stderr.unwrap().read_to_string(&mut stderr).is_ok() {
print!("{}", stderr);
}
}
} else {
remaining_processes.push(p);
}
}
processes = remaining_processes;
sleep(Duration::from_millis(200));
}
if !success {
Err(TopgradeError::PullFailed.into())
} else {
Ok(())
}
Ok(())
}
}