From 2a0f74c78d6cee6764e683bb4a9d9b931f9e65bd Mon Sep 17 00:00:00 2001 From: Klas Lindfors Date: Mon, 7 Mar 2016 15:42:33 +0100 Subject: [PATCH] implement paralell syncing with curl_multi --- ykval-synclib.php | 236 ++++++++++++++++++++++++++++------------------ 1 file changed, 144 insertions(+), 92 deletions(-) diff --git a/ykval-synclib.php b/ykval-synclib.php index d87833f..27feb33 100644 --- a/ykval-synclib.php +++ b/ykval-synclib.php @@ -279,130 +279,182 @@ class SyncLib /* Loop over all unique servers in queue */ $queued_limit = time()-$older_than; $server_res = $this->db->customQuery("select distinct server from queue WHERE queued < " . $queued_limit . " or queued is null"); + $server_list = array(); + $mh = curl_multi_init(); + $ch = array(); + $entries = array(); + $handles = 0; while ($my_server = $this->db->fetchArray($server_res)) { - $this->log(LOG_DEBUG, "Processing queue for server " . $my_server['server']); + $server = $my_server['server']; + $this->log(LOG_DEBUG, "Processing queue for server " . $server); - $res = $this->db->customQuery("select * from queue WHERE (queued < " . $queued_limit . " or queued is null) and server='" . $my_server['server'] . "'"); + $res = $this->db->customQuery("select * from queue WHERE (queued < " . $queued_limit . " or queued is null) and server='" . $server . "'"); - $ch = curl_init(); + $list = array(); while ($entry = $this->db->fetchArray($res)) { - $this->log(LOG_INFO, "server=" . $entry['server'] . ", server_nonce=" . $entry['server_nonce'] . ", info=" . $entry['info']); + $list[] = $entry; + } + $server_list[$server] = $list; + $handle = curl_init(); + curl_setopt($handle, CURLOPT_PRIVATE, $server); + $ch[$server] = $handle; + $this->db->closeCursor($res); + } + $this->db->closeCursor($server_res); - $url = $entry['server'] . - "?otp=" . $entry['otp'] . - "&modified=" . $entry['modified'] . - "&" . $this->otpPartFromInfoString($entry['info']); + /* add one entry for each server we're going to sync */ + foreach ($server_list as $server) { + $entry = array_shift($server); + if(count($server) == 0) { + unset($server_list[$entry['server']]); + } + $handle = $ch[$entry['server']]; + $this->log(LOG_INFO, "server=" . $entry['server'] . ", server_nonce=" . $entry['server_nonce'] . ", info=" . $entry['info']); - /* Send out sync request */ + $url = $entry['server'] . + "?otp=" . $entry['otp'] . + "&modified=" . $entry['modified'] . + "&" . $this->otpPartFromInfoString($entry['info']); - curl_settings($this, 'YK-VAL resync', $ch, $url, $timeout, $this->curlopts); + curl_settings($this, 'YK-VAL resync', $handle, $url, $timeout, $this->curlopts); + $entries[$entry['server']] = $entry; + curl_multi_add_handle($mh, $handle); + $handles++; + } - $response = curl_exec($ch); + while($handles > 0) { + while (curl_multi_exec($mh, $active) == CURLM_CALL_MULTI_PERFORM); - if ($response == False) - { - $this->log(LOG_NOTICE, 'Timeout. Stopping queue resync for server ' . $entry['server']); - break; - } - - if (preg_match('/status=OK/', $response)) - { - $resParams = $this->parseParamsFromMultiLineString($response); - $this->log(LOG_DEBUG, 'response contains ', $resParams); - - /* Update database counters */ - $this->updateDbCounters($resParams); - - /* Retrieve info from entry info string */ - - /* This is the counter values we had in our database *before* processing the current OTP. */ - $validationParams = $this->localParamsFromInfoString($entry['info']); - - /* This is the data from the current OTP. */ - $otpParams = $this->otpParamsFromInfoString($entry['info']); - - /* Fetch current information from our database */ - $localParams = $this->getLocalParams($otpParams['yk_publicname']); - - $this->log(LOG_DEBUG, 'validation params: ', $validationParams); - $this->log(LOG_DEBUG, 'OTP params: ', $otpParams); - - /* Check for warnings */ - - if ($this->countersHigherThan($validationParams, $resParams)) + while ($info = curl_multi_info_read($mh)) { + $handle = $info['handle']; + $server = curl_getinfo($handle, CURLINFO_PRIVATE); + $entry = $entries[$server]; + $this->log(LOG_DEBUG, "handle indicated to be for $server."); + curl_multi_remove_handle($mh, $handle); + $handles--; + if ($info['result'] === CURLE_OK) { + $response = curl_multi_getcontent($handle); + if (preg_match('/status=OK/', $response)) { - $this->log(LOG_NOTICE, 'Remote server out of sync compared to counters at validation request time. '); - } + $resParams = $this->parseParamsFromMultiLineString($response); + $this->log(LOG_DEBUG, 'response contains ', $resParams); - if ($this->countersHigherThan($resParams, $validationParams)) - { - if ($this->countersEqual($resParams, $otpParams)) + /* Update database counters */ + $this->updateDbCounters($resParams); + + /* Retrieve info from entry info string */ + + /* This is the counter values we had in our database *before* processing the current OTP. */ + $validationParams = $this->localParamsFromInfoString($entry['info']); + + /* This is the data from the current OTP. */ + $otpParams = $this->otpParamsFromInfoString($entry['info']); + + /* Fetch current information from our database */ + $localParams = $this->getLocalParams($otpParams['yk_publicname']); + + $this->log(LOG_DEBUG, 'validation params: ', $validationParams); + $this->log(LOG_DEBUG, 'OTP params: ', $otpParams); + + /* Check for warnings */ + + if ($this->countersHigherThan($validationParams, $resParams)) { - $this->log(LOG_INFO, 'Remote server had received the current counter values already. '); + $this->log(LOG_NOTICE, 'Remote server out of sync compared to counters at validation request time. '); } - else + + if ($this->countersHigherThan($resParams, $validationParams)) { - $this->log(LOG_NOTICE, 'Local server out of sync compared to counters at validation request time. '); + if ($this->countersEqual($resParams, $otpParams)) + { + $this->log(LOG_INFO, 'Remote server had received the current counter values already. '); + } + else + { + $this->log(LOG_NOTICE, 'Local server out of sync compared to counters at validation request time. '); + } } - } - if ($this->countersHigherThan($localParams, $resParams)) - { - $this->log(LOG_WARNING, 'Remote server out of sync compared to current local counters. '); - } + if ($this->countersHigherThan($localParams, $resParams)) + { + $this->log(LOG_WARNING, 'Remote server out of sync compared to current local counters. '); + } - if ($this->countersHigherThan($resParams, $localParams)) - { - $this->log(LOG_WARNING, 'Local server out of sync compared to current local counters. Local server updated. '); - } + if ($this->countersHigherThan($resParams, $localParams)) + { + $this->log(LOG_WARNING, 'Local server out of sync compared to current local counters. Local server updated. '); + } - if ($this->countersHigherThan($resParams, $otpParams)) - { - $this->log(LOG_ERR, 'Remote server has higher counters than OTP. This response would have marked the OTP as invalid. '); - } - elseif ($this->countersEqual($resParams, $otpParams) && $resParams['nonce'] != $otpParams['nonce']) - { - $this->log(LOG_ERR, 'Remote server has equal counters as OTP and nonce differs. This response would have marked the OTP as invalid.'); - } + if ($this->countersHigherThan($resParams, $otpParams)) + { + $this->log(LOG_ERR, 'Remote server has higher counters than OTP. This response would have marked the OTP as invalid. '); + } + elseif ($this->countersEqual($resParams, $otpParams) && $resParams['nonce'] != $otpParams['nonce']) + { + $this->log(LOG_ERR, 'Remote server has equal counters as OTP and nonce differs. This response would have marked the OTP as invalid.'); + } - /* Deletion */ - $this->log(LOG_DEBUG, 'deleting queue entry with modified=' . $entry['modified'] . + /* Deletion */ + $this->log(LOG_DEBUG, 'deleting queue entry with modified=' . $entry['modified'] . ' server_nonce=' . $entry['server_nonce'] . ' server=' . $entry['server']); - $this->db->deleteByMultiple('queue', array( - 'modified' => $entry['modified'], - 'server_nonce' => $entry['server_nonce'], - 'server' => $entry['server'] - )); + $this->db->deleteByMultiple('queue', array( + 'modified' => $entry['modified'], + 'server_nonce' => $entry['server_nonce'], + 'server' => $entry['server'] + )); - } - else if (preg_match('/status=BAD_OTP/', $response)) - { - $this->log(LOG_WARNING, 'Remote server says BAD_OTP, pointless to try again, removing from queue.'); - $this->db->deleteByMultiple('queue', array( - 'modified' => $entry['modified'], - 'server_nonce' => $entry['server_nonce'], - 'server' => $entry['server'] - )); - } - else - { - $this->log(LOG_ERR, 'Remote server refused our sync request. Check remote server logs.'); - } + } + else if (preg_match('/status=BAD_OTP/', $response)) + { + $this->log(LOG_WARNING, 'Remote server says BAD_OTP, pointless to try again, removing from queue.'); + $this->db->deleteByMultiple('queue', array( + 'modified' => $entry['modified'], + 'server_nonce' => $entry['server_nonce'], + 'server' => $entry['server'] + )); + } + else + { + $this->log(LOG_ERR, 'Remote server refused our sync request. Check remote server logs.'); + } - } /* End of loop over each queue entry for a server */ + if($server_list[$server]) { + $entry = array_shift($server_list[$server]); + if(count($server_list[$server]) == 0) { + $this->log(LOG_DEBUG, "All entries for $server synced."); + unset($server_list[$server]); + } + $this->log(LOG_INFO, "server=" . $entry['server'] . ", server_nonce=" . $entry['server_nonce'] . ", info=" . $entry['info']); + $url = $entry['server'] . + "?otp=" . $entry['otp'] . + "&modified=" . $entry['modified'] . + "&" . $this->otpPartFromInfoString($entry['info']); + + curl_settings($this, 'YK-VAL resync', $handle, $url, $timeout, $this->curlopts); + $entries[$server] = $entry; + curl_multi_add_handle($mh, $handle); + $handles++; + } + } else { + $this->log(LOG_NOTICE, 'Timeout. Stopping queue resync for server ' . $entry['server']); + unset($server_list[$server]); + } + } + } + + foreach ($ch as $handle) { curl_close($ch); - $this->db->closeCursor($res); + } - } /* End of loop over each distinct server in queue */ + curl_multi_close($mh); - $this->db->closeCursor($server_res); return true; }