1
0
mirror of https://github.com/Yubico/yubikey-val.git synced 2025-02-08 03:54:20 +01:00

implement paralell syncing with curl_multi

This commit is contained in:
Klas Lindfors 2016-03-07 15:42:33 +01:00
parent a4f8c24877
commit 2a0f74c78d

View File

@ -279,17 +279,40 @@ class SyncLib
/* Loop over all unique servers in queue */ /* Loop over all unique servers in queue */
$queued_limit = time()-$older_than; $queued_limit = time()-$older_than;
$server_res = $this->db->customQuery("select distinct server from queue WHERE queued < " . $queued_limit . " or queued is null"); $server_res = $this->db->customQuery("select distinct server from queue WHERE queued < " . $queued_limit . " or queued is null");
$server_list = array();
$mh = curl_multi_init();
$ch = array();
$entries = array();
$handles = 0;
while ($my_server = $this->db->fetchArray($server_res)) while ($my_server = $this->db->fetchArray($server_res))
{ {
$this->log(LOG_DEBUG, "Processing queue for server " . $my_server['server']); $server = $my_server['server'];
$this->log(LOG_DEBUG, "Processing queue for server " . $server);
$res = $this->db->customQuery("select * from queue WHERE (queued < " . $queued_limit . " or queued is null) and server='" . $my_server['server'] . "'"); $res = $this->db->customQuery("select * from queue WHERE (queued < " . $queued_limit . " or queued is null) and server='" . $server . "'");
$ch = curl_init(); $list = array();
while ($entry = $this->db->fetchArray($res)) while ($entry = $this->db->fetchArray($res))
{ {
$list[] = $entry;
}
$server_list[$server] = $list;
$handle = curl_init();
curl_setopt($handle, CURLOPT_PRIVATE, $server);
$ch[$server] = $handle;
$this->db->closeCursor($res);
}
$this->db->closeCursor($server_res);
/* add one entry for each server we're going to sync */
foreach ($server_list as $server) {
$entry = array_shift($server);
if(count($server) == 0) {
unset($server_list[$entry['server']]);
}
$handle = $ch[$entry['server']];
$this->log(LOG_INFO, "server=" . $entry['server'] . ", server_nonce=" . $entry['server_nonce'] . ", info=" . $entry['info']); $this->log(LOG_INFO, "server=" . $entry['server'] . ", server_nonce=" . $entry['server_nonce'] . ", info=" . $entry['info']);
$url = $entry['server'] . $url = $entry['server'] .
@ -297,18 +320,24 @@ class SyncLib
"&modified=" . $entry['modified'] . "&modified=" . $entry['modified'] .
"&" . $this->otpPartFromInfoString($entry['info']); "&" . $this->otpPartFromInfoString($entry['info']);
/* Send out sync request */ curl_settings($this, 'YK-VAL resync', $handle, $url, $timeout, $this->curlopts);
$entries[$entry['server']] = $entry;
curl_settings($this, 'YK-VAL resync', $ch, $url, $timeout, $this->curlopts); curl_multi_add_handle($mh, $handle);
$handles++;
$response = curl_exec($ch);
if ($response == False)
{
$this->log(LOG_NOTICE, 'Timeout. Stopping queue resync for server ' . $entry['server']);
break;
} }
while($handles > 0) {
while (curl_multi_exec($mh, $active) == CURLM_CALL_MULTI_PERFORM);
while ($info = curl_multi_info_read($mh)) {
$handle = $info['handle'];
$server = curl_getinfo($handle, CURLINFO_PRIVATE);
$entry = $entries[$server];
$this->log(LOG_DEBUG, "handle indicated to be for $server.");
curl_multi_remove_handle($mh, $handle);
$handles--;
if ($info['result'] === CURLE_OK) {
$response = curl_multi_getcontent($handle);
if (preg_match('/status=OK/', $response)) if (preg_match('/status=OK/', $response))
{ {
$resParams = $this->parseParamsFromMultiLineString($response); $resParams = $this->parseParamsFromMultiLineString($response);
@ -395,14 +424,37 @@ class SyncLib
$this->log(LOG_ERR, 'Remote server refused our sync request. Check remote server logs.'); $this->log(LOG_ERR, 'Remote server refused our sync request. Check remote server logs.');
} }
} /* End of loop over each queue entry for a server */ if($server_list[$server]) {
$entry = array_shift($server_list[$server]);
if(count($server_list[$server]) == 0) {
$this->log(LOG_DEBUG, "All entries for $server synced.");
unset($server_list[$server]);
}
$this->log(LOG_INFO, "server=" . $entry['server'] . ", server_nonce=" . $entry['server_nonce'] . ", info=" . $entry['info']);
$url = $entry['server'] .
"?otp=" . $entry['otp'] .
"&modified=" . $entry['modified'] .
"&" . $this->otpPartFromInfoString($entry['info']);
curl_settings($this, 'YK-VAL resync', $handle, $url, $timeout, $this->curlopts);
$entries[$server] = $entry;
curl_multi_add_handle($mh, $handle);
$handles++;
}
} else {
$this->log(LOG_NOTICE, 'Timeout. Stopping queue resync for server ' . $entry['server']);
unset($server_list[$server]);
}
}
}
foreach ($ch as $handle) {
curl_close($ch); curl_close($ch);
$this->db->closeCursor($res); }
} /* End of loop over each distinct server in queue */ curl_multi_close($mh);
$this->db->closeCursor($server_res);
return true; return true;
} }