This patch will include storage asynchronous replication.

It is possible to synchronise a volume to another node at a defined interval.
So if a node fails, there will be a copy of a VM's volumes
on another node.
With this copy it is possible to start the VM on that node.
This commit is contained in:
Wolfgang Link
2017-04-24 17:15:31 +02:00
committed by Wolfgang Bumiller
parent f189504ccb
commit 663510b86d
8 changed files with 868 additions and 3 deletions

View File

@ -33,15 +33,23 @@ pvesm.bash-completion:
perl -I. -T -e "use PVE::CLI::pvesm; PVE::CLI::pvesm->generate_bash_completions();" >$@.tmp
mv $@.tmp $@
pvesr.bash-completion:
perl -I. -T -e "use PVE::CLI::pvesr; PVE::CLI::pvesr->generate_bash_completions();" >$@.tmp
mv $@.tmp $@
.PHONY: install
install: pvesm.1 pvesm.bash-completion
install: pvesm.1 pvesm.bash-completion pvesr.bash-completion
install -d ${DESTDIR}${SBINDIR}
install -m 0755 pvesm ${DESTDIR}${SBINDIR}
install -m 0755 pvesr ${DESTDIR}${SBINDIR}
make -C PVE install
install -d ${DESTDIR}/var/lib/pve-replica
install -d ${DESTDIR}/usr/share/man/man1
install -m 0644 pvesm.1 ${DESTDIR}/usr/share/man/man1/
gzip -9 -n ${DESTDIR}/usr/share/man/man1/pvesm.1
install -m 0644 -D pvesm.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pvesm
install -m 0644 -D pvesr.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pvesr
.PHONY: deb
deb: ${DEB}
@ -65,7 +73,7 @@ ${DEB}:
.PHONY: clean
clean:
make cleanup-docgen
rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion
rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion pvesr.bash-completion
find . -name '*~' -exec rm {} ';'
.PHONY: distclean

View File

@ -3,4 +3,5 @@
.PHONY: install
install:
install -D -m 0644 Disks.pm ${DESTDIR}${PERLDIR}/PVE/API2/Disks.pm
install -D -m 0644 StorageReplication.pm ${DESTDIR}${PERLDIR}/PVE/API2/StorageReplication.pm
make -C Storage install

View File

@ -0,0 +1,49 @@
package PVE::API2::StorageReplication;

# API endpoint listing the storage replication jobs known on a node.

use warnings;
use strict;

# PVE::Cluster and PVE::Tools functions are called below; load them
# explicitly instead of relying on transitive loading.
use PVE::Cluster;
use PVE::JSONSchema qw(get_standard_option);
use PVE::ReplicationTools;
use PVE::RESTHandler;
use PVE::Tools;

use base qw(PVE::RESTHandler);

__PACKAGE__->register_method ({
    name => 'list',
    path => 'list',
    method => 'GET',
    description => "List of all replication jobs.",
    permissions => {
	user => 'all',
    },
    protected => 1,
    proxyto => 'node',
    parameters => {
	additionalProperties => 0,
	properties => {
	    node => get_standard_option('pve-node'),
	    nodes => get_standard_option('pve-node-list',
		{ description => "Nodes where the jobs are located.",
		  optional => 1 }),
	},
    },
    returns => { type => 'object' },
    code => sub {
	my ($param) = @_;

	# Reject unknown node names before trying to contact them.
	if ($param->{nodes}) {
	    foreach my $node (PVE::Tools::split_list($param->{nodes})) {
		die "Node: $node does not exist.\n"
		    if !PVE::Cluster::check_node_exists($node);
	    }
	}

	# Fall back to the proxy target node when no explicit list was given.
	my $nodes = $param->{nodes} ? $param->{nodes} : $param->{node};

	return PVE::ReplicationTools::get_all_jobs($nodes);
    }});

1;

View File

@ -1,4 +1,4 @@
SOURCES=pvesm.pm
SOURCES=pvesm.pm pvesr.pm
.PHONY: install
install: ${SOURCES}

220
PVE/CLI/pvesr.pm Normal file
View File

@ -0,0 +1,220 @@
package PVE::CLI::pvesr;

# Command line tool for storage replication ('pvesr').

use strict;
use warnings;

