/
www
/
wwwroot
/
alo88.autos
/
wp-content
/
plugins
/
wp-content-crawler
/
app
/
vendor
/
google
/
cloud-core
/
src
/
Batch
/
Upload File
HOME
<?php /** * Copyright 2017 Google Inc. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Google\Cloud\Core\Batch; use Google\Cloud\Core\SysvTrait; /** * Represent batch jobs. * * @experimental The experimental flag means that while we believe this method * or class is ready for use, it may change before release in backwards- * incompatible ways. Please use with caution, and test thoroughly when * upgrading. */ class BatchJob implements JobInterface { const DEFAULT_BATCH_SIZE = 100; const DEFAULT_CALL_PERIOD = 2.0; const DEFAULT_WORKERS = 1; use JobTrait; use SysvTrait; use InterruptTrait; use HandleFailureTrait; /** * @var callable The batch job handler. This callable accepts an array * of items and should return a boolean. */ private $func; /** * @var int The size of the batch. */ private $batchSize; /** * @var float The period in seconds from the last execution to force * executing the job. */ private $callPeriod; /** * @param string $identifier Unique identifier of the job. * @param callable $func Any Callable except for Closure. The callable * should accept an array of items as the first argument. * @param int $idNum A numeric id for the job. * @param array $options [optional] { * Configuration options. * * @type int $batchSize The size of the batch. **Defaults to** `100`. * @type float $callPeriod The period in seconds from the last execution * to force executing the job. **Defaults to** `2.0`. * @type int $numWorkers The number of child processes. It only takes * effect with the {@see \Google\Cloud\Core\Batch\BatchDaemon}. * **Defaults to** `1`. * @type string $bootstrapFile A file to load before executing the * job. It's needed for registering global functions. * } */ public function __construct( $identifier, $func, $idNum, array $options = [] ) { $options += [ 'batchSize' => self::DEFAULT_BATCH_SIZE, 'callPeriod' => self::DEFAULT_CALL_PERIOD, 'bootstrapFile' => null, 'numWorkers' => self::DEFAULT_WORKERS ]; $this->identifier = $identifier; $this->func = $func; $this->id = $idNum; $this->batchSize = $options['batchSize']; $this->callPeriod = $options['callPeriod']; $this->bootstrapFile = $options['bootstrapFile']; $this->numWorkers = $options['numWorkers']; $this->initFailureFile(); } /** * Run the job. */ public function run() { $this->setupSignalHandlers(); $sysvKey = $this->getSysvKey($this->id); $q = msg_get_queue($sysvKey); $items = []; $lastInvoked = microtime(true); if (!is_null($this->bootstrapFile)) { require_once($this->bootstrapFile); } while (true) { // Fire SIGALRM after 1 second to unblock the blocking call. pcntl_alarm(1); if (msg_receive( $q, 0, $type, 8192, $message, true, 0, // blocking mode $errorcode )) { if ($type === self::$typeDirect) { $items[] = $message; } elseif ($type === self::$typeFile) { $items[] = unserialize(file_get_contents($message)); @unlink($message); } } pcntl_signal_dispatch(); // It runs the job when // 1. Number of items reaches the batchSize. // 2-a. Count is >0 and the current time is larger than lastInvoked + period. // 2-b. Count is >0 and the shutdown flag is true. if ((count($items) >= $this->batchSize) || (count($items) > 0 && (microtime(true) > $lastInvoked + $this->callPeriod || $this->shutdown))) { printf( 'Running the job with %d items' . PHP_EOL, count($items) ); $this->flush($items); $items = []; $lastInvoked = microtime(true); } gc_collect_cycles(); if ($this->shutdown) { return; } } } /** * Finish any pending activity for this job. * * @param array $items * @return bool */ public function flush(array $items = []) { if (! $this->callFunc($items)) { $this->handleFailure($this->id, $items); return false; } return true; } /** * Finish any pending activity for this job. * * @access private * @internal * * @param array $items * @return bool */ public function callFunc(array $items = []) { return call_user_func_array($this->func, [$items]); } /** * Returns the period in seconds from the last execution to force * executing the job. * * @return float */ public function getCallPeriod() { return $this->callPeriod; } /** * Returns the batch size. * * @return int */ public function getBatchSize() { return $this->batchSize; } }