remove unused replication code (moved to pve-manager)
 PVE/CLI/pvesr.pm        | 128 ----------
 PVE/ReplicationTools.pm | 523 ----------
 2 files changed, 651 deletions(-)

PVE/CLI/pvesr.pm
@@ -1,128 +0,0 @@
package PVE::CLI::pvesr;

use strict;
use warnings;
use POSIX qw(strftime);

use PVE::JSONSchema qw(get_standard_option);
use PVE::INotify;
use PVE::RPCEnvironment;
use PVE::Tools qw(extract_param);
use PVE::SafeSyslog;
use PVE::CLIHandler;

use PVE::API2::Storage::Replication;

use base qw(PVE::CLIHandler);

my $nodename = PVE::INotify::nodename();
sub setup_environment {
    PVE::RPCEnvironment->setup_default_cli_env();
}

my $print_job_list = sub {
    my ($list) = @_;

    printf("%-10s%-20s%-20s%-5s%-10s%-5s\n",
           "VMID", "DEST", "LAST SYNC", "IVAL", "STATE", "FAIL");

    foreach my $job (sort { $a->{vmid} <=> $b->{vmid} } @$list) {

        my $timestr = strftime("%Y-%m-%d_%H:%M:%S", localtime($job->{lastsync}));

        printf("%-9s ", $job->{vmid});
        printf("%-19s ", $job->{tnode});
        printf("%-19s ", $timestr);
        printf("%-4s ", $job->{interval});
        printf("%-9s ", $job->{state});
        printf("%-9s\n", $job->{fail});
    }
};
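# Insert $vmid into the schedule hash keyed by sync time; on a collision the
# time is bumped by one second until a free slot is found.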
sub set_list {
    my ($list, $synctime, $vmid) = @_;

    if (defined($list->{$synctime})) {
        $list = set_list($list, $synctime+1, $vmid);
    } else {
        $list->{$synctime} = $vmid;
    }
    return $list;
}
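# Collect all jobs that are due: a job in state 'ok' is scheduled once
# lastsync + interval (in minutes) has passed; a job still in state 'sync'
# is retried with a back-off that grows with its failure count.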
my $get_replica_list = sub {

    my $jobs = PVE::ReplicationTools::read_state();
    my $list = {};

    foreach my $vmid (keys %$jobs) {
        my $job = $jobs->{$vmid};
        my $lastsync = $job->{lastsync};

        # interval in minutes
        my $interval = $job->{interval};
        my $now = time();
        my $fail = $job->{fail};

        my $synctime = $lastsync + $interval * 60;

        if ($now >= $synctime && $job->{state} eq 'ok') {
            $list = set_list($list, $synctime, $vmid);
        } elsif ($job->{state} eq 'sync') {

            $synctime += $interval * ($job->{fail}+1);
            $list = set_list($list, $synctime, $vmid)
                if ($now >= $synctime);
        }
    }

    return $list;
};
my $replicate_vms = sub {
    my ($list) = @_;

    my @sorted_times = reverse sort keys %$list;

    foreach my $synctime (@sorted_times) {
        eval {
            PVE::ReplicationTools::sync_guest($list->{$synctime});
        };
        if (my $err = $@) {
            syslog('err', $err);
        }
    }
};
__PACKAGE__->register_method ({
    name => 'run',
    path => 'run',
    method => 'POST',
    description => "This method is run by the systemd timer and syncs all jobs.",
    protected => 1,
    parameters => {
        additionalProperties => 0,
        properties => {},
    },
    returns => { type => 'null' },
    code => sub {

        my $list = &$get_replica_list();
        &$replicate_vms($list);

        return undef;
    }});
our $cmddef = {
    jobs => [ 'PVE::API2::Storage::Replication', 'jobs', [],
              { node => $nodename }, $print_job_list ],

    destroyjob => [ 'PVE::API2::Storage::Replication', 'destroy_job', ['vmid'] ],

    run => [ __PACKAGE__, 'run' ],
};

1;

PVE/ReplicationTools.pm
@@ -1,523 +0,0 @@
package PVE::ReplicationTools;

use warnings;
use strict;
use Data::Dumper;
use JSON;

use PVE::INotify;
use PVE::Tools;
use PVE::Cluster;
use PVE::QemuConfig;
use PVE::QemuServer;
use PVE::LXC::Config;
use PVE::LXC;
use PVE::Storage;

my $STATE_DIR = '/var/lib/pve-replica';
my $STATE_PATH = "$STATE_DIR/pve-replica.state";
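# Remote commands are run via plain ssh as root on the target node.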
my $get_ssh_cmd = sub {
    my ($ip) = @_;

    return ['ssh', '-o', 'Batchmode=yes', "root\@$ip"];
};

sub get_guest_config {
    my ($vmid) = @_;

    my $vms = PVE::Cluster::get_vmlist();

    die "no such guest '$vmid'\n" if !defined($vms->{ids}->{$vmid});

    my $vm_type = $vms->{ids}->{$vmid}->{type};

    my $conf;
    my $running;

    if ($vm_type eq 'qemu') {
        $conf = PVE::QemuConfig->load_config($vmid);
        $running = PVE::QemuServer::check_running($vmid);
    } elsif ($vm_type eq 'lxc') {
        $conf = PVE::LXC::Config->load_config($vmid);
        $running = PVE::LXC::check_running($vmid);
    } else {
        die "internal error";
    }

    return ($conf, $vm_type, $running);
}
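# The replication state is a hash indexed by vmid, stored as JSON in
# $STATE_PATH and (de)serialized by write_state()/read_state().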
sub write_state {
    my ($state) = @_;

    mkdir $STATE_DIR;

    PVE::Tools::file_set_contents($STATE_PATH, encode_json($state));
}

sub read_state {

    return {} if ! -e $STATE_PATH;

    my $raw = PVE::Tools::file_get_contents($STATE_PATH);

    return {} if $raw eq '';

    return decode_json($raw);
}
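# Resolve the IP of a cluster node; if a dedicated storage replication
# network is configured in datacenter.cfg, ask the remote node for its
# address on that network via 'pvecm mtunnel'.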
sub get_node_ip {
    my ($nodename) = @_;

    my $remoteip = PVE::Cluster::remote_node_ip($nodename);

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');
    if (my $network = $dc_conf->{storage_replication_network}) {

        my $cmd = $get_ssh_cmd->($remoteip);

        push @$cmd, '--', 'pvecm', 'mtunnel', '--get_migration_ip', '--migration_network', $network;

        PVE::Tools::run_command($cmd, outfunc => sub {
            my $line = shift;

            if ($line =~ m/^ip: '($PVE::Tools::IPRE)'$/) {
                $remoteip = $1;
            }
        });
    }
    return $remoteip;
}
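# Return the replication jobs of all guests located on the local node.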
sub get_all_jobs {

    my $vms = PVE::Cluster::get_vmlist();

    my $state = read_state();

    my $jobs = {};

    my $local_node = PVE::INotify::nodename();

    foreach my $vmid (keys %{$vms->{ids}}) {
        next if $vms->{ids}->{$vmid}->{node} ne $local_node;
        my $vm_state = $state->{$vmid};
        next if !defined($vm_state);

        my $job = {};

        $job->{limit} = $vm_state->{limit};
        $job->{interval} = $vm_state->{interval};
        $job->{tnode} = $vm_state->{tnode};
        $job->{lastsync} = $vm_state->{lastsync};
        $job->{state} = $vm_state->{state};
        $job->{fail} = $vm_state->{fail};

        $jobs->{$vmid} = $job;
    }

    return $jobs;
}
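# Replicate a single guest: mark the job as 'sync', optionally freeze the
# filesystem via the QEMU guest agent, snapshot all replicatable volumes as
# replica_<timestamp>, send them (incrementally from the last replica
# snapshot) to the target node, remove the previous snapshot and record the
# new sync time.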
sub sync_guest {
    my ($vmid, $param) = @_;

    my $local_node = PVE::INotify::nodename();

    my $jobs = read_state();
    $jobs->{$vmid}->{state} = 'sync';
    write_state($jobs);

    my ($guest_conf, $vm_type, $running) = get_guest_config($vmid);
    my $qga = 0;

    my $job = $jobs->{$vmid};
    my $tnode = $job->{tnode};

    if ($vm_type eq 'qemu' && defined($guest_conf->{agent})) {
        $qga = PVE::QemuServer::qga_check_running($vmid)
            if $running;
    }

    my $storecfg = PVE::Storage::config();
    # will not die if a disk is not syncable
    my $disks = get_replicatable_volumes($storecfg, $guest_conf, $vm_type);

    # check if all nodes have the storage available
    foreach my $volid (keys %$disks) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);

        my $store = $storecfg->{ids}->{$storeid};
        die "Storage $storeid not available on node: $tnode\n"
            if $store->{nodes} && !$store->{nodes}->{$tnode};
        die "Storage $storeid not available on node: $local_node\n"
            if $store->{nodes} && !$store->{nodes}->{$local_node};
    }

    my $limit = $param->{limit};
    $limit = $guest_conf->{replica_rate_limit}
        if (!defined($limit));

    my $snap_time = time();

    die "Invalid synctime format: $job->{lastsync}."
        if $job->{lastsync} !~ m/^(\d+)$/;

    my $lastsync = $1;
    my $incremental_snap = $lastsync ? "replica_$lastsync" : undef;

    # freeze filesystem for data consistency
    if ($qga) {
        print "Freeze guest filesystem\n";

        eval {
            PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
        };
    }

    my $snapname = "replica_$snap_time";

    my $disks_status = {};

    my $sync_job = sub {

        # make snapshot of all volumes
        foreach my $volid (keys %$disks) {

            eval {
                PVE::Storage::volume_snapshot($storecfg, $volid, $snapname);
            };

            if (my $err = $@) {
                if ($qga) {
                    print "Unfreeze guest filesystem\n";
                    eval {
                        PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw");
                    };
                    warn $@ if $@;
                }
                cleanup_snapshot($disks_status, $snapname, $storecfg, $running);
                $jobs->{$vmid}->{state} = 'error';
                write_state($jobs);

                die $err;
            }

            $disks_status->{$volid}->{snapshot} = 1;
        }

        if ($qga) {
            print "Unfreeze guest filesystem\n";
            eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
            warn $@ if $@;
        }

        my $ip = get_node_ip($tnode);

        foreach my $volid (keys %$disks) {

            eval {
                PVE::Storage::volume_send($storecfg, $volid, $snapname,
                                          $ip, $incremental_snap,
                                          $param->{verbose}, $limit);
                $job->{fail} = 0;
            };

            if (my $err = $@) {
                cleanup_snapshot($disks_status, $snapname, $storecfg, $running, $ip);
                $job->{fail}++;
                $job->{state} = 'error' if $job->{fail} > 3;

                $jobs->{$vmid} = $job;
                write_state($jobs);
                die $err;
            }

            $disks_status->{$volid}->{synced} = 1;
        }

        # delete the old snapshot if it exists
        cleanup_snapshot($disks_status, $snapname, $storecfg, $running, $ip, $lastsync)
            if $lastsync != 0;

        $job->{lastsync} = $snap_time;
        $job->{state} = "ok";
        $jobs->{$vmid} = $job;
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 60, 0, $sync_job);
    die $@ if $@;

    return $snap_time;
}
sub send_image {
    my ($vol, $param, $ip, $all_snaps_in_delta, $alter_path) = @_;

    my $plugin = $vol->{plugin};
    $plugin->send_image($vol, $param, $ip, $all_snaps_in_delta, $alter_path);
}
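# Enable replication for a guest: read the replica_* options from the guest
# config, initialize the job entry (target node, interval, lastsync) and,
# unless $no_sync is set, run a first sync immediately.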
sub job_enable {
    my ($vmid, $no_sync, $target) = @_;

    my $local_node = PVE::INotify::nodename();

    my $update_state = sub {
        my ($state) = @_;

        my $jobs = read_state();
        my $job = $jobs->{$vmid};
        my ($config) = get_guest_config($vmid);
        my $param = {};

        $job->{interval} = $config->{replica_interval} || 15;

        $job->{tnode} = $target || $config->{replica_target};
        die "Replication target must be set\n" if !defined($job->{tnode});

        die "Target and source node can't be the same\n"
            if $job->{tnode} eq $local_node;

        $job->{fail} = 0;
        if (!defined($job->{lastsync})) {

            if (my $lastsync = get_lastsync($vmid)) {
                $job->{lastsync} = $lastsync;
            } else {
                $job->{lastsync} = 0;
            }
        }

        $param->{verbose} = 1;

        $job->{state} = 'ok';
        $jobs->{$vmid} = $job;
        write_state($jobs);

        eval {
            sync_guest($vmid, $param) if !defined($no_sync);
        };
        if (my $err = $@) {
            $jobs->{$vmid}->{state} = 'error';
            write_state($jobs);
            die $err;
        }
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $update_state);
    die $@ if $@;
}
sub job_disable {
    my ($vmid) = @_;

    my $update_state = sub {

        my $jobs = read_state();

        if (defined($jobs->{$vmid})) {
            $jobs->{$vmid}->{state} = 'off';
            write_state($jobs);
        } else {
            print "No replica service for $vmid\n";
        }
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $update_state);
    die $@ if $@;
}

sub job_remove {
    my ($vmid) = @_;

    my $update_state = sub {

        my $jobs = read_state();

        if (defined($jobs->{$vmid})) {
            delete($jobs->{$vmid});
            write_state($jobs);
        } else {
            print "No replica service for $vmid\n";
        }
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $update_state);
    die $@ if $@;
}
sub get_replicatable_volumes {
    my ($storecfg, $conf, $vm_type, $noerr) = @_;

    if ($vm_type eq 'qemu') {
        PVE::QemuConfig->get_replicatable_volumes($storecfg, $conf, $noerr);
    } elsif ($vm_type eq 'lxc') {
        PVE::LXC::Config->get_replicatable_volumes($storecfg, $conf, $noerr);
    } else {
        die "internal error";
    }
}
sub destroy_all_snapshots {
    my ($vmid, $regex, $node) = @_;

    my $ip = defined($node) ? get_node_ip($node) : undef;

    my ($guest_conf, $vm_type, $running) = get_guest_config($vmid);

    my $storecfg = PVE::Storage::config();
    my $disks = get_replicatable_volumes($storecfg, $guest_conf, $vm_type);

    my $snapshots = {};
    foreach my $volid (keys %$disks) {
        $snapshots->{$volid} =
            PVE::Storage::volume_snapshot_list($storecfg, $volid, $regex, $node, $ip);
    }

    foreach my $volid (keys %$snapshots) {

        if (defined($regex)) {
            foreach my $snap (@{$snapshots->{$volid}}) {
                if ($ip) {
                    PVE::Storage::volume_snapshot_delete_remote($storecfg, $volid, $snap, $ip);
                } else {
                    PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap, $running);
                }
            }
        } else {
            if ($ip) {

                my $cmd = $get_ssh_cmd->($ip);

                push @$cmd, '--', 'pvesm', 'free', $volid;

                PVE::Tools::run_command($cmd);
            } else {
                die "internal error";
            }
        }
    }
}
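# Remove a replica snapshot again, remotely and/or locally, depending on how
# far the sync got for each volume; with $lastsync_snap set, the previous
# replica_<lastsync> snapshot is removed instead.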
sub cleanup_snapshot {
    my ($disks, $snapname, $storecfg, $running, $ip, $lastsync_snap) = @_;

    if ($lastsync_snap) {
        $snapname = "replica_$lastsync_snap";
    }

    foreach my $volid (keys %$disks) {

        if (defined($ip) && (defined($lastsync_snap) || $disks->{$volid}->{synced})) {
            PVE::Storage::volume_snapshot_delete_remote($storecfg, $volid, $snapname, $ip);
        }

        if (defined($lastsync_snap) || $disks->{$volid}->{snapshot}) {
            PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname, $running);
        }
    }
}
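# Remove replication for a guest completely: delete the local replica
# snapshots, free the replicated volumes on the target node, drop the job
# from the state file and strip the replica_* options from the guest config.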
sub destroy_replica {
    my ($vmid) = @_;

    my $code = sub {

        my $jobs = read_state();

        return if !defined($jobs->{$vmid});

        my ($guest_conf, $vm_type) = get_guest_config($vmid);

        destroy_all_snapshots($vmid, 'replica_');
        destroy_all_snapshots($vmid, undef, $guest_conf->{replica_target});

        delete($jobs->{$vmid});

        delete($guest_conf->{replica_rate_limit});
        delete($guest_conf->{replica_rate_interval});
        delete($guest_conf->{replica_target});
        delete($guest_conf->{replica});

        if ($vm_type eq 'qemu') {
            PVE::QemuConfig->write_config($vmid, $guest_conf);
        } else {
            PVE::LXC::Config->write_config($vmid, $guest_conf);
        }
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 30, 0, $code);
    die $@ if $@;
}
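# Derive the last successful sync time from the newest replica_<time>
# snapshot; all volumes of the guest must agree on that timestamp.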
sub get_lastsync {
    my ($vmid) = @_;

    my ($conf, $vm_type) = get_guest_config($vmid);

    my $storecfg = PVE::Storage::config();
    my $sync_vol = get_replicatable_volumes($storecfg, $conf, $vm_type);

    my $time;
    foreach my $volid (keys %$sync_vol) {
        my $list =
            PVE::Storage::volume_snapshot_list($storecfg, $volid, 'replica_');

        if (my $tmp_snap = shift @$list) {
            $tmp_snap =~ m/^replica_(\d+)$/;
            die "snapshots are not coherent\n"
                if defined($time) && !($time eq $1);
            $time = $1;
        }
    }

    return $time;
}

sub get_last_replica_snap {
    my ($volid) = @_;

    my $storecfg = PVE::Storage::config();
    my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid, 'replica_');

    return shift @$list;
}
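# Apply a changed replica_* configuration value to the job state; changing
# the target node recreates the whole replication job.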
sub update_conf {
    my ($vmid, $key, $value) = @_;

    if ($key eq 'replica_target') {
        destroy_replica($vmid);
        job_enable($vmid, undef, $value);
        return;
    }

    my $update = sub {
        my $jobs = read_state();

        return if !defined($jobs->{$vmid});

        if ($key eq 'replica_interval') {
            $jobs->{$vmid}->{interval} = $value || 15;
        } elsif ($key eq 'replica_rate_limit') {
            $jobs->{$vmid}->{limit} = $value ||
                delete $jobs->{$vmid}->{limit};
        } else {
            die "Config parameter $key not known";
        }

        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 60, 0, $update);
    die $@ if $@;
}


1;