# JSON::encode_json and PVE::ReplicationTools::* are used below but were
# previously only available through transitive module loading.
use JSON;
use POSIX qw(strftime);

use PVE::API2::StorageReplication;
use PVE::CLIHandler;
use PVE::INotify;
use PVE::JSONSchema qw(get_standard_option);
use PVE::ReplicationTools;
use PVE::RPCEnvironment;
use PVE::SafeSyslog;
use PVE::Tools qw(extract_param);

use base qw(PVE::CLIHandler);

my $nodename = PVE::INotify::nodename();

# Set up the default CLI RPC environment (invoked by PVE::CLIHandler).
sub setup_environment {
    PVE::RPCEnvironment->setup_default_cli_env();
}
# Print the job hash either as JSON or as a human readable table.
# $conf: hashref of vmid => job (tnode, lastsync, interval, state, fail);
# $json: truthy selects JSON output.
# NOTE(review): relies on the JSON module being loaded transitively
# (via PVE::ReplicationTools) -- verify.
my $print_list = sub {
my ($conf, $json) = @_;
if (defined($json)) {
print JSON::encode_json($conf);
} else {
# table header; column widths match the row printfs below
printf("%-10s%-20s%-20s%-5s%-10s%-5s\n",
"VMID", "DEST", "LAST SYNC","IVAL", "STATE", "FAIL");
foreach my $vmid (sort keys %$conf) {
my $job = $conf->{$vmid};
# lastsync is a unix timestamp (0 = never synced)
my $timestr = strftime("%Y-%m-%d_%H:%M:%S", localtime($job->{lastsync}));
printf("%-9s ", $vmid);
printf("%-19s ", $job->{tnode});
printf("%-19s ", $timestr);
printf("%-4s ", $job->{interval});
printf("%-9s ", $job->{state});
printf("%-9s\n", $job->{fail});
}
}
};
sub set_list {
    my ($list, $synctime, $vmid) = @_;

    # Insert $vmid at key $synctime; on collision probe forward one second
    # at a time until a free slot is found (same semantics as the original
    # recursive implementation). Returns the mutated hashref.
    my $slot = $synctime;
    $slot++ while defined($list->{$slot});
    $list->{$slot} = $vmid;

    return $list;
}
# Build the map of due jobs: synctime => vmid, for every job whose next
# scheduled synchronisation time has passed.
my $get_replica_list = sub {

    my $jobs = PVE::ReplicationTools::read_state();
    my $list = {};

    foreach my $vmid (keys %$jobs) {
	my $job = $jobs->{$vmid};
	my $lastsync = $job->{lastsync};

	# interval is configured in minutes
	my $interval = $job->{interval};
	my $now = time();
	my $synctime = $lastsync + $interval * 60;

	if ($now >= $synctime && $job->{state} eq 'ok') {
	    $list = set_list($list, $synctime, $vmid);
	} elsif ($job->{state} eq 'sync') {
	    # Job is (or was left) in 'sync' state: retry with a back-off
	    # growing with the failure count. The original re-declared
	    # $synctime with 'my' here, resetting it to undef and making
	    # the retry fire immediately on every run.
	    $synctime += $interval * ($job->{fail} + 1);
	    $list = set_list($list, $synctime, $vmid)
		if $now >= $synctime;
	}
    }

    return $list;
};
# Run sync_guest for every due job; a failing job is logged and does not
# stop the remaining jobs.
my $replicate_vms = sub {
    my ($list) = @_;

    # Keys are epoch timestamps; sort them numerically. The original used
    # the default string sort, which only orders correctly by accident
    # while all timestamps have the same number of digits.
    my @sorted_times = sort { $b <=> $a } keys %$list;

    foreach my $synctime (@sorted_times) {
	eval {
	    PVE::ReplicationTools::sync_guest($list->{$synctime});
	};
	if (my $err = $@) {
	    syslog('err', $err);
	}
    }
};
__PACKAGE__->register_method ({
    name => 'run',
    path => 'run',
    method => 'POST',
    description => "This method is run by the systemd timer and syncs all due replication jobs.",
    permissions => {
	# The check was originally nested inside 'description', so it was
	# never evaluated by the permission system.
	check => ['perm', '/', [ 'Sys.Console' ]],
    },
    protected => 1,
    parameters => {
	additionalProperties => 0,
	properties => {
	},
    },
    returns => { type => 'null' },
    code => sub {
	# Collect all jobs whose next sync time has passed and run them.
	my $list = &$get_replica_list();
	&$replicate_vms($list);

	return undef;
    }});
__PACKAGE__->register_method ({
    name => 'destroyjob',
    path => 'destroyjob',
    method => 'DELETE',
    description => "Destroy an async replication job",
    permissions => {
	# The check was originally nested inside 'description', so it was
	# never evaluated by the permission system.
	check => ['perm', '/storage', ['Datastore.Allocate']],
    },
    protected => 1,
    parameters => {
	additionalProperties => 0,
	properties => {
	    vmid => {
		type => 'string', format => 'pve-vmid',
		description => "The VMID of the guest.",
		completion => \&PVE::Cluster::complete_local_vmid,
	    },
	},
    },
    returns => { type => 'null' },
    code => sub {
	my ($param) = @_;

	my $vmid = extract_param($param, 'vmid');

	# Remove the job state and all replica snapshots (local and remote).
	PVE::ReplicationTools::destroy_replica($vmid);

	return undef; # declared return type is 'null'
    }});
__PACKAGE__->register_method ({
    name => 'list',
    path => 'list',
    method => 'GET',
    description => "List of all replication jobs.",
    permissions => {
	user => 'all',
    },
    protected => 1,
    proxyto => 'node',
    # The original declared 'protected' and 'parameters' twice; the first
    # 'parameters' block (a stray vmid property) was dead code, since a
    # later duplicate hash key silently overwrites the earlier one.
    parameters => {
	additionalProperties => 0,
	properties => {
	    node => get_standard_option('pve-node'),
	    nodes => get_standard_option('pve-node-list',
		{ description => "Nodes where the jobs are located.",
		  optional => 1 }),
	    json => {
		optional => 1,
		type => 'boolean',
		description => "Output in JSON format.",
	    },
	},
    },
    # The handler prints directly and never returned a string, so declare
    # the return type accordingly.
    returns => { type => 'null' },
    code => sub {
	my ($param) = @_;

	if ($param->{nodes}) {
	    foreach my $node (PVE::Tools::split_list($param->{nodes})) {
		die "Node: $node does not exist.\n"
		    if !PVE::Cluster::check_node_exists($node);
	    }
	}

	my $nodes = $param->{nodes} ? $param->{nodes} : $param->{node};

	my $list = PVE::ReplicationTools::get_all_jobs($nodes);

	# Output goes straight to stdout (table or JSON).
	&$print_list($list, $param->{json});

	return undef;
    }});
# CLI command table: sub-command => [handler class, method name,
# positional arguments, fixed/default parameters].
our $cmddef = {
# defaults to the local node so a plain 'pvesr list' works
list => [ __PACKAGE__ , 'list' , [], {node => $nodename}],
run => [ __PACKAGE__ , 'run'],
destroyjob => [ __PACKAGE__ , 'destroyjob', ['vmid']],
};
1;

View File

@ -3,6 +3,7 @@
.PHONY: install
install:
install -D -m 0644 Storage.pm ${DESTDIR}${PERLDIR}/PVE/Storage.pm
install -D -m 0644 ReplicationTools.pm ${DESTDIR}${PERLDIR}/PVE/ReplicationTools.pm
install -D -m 0644 Diskmanage.pm ${DESTDIR}${PERLDIR}/PVE/Diskmanage.pm
make -C Storage install
make -C API2 install

578
PVE/ReplicationTools.pm Normal file
View File

@ -0,0 +1,578 @@
package PVE::ReplicationTools;

# Helper functions for storage asynchronous replication: job state file
# handling, snapshot creation/transfer and job management.

use warnings;
use strict;

use JSON;
use Time::Local;

# PVE::INotify (nodename) and Net::IP (ip_is_ipv6) are used below but were
# previously not loaded explicitly.
use PVE::Cluster;
use PVE::INotify;
use PVE::LXC;
use PVE::LXC::Config;
use PVE::QemuConfig;
use PVE::Storage;
use PVE::Tools qw(run_command);

use Net::IP;
use Data::Dumper qw(Dumper);

# NOTE(review): PVE::QemuServer functions are called below but the module
# is not loaded here -- presumably pulled in via PVE::QemuConfig; verify.

# Cluster-wide job state lives in one JSON file per node.
my $STATE_DIR = '/var/lib/pve-replica';
my $STATE_FILE = "/pve-replica.state";
my $STATE_PATH = $STATE_DIR.$STATE_FILE;

PVE::Cluster::cfs_update();

my $local_node = PVE::INotify::nodename();
# Load the guest configuration for $vmid.
# Returns (config hashref, guest type 'qemu'/'lxc', running flag).
my $get_guestconfig = sub {
    my ($vmid) = @_;

    # Guest type comes from the cluster-wide VM list.
    my $vmlist = PVE::Cluster::get_vmlist();
    my $guest_type = $vmlist->{ids}->{$vmid}->{type};

    my ($conf, $running);
    if ($guest_type =~ m/^qemu$/) {
	$conf = PVE::QemuConfig->load_config($vmid);
	$running = PVE::QemuServer::check_running($vmid);
    } elsif ($guest_type =~ m/^lxc$/) {
	$conf = PVE::LXC::Config->load_config($vmid);
	$running = PVE::LXC::check_running($vmid);
    }

    return ($conf, $guest_type, $running);
};
# Persist the job state hash as JSON to the state file.
sub write_state {
    my ($state) = @_;

    # The state directory may not exist yet on first use; mkdir errors
    # (usually EEXIST) are deliberately ignored.
    mkdir $STATE_DIR;

    my $serialized = JSON::encode_json($state);
    PVE::Tools::file_set_contents($STATE_PATH, $serialized);
}
# Read the job state file; a missing or empty file means no jobs.
sub read_state {
    return {} unless -e $STATE_PATH;

    my $content = PVE::Tools::file_get_contents($STATE_PATH);
    return {} if $content eq '';

    return JSON::decode_json($content);
}
# Resolve the IP address used to reach $nodename for replication traffic.
# Returns the regular cluster address by default; when a dedicated
# 'storage_replication_network' is configured in datacenter.cfg, the remote
# node is asked (via ssh + 'pvecm mtunnel') for its address in that network.
sub get_node_ip {
my ($nodename) = @_;
my $remoteip = PVE::Cluster::remote_node_ip($nodename, 1);
my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');
if (my $network = $dc_conf->{storage_replication_network}) {
my $cmd = ['ssh', '-o', 'Batchmode=yes', "root\@$remoteip", '--'
,'pvecm', 'mtunnel', '--get_migration_ip',
'--migration_network', $network];
PVE::Tools::run_command($cmd, outfunc => sub {
my $line = shift;
# expected output line: ip: '<address>'
if ($line =~ m/^ip: '($PVE::Tools::IPRE)'$/) {
$remoteip = $1;
}
});
}
return $remoteip;
}
# Collect replication jobs from the given (comma/semicolon separated) list
# of nodes. Remote nodes are queried via 'ssh ... pvesr list --json';
# local jobs are read from the state file. Returns vmid => job hashref.
sub get_all_jobs {
    my ($nodes) = @_;

    my @nodelist = PVE::Tools::split_list($nodes);
    my $vms = PVE::Cluster::get_vmlist();
    my $state = read_state();
    my $jobs = {};

    # Remote nodes answer with one JSON object per line; merge them in.
    my $outfunc = sub {
	my $line = shift;
	my $remote_jobs = JSON::decode_json($line);
	foreach my $vmid (keys %$remote_jobs) {
	    $jobs->{$vmid} = $remote_jobs->{$vmid};
	}
    };

    foreach my $node (@nodelist) {
	if ($local_node ne $node) {
	    my $ip = get_node_ip($node);
	    # Bracket IPv6 addresses for the user@host ssh target. The
	    # original wrapped the address in an array reference, which
	    # stringified to 'ARRAY(0x...)' in the command line.
	    $ip = "[$ip]" if Net::IP::ip_is_ipv6($ip);

	    my @cmd = ('ssh', '-o', 'Batchmode=yes', "root\@$ip", '--',
		       'pvesr', 'list', '--json');
	    run_command([@cmd], outfunc => $outfunc);
	} else {
	    # Local jobs: only VMs on this node with a state entry.
	    foreach my $vmid (keys %{$vms->{ids}}) {
		next if $vms->{ids}->{$vmid}->{node} ne $local_node;
		next if !defined($state->{$vmid});

		my $vm_state = $state->{$vmid};
		my $job = {};

		$job->{limit}    = $vm_state->{limit};
		$job->{interval} = $vm_state->{interval};
		$job->{tnode}    = $vm_state->{tnode};
		$job->{lastsync} = $vm_state->{lastsync};
		$job->{state}    = $vm_state->{state};
		$job->{fail}     = $vm_state->{fail};

		$jobs->{$vmid} = $job;
	    }
	}
    }

    return $jobs;
}
# Replicate all syncable volumes of guest $vmid to its configured target
# node. $param may carry 'limit' (rate limit) and 'verbose'.
# Takes a snapshot 'replica_<now>' of every volume (freezing the guest
# filesystem via the qemu guest agent when available), sends it based on
# the previous replica snapshot (incremental), then removes the previous
# snapshot pair. Runs under the state-file lock, updates job state and
# failure counter, and returns the new snapshot timestamp.
# NOTE(review): the job state is set to 'sync' before the pre-checks below;
# dying before $sync_job runs leaves the state as 'sync' -- verify intended.
sub sync_guest {
my ($vmid, $param) = @_;
my $jobs = read_state();
$jobs->{$vmid}->{state} = 'sync';
write_state($jobs);
my ($guest_conf, $vm_type, $running) = &$get_guestconfig($vmid);
my $qga = 0;
my $job = $jobs->{$vmid};
my $tnode = $job->{tnode};
# use the qemu guest agent for fs-freeze only when configured and running
if ($vm_type eq "qemu" && defined($guest_conf->{agent}) ) {
$qga = PVE::QemuServer::qga_check_running($vmid)
if $running;
}
# will not die if a disk is not syncable
my $disks = get_syncable_guestdisks($guest_conf, $vm_type);
# check that source and target node both have the storage available
my $storage_config = PVE::Storage::config();
foreach my $volid (keys %$disks) {
my ($storeid) = PVE::Storage::parse_volume_id($volid);
my $store = $storage_config->{ids}->{$storeid};
die "Storage not availible on node: $tnode\n"
if $store->{nodes} && !$store->{nodes}->{$tnode};
die "Storage not availible on node: $local_node\n"
if $store->{nodes} && !$store->{nodes}->{$local_node};
}
# explicit limit wins over the per-guest configured rate limit
my $limit = $param->{limit};
$limit = $guest_conf->{replica_rate_limit}
if (!defined($limit));
my $snap_time = time();
die "Invalid synctime format: $job->{lastsync}."
if $job->{lastsync} !~ m/^(\d+)$/;
my $lastsync = $1;
# base snapshot for incremental send; undef forces a full send
my $incremental_snap = $lastsync ? "replica_$lastsync" : undef;
# freeze filesystem for data consistency
if ($qga) {
print "Freeze guest filesystem\n";
eval {
PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
};
}
my $snapname = "replica_$snap_time";
# per-volume progress flags (snapshot taken / sent), used for cleanup
my $disks_status = { snapname => $snapname };
my $sync_job = sub {
# make snapshot of all volumes
foreach my $volid (keys %$disks) {
eval {
PVE::Storage::volume_snapshot($storage_config, $volid, $snapname);
};
if (my $err = $@) {
# thaw before bailing out, then roll back partial snapshots
if ($qga) {
print "Unfreeze guest filesystem\n";
eval {
PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw");
};
warn $@ if $@;
}
cleanup_snapshot($disks_status, $snapname, $storage_config, $running);
$jobs->{$vmid}->{state} = 'error';
write_state($jobs);
die $err;
}
$disks_status->{$volid}->{snapshot} = 1;
}
if ($qga) {
print "Unfreeze guest filesystem\n";
eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
warn $@ if $@;
}
my $ip = get_node_ip($tnode);
# transfer every snapshot to the target node
foreach my $volid (keys %$disks) {
eval {
PVE::Storage::volume_send($storage_config, $volid, $snapname,
$ip, $incremental_snap,
$param->{verbose}, $limit);
$job->{fail} = 0;
};
if (my $err = $@) {
# a failed send increments the fail counter; after 3 failures
# the job goes into 'error' state
cleanup_snapshot($disks_status, $snapname, $storage_config, $running, $ip);
$job->{fail}++;
$job->{state} = 'error' if $job->{fail} > 3;
$jobs->{$vmid} = $job;
write_state($jobs);
die "$err";
}
$disks_status->{$volid}->{synced} = 1;
}
# delete the previous replica snapshots if they exist
cleanup_snapshot($disks_status, $snapname, $storage_config, $running, $ip, $lastsync) if
$job->{lastsync} ne '0';
$job->{lastsync} = $snap_time;
$job->{state} = "ok";
$jobs->{$vmid} = $job;
write_state($jobs);
};
PVE::Tools::lock_file_full($STATE_PATH, 60, 0 , $sync_job);
die $@ if $@;
return $snap_time;
}
# Delegate snapshot listing to the storage plugin handling this volume.
sub get_snapshots {
    my ($vol, $prefix, $nodes) = @_;

    my $storage_plugin = $vol->{plugin};
    return $storage_plugin->get_snapshots($vol, $prefix, $nodes);
}
# Delegate the snapshot transfer to the storage plugin handling this volume.
sub send_image {
    my ($vol, $param, $ip, $all_snaps_in_delta, $alter_path) = @_;

    my $storage_plugin = $vol->{plugin};
    $storage_plugin->send_image($vol, $param, $ip, $all_snaps_in_delta, $alter_path);
}
# Enable (or re-enable) the replication job for $vmid and, unless $no_sync
# is set, run a first synchronisation right away.
# $target overrides the guest's configured replica_target; the interval
# defaults to 15 minutes; lastsync is recovered from existing replica
# snapshots when possible. Runs under the state-file lock.
sub job_enable {
my ($vmid, $no_sync, $target) = @_;
my $update_state = sub {
# NOTE(review): lock_file_full does not pass any argument to the
# locked code; this $state parameter is always undef -- verify.
my ($state) = @_;
my $jobs = read_state();
my $job = $jobs->{$vmid};
my ($config) = &$get_guestconfig($vmid);
my $param = {};
$job->{interval} = $config->{replica_interval} || 15;
$job->{tnode} = $target || $config->{replica_target};
die "Replica Target must be set\n" if !defined($job->{tnode});
die "Target and source Node can't be the same\n"
if $job->{tnode} eq $local_node;
$job->{fail} = 0;
# resume from an existing replica snapshot if one is found
if (!defined($job->{lastsync})) {
if ( my $lastsync = get_lastsync($vmid)) {
$job->{lastsync} = $lastsync;
} else {
$job->{lastsync} = 0;
}
}
$param->{verbose} = 1;
$job->{state} = 'ok';
$jobs->{$vmid} = $job;
write_state($jobs);
eval{
sync_guest($vmid, $param) if !defined($no_sync);
};
if (my $err = $@) {
# mark the job as failed but still propagate the error
$jobs->{$vmid}->{state} = 'error';
write_state($jobs);
die $err;
}
};
PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
die $@ if $@;
}
# Mark the replication job of $vmid as 'off' (keeping lastsync etc.)
# under the state-file lock.
sub job_disable {
    my ($vmid) = @_;

    my $code = sub {
	my $jobs = read_state();

	if (!defined($jobs->{$vmid})) {
	    print "No replica service for $vmid\n";
	    return;
	}

	$jobs->{$vmid}->{state} = 'off';
	write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $code);
    die $@ if $@;
}
# Drop the state entry of $vmid's replication job entirely,
# under the state-file lock.
sub job_remove {
    my ($vmid) = @_;

    my $code = sub {
	my $jobs = read_state();

	if (!defined($jobs->{$vmid})) {
	    print "No replica service for $vmid\n";
	    return;
	}

	delete($jobs->{$vmid});
	write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $code);
    die $@ if $@;
}
# Collect all guest volumes whose storage supports the 'replicate' feature.
# Returns the disk hash in scalar context, or ($warnings, $disks) in list
# context, where $warnings flags that at least one volume is not syncable.
# $noerr suppresses the per-volume warning output.
# NOTE(review): check_guest_volumes_syncable() passes 1 as the third
# argument, which lands in $running here, not in $noerr -- verify call sites.
sub get_syncable_guestdisks {
my ($config, $vm_type, $running, $noerr) = @_;
my $syncable_disks = {};
my $cfg = PVE::Storage::config();
my $warnings = 0;
my $func = sub {
my ($id, $volume) = @_;
# qemu drives keep the volid under 'file', lxc mountpoints under 'volume'
my $volname;
if ($vm_type eq 'qemu') {
$volname = $volume->{file};
} else {
$volname = $volume->{volume};
}
if( PVE::Storage::volume_has_feature($cfg, 'replicate', $volname , undef, $running)) {
$syncable_disks->{$volname} = 1;
} else {
# only warn when the volume has replication enabled (or unset)
warn "Can't sync Volume: $volname\n"
if !$noerr &&
(!defined($volume->{replica}) || $volume->{replica});
$warnings = 1;
}
};
if ($vm_type eq 'qemu') {
PVE::QemuServer::foreach_drive($config, $func);
} elsif ($vm_type eq 'lxc') {
PVE::LXC::Config->foreach_mountpoint($config, $func);
} else {
die "Unknown VM Type: $vm_type";
}
return wantarray ? ($warnings, $syncable_disks) : $syncable_disks;
}
# Remove replica snapshots (when $regex is given) or the whole replicated
# volumes (when it is not) for all syncable disks of guest $vmid.
# With $node set, operate on that remote node via ssh, otherwise locally.
sub destroy_all_snapshots {
my ($vmid, $regex, $node) = @_;
my $ip = defined($node) ? get_node_ip($node) : undef;
my ($guest_conf, $vm_type, $running) = &$get_guestconfig($vmid);
my $disks = get_syncable_guestdisks($guest_conf, $vm_type);
my $cfg = PVE::Storage::config();
# gather the matching snapshots per volume first
my $snapshots = {};
foreach my $volid (keys %$disks) {
$snapshots->{$volid} =
PVE::Storage::volume_snapshot_list($cfg, $volid, $regex, $node, $ip);
}
foreach my $volid (keys %$snapshots) {
if (defined($regex)) {
# delete only the matching snapshots, remote or local
foreach my $snap (@{$snapshots->{$volid}}) {
if ($ip) {
PVE::Storage::volume_snapshot_delete_remote($cfg, $volid, $snap, $ip);
} else {
PVE::Storage::volume_snapshot_delete($cfg, $volid, $snap, $running);
}
}
} else {
# no regex: remove the whole volume
if ($ip) {
my $cmd = ['ssh', '-o', 'Batchmode=yes', "root\@$ip", '--'
,'pvesm', 'free', $volid];
PVE::Tools::run_command($cmd);
} else {
PVE::Storage::vdisk_free($cfg, $volid);
}
}
}
}
# Delete a replica snapshot pair (remote and/or local) for every volume in
# $disks, honouring the per-volume progress flags. When $lastsync_snap is
# given, the previous snapshot "replica_<lastsync>" is removed instead and
# the per-volume flags are ignored.
sub cleanup_snapshot {
    my ($disks, $snapname, $cfg, $running, $ip, $lastsync_snap) = @_;

    $snapname = "replica_$lastsync_snap" if $lastsync_snap;

    foreach my $volid (keys %$disks) {
	next if $volid eq "snapname";    # meta entry, not a volume

	my $status = $disks->{$volid};

	PVE::Storage::volume_snapshot_delete_remote($cfg, $volid, $snapname, $ip)
	    if defined($lastsync_snap) || $status->{synced};

	PVE::Storage::volume_snapshot_delete($cfg, $volid, $snapname, $running)
	    if defined($lastsync_snap) || $status->{snapshot};
    }
}
# Completely remove the replication job of $vmid: deletes the local replica
# snapshots, the replicated volumes on the target node, the job state entry
# and the replica_* options from the guest configuration.
# NOTE(review): deletes 'replica_rate_interval' from the config while
# job_enable() reads 'replica_interval' -- possible key mismatch, verify.
sub destroy_replica {
my ($vmid) = @_;
my $code = sub {
my $jobs = read_state();
return if !defined($jobs->{$vmid});
my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
# local replica snapshots, then the full volumes on the target node
destroy_all_snapshots($vmid, 'replica');
destroy_all_snapshots($vmid, undef, $guest_conf->{replica_target});
delete($jobs->{$vmid});
delete($guest_conf->{replica_rate_limit});
delete($guest_conf->{replica_rate_interval});
delete($guest_conf->{replica_target});
delete($guest_conf->{replica});
if ($vm_type eq 'qemu') {
PVE::QemuConfig->write_config($vmid, $guest_conf);
} else {
PVE::LXC::Config->write_config($vmid, $guest_conf);
}
write_state($jobs);
};
PVE::Tools::lock_file_full($STATE_PATH, 30, 0 , $code);
die $@ if $@;
}
# Determine the timestamp of the newest replica snapshot common to all
# syncable volumes of $vmid. Dies when the volumes carry snapshots from
# different sync runs; returns undef when no replica snapshot exists.
sub get_lastsync {
    my ($vmid) = @_;

    my ($conf, $vm_type) = &$get_guestconfig($vmid);
    my $syncable = get_syncable_guestdisks($conf, $vm_type);
    my $cfg = PVE::Storage::config();

    my $time;
    foreach my $volid (keys %$syncable) {
	my $list =
	    PVE::Storage::volume_snapshot_list($cfg, $volid, 'replica', $local_node);

	if (my $newest_snap = shift @$list) {
	    # Guard the capture: the original used $1 without checking the
	    # match, so a non-matching name would silently reuse a stale $1.
	    die "unexpected snapshot name '$newest_snap'\n"
		if $newest_snap !~ m/^replica_(\d+)$/;
	    die "snapshots are not coherent\n"
		if defined($time) && $time ne $1;
	    $time = $1;
	}
    }

    return $time;
}
# Return the newest local snapshot of $volid whose name starts with
# 'replica_', or undef when there is none.
sub get_last_replica_snap {
    my ($volid) = @_;

    my $storecfg = PVE::Storage::config();
    my $snaps = PVE::Storage::volume_snapshot_list($storecfg, $volid, 'replica_', $local_node);

    return shift @$snaps;
}
# Return 1 when every configured guest volume can be replicated,
# undef otherwise (any warning, or no syncable disks at all).
sub check_guest_volumes_syncable {
my ($conf, $vm_type) = @_;
# NOTE(review): the literal 1 lands in get_syncable_guestdisks' $running
# slot, not in $noerr -- it looks like it was meant to suppress the
# warnings; confirm against the callee's signature.
my ($warnings, $disks) = get_syncable_guestdisks($conf, $vm_type, 1);
return undef if $warnings || !%$disks;
return 1;
}
# Propagate a changed guest replica_* config option ($key => $value) into
# the job state. Changing the target node re-creates the job; interval and
# rate limit are updated in place under the state-file lock.
sub update_conf {
    my ($vmid, $key, $value) = @_;

    # A new target requires a full re-creation of the job.
    if ($key eq 'replica_target') {
	destroy_replica($vmid);
	job_enable($vmid, undef, $value);
	return;
    }

    my $update = sub {
	my $jobs = read_state();
	return if !defined($jobs->{$vmid});

	if ($key eq 'replica_interval') {
	    $jobs->{$vmid}->{interval} = $value || 15;
	} elsif ($key eq 'replica_rate_limit') {
	    # The original read "$value || delet ...": 'delet' is a syntax
	    # error and the ||-assignment re-assigned the just-deleted old
	    # value. Set the limit when a value is given, else remove it.
	    if ($value) {
		$jobs->{$vmid}->{limit} = $value;
	    } else {
		delete $jobs->{$vmid}->{limit};
	    }
	} else {
	    die "Config parameter: $key not known";
	}

	write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 60, 0, $update);
    die $@ if $@;    # propagate errors like the other job_* helpers do
}

1;

8
pvesr Normal file
View File

@ -0,0 +1,8 @@
#!/usr/bin/perl
# CLI entry point for 'pvesr' (Proxmox VE storage replication).
use strict;
use warnings;
use PVE::CLI::pvesr;
# Dispatch to the sub-command handlers defined in PVE::CLI::pvesr::$cmddef.
PVE::CLI::pvesr->run_cli_handler();