vendor/ruflin/elastica/src/Client.php line 545

Open in your IDE?
  1. <?php
  2. namespace Elastica;
  3. use Elastica\Bulk\Action;
  4. use Elastica\Bulk\ResponseSet;
  5. use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
  6. use Elastica\Exception\ClientException;
  7. use Elastica\Exception\ConnectionException;
  8. use Elastica\Exception\InvalidException;
  9. use Elastica\Exception\ResponseException;
  10. use Elastica\Script\AbstractScript;
  11. use Elasticsearch\Endpoints\AbstractEndpoint;
  12. use Elasticsearch\Endpoints\ClosePointInTime;
  13. use Elasticsearch\Endpoints\Indices\ForceMerge;
  14. use Elasticsearch\Endpoints\Indices\Refresh;
  15. use Elasticsearch\Endpoints\Update;
  16. use Psr\Log\LoggerInterface;
  17. use Psr\Log\NullLogger;
  18. /**
  19.  * Client to connect the the elasticsearch server.
  20.  *
  21.  * @author Nicolas Ruflin <spam@ruflin.com>
  22.  */
  23. class Client
  24. {
  25.     /**
  26.      * @var ClientConfiguration
  27.      */
  28.     protected $_config;
  29.     /**
  30.      * @var callable
  31.      */
  32.     protected $_callback;
  33.     /**
  34.      * @var Connection\ConnectionPool
  35.      */
  36.     protected $_connectionPool;
  37.     /**
  38.      * @var Request|null
  39.      */
  40.     protected $_lastRequest;
  41.     /**
  42.      * @var Response|null
  43.      */
  44.     protected $_lastResponse;
  45.     /**
  46.      * @var LoggerInterface
  47.      */
  48.     protected $_logger;
  49.     /**
  50.      * @var string
  51.      */
  52.     protected $_version;
  53.     /**
  54.      * Creates a new Elastica client.
  55.      *
  56.      * @param array|string  $config   OPTIONAL Additional config or DSN of options
  57.      * @param callable|null $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
  58.      *
  59.      * @throws InvalidException
  60.      */
  61.     public function __construct($config = [], ?callable $callback null, ?LoggerInterface $logger null)
  62.     {
  63.         if (\is_string($config)) {
  64.             $configuration ClientConfiguration::fromDsn($config);
  65.         } elseif (\is_array($config)) {
  66.             $configuration ClientConfiguration::fromArray($config);
  67.         } else {
  68.             throw new InvalidException('Config parameter must be an array or a string.');
  69.         }
  70.         $this->_config $configuration;
  71.         $this->_callback $callback;
  72.         $this->_logger $logger ?? new NullLogger();
  73.         $this->_initConnections();
  74.     }
  75.     /**
  76.      * Get current version.
  77.      *
  78.      * @throws ClientException
  79.      * @throws ConnectionException
  80.      * @throws ResponseException
  81.      */
  82.     public function getVersion(): string
  83.     {
  84.         if ($this->_version) {
  85.             return $this->_version;
  86.         }
  87.         $data $this->request('/')->getData();
  88.         return $this->_version $data['version']['number'];
  89.     }
  90.     /**
  91.      * Sets specific config values (updates and keeps default values).
  92.      *
  93.      * @param array $config Params
  94.      */
  95.     public function setConfig(array $config): self
  96.     {
  97.         foreach ($config as $key => $value) {
  98.             $this->_config->set($key$value);
  99.         }
  100.         return $this;
  101.     }
  102.     /**
  103.      * Returns a specific config key or the whole config array if not set.
  104.      *
  105.      * @throws InvalidException if the given key is not found in the configuration
  106.      *
  107.      * @return array|bool|string
  108.      */
  109.     public function getConfig(string $key '')
  110.     {
  111.         return $this->_config->get($key);
  112.     }
  113.     /**
  114.      * Sets / overwrites a specific config value.
  115.      *
  116.      * @param mixed $value Value
  117.      */
  118.     public function setConfigValue(string $key$value): self
  119.     {
  120.         return $this->setConfig([$key => $value]);
  121.     }
  122.     /**
  123.      * @param array|string $keys    config key or path of config keys
  124.      * @param mixed        $default default value will be returned if key was not found
  125.      *
  126.      * @return mixed
  127.      */
  128.     public function getConfigValue($keys$default null)
  129.     {
  130.         $value $this->_config->getAll();
  131.         foreach ((array) $keys as $key) {
  132.             if (isset($value[$key])) {
  133.                 $value $value[$key];
  134.             } else {
  135.                 return $default;
  136.             }
  137.         }
  138.         return $value;
  139.     }
  140.     /**
  141.      * Returns the index for the given connection.
  142.      */
  143.     public function getIndex(string $name): Index
  144.     {
  145.         return new Index($this$name);
  146.     }
  147.     /**
  148.      * Adds a HTTP Header.
  149.      */
  150.     public function addHeader(string $headerstring $value): self
  151.     {
  152.         if ($this->_config->has('headers')) {
  153.             $headers $this->_config->get('headers');
  154.         } else {
  155.             $headers = [];
  156.         }
  157.         $headers[$header] = $value;
  158.         $this->_config->set('headers'$headers);
  159.         return $this;
  160.     }
  161.     /**
  162.      * Remove a HTTP Header.
  163.      */
  164.     public function removeHeader(string $header): self
  165.     {
  166.         if ($this->_config->has('headers')) {
  167.             $headers $this->_config->get('headers');
  168.             unset($headers[$header]);
  169.             $this->_config->set('headers'$headers);
  170.         }
  171.         return $this;
  172.     }
  173.     /**
  174.      * Uses _bulk to send documents to the server.
  175.      *
  176.      * Array of \Elastica\Document as input. Index has to be set inside the
  177.      * document, because for bulk settings documents, documents can belong to
  178.      * any index
  179.      *
  180.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  181.      *
  182.      * @param array|Document[] $docs Array of Elastica\Document
  183.      *
  184.      * @throws InvalidException      If docs is empty
  185.      * @throws ClientException
  186.      * @throws ConnectionException
  187.      * @throws ResponseException
  188.      * @throws BulkResponseException
  189.      */
  190.     public function updateDocuments(array $docs, array $requestParams = []): ResponseSet
  191.     {
  192.         if (!$docs) {
  193.             throw new InvalidException('Array has to consist of at least one element');
  194.         }
  195.         $bulk = new Bulk($this);
  196.         $bulk->addDocuments($docsAction::OP_TYPE_UPDATE);
  197.         foreach ($requestParams as $key => $value) {
  198.             $bulk->setRequestParam($key$value);
  199.         }
  200.         return $bulk->send();
  201.     }
  202.     /**
  203.      * Uses _bulk to send documents to the server.
  204.      *
  205.      * Array of \Elastica\Document as input. Index has to be set inside the
  206.      * document, because for bulk settings documents, documents can belong to
  207.      * any index
  208.      *
  209.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  210.      *
  211.      * @param array|Document[] $docs Array of Elastica\Document
  212.      *
  213.      * @throws InvalidException      If docs is empty
  214.      * @throws ClientException
  215.      * @throws ConnectionException
  216.      * @throws ResponseException
  217.      * @throws BulkResponseException
  218.      */
  219.     public function addDocuments(array $docs, array $requestParams = []): ResponseSet
  220.     {
  221.         if (!$docs) {
  222.             throw new InvalidException('Array has to consist of at least one element');
  223.         }
  224.         $bulk = new Bulk($this);
  225.         $bulk->addDocuments($docs);
  226.         foreach ($requestParams as $key => $value) {
  227.             $bulk->setRequestParam($key$value);
  228.         }
  229.         return $bulk->send();
  230.     }
  231.     /**
  232.      * Update document, using update script. Requires elasticsearch >= 0.19.0.
  233.      *
  234.      * @param int|string                    $id      document id
  235.      * @param AbstractScript|array|Document $data    raw data for request body
  236.      * @param string                        $index   index to update
  237.      * @param array                         $options array of query params to use for query. For possible options check es api
  238.      *
  239.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
  240.      *
  241.      * @throws ClientException
  242.      * @throws ConnectionException
  243.      * @throws ResponseException
  244.      */
  245.     public function updateDocument($id$data$index, array $options = []): Response
  246.     {
  247.         $endpoint = new Update();
  248.         $endpoint->setId($id);
  249.         $endpoint->setIndex($index);
  250.         if ($data instanceof AbstractScript) {
  251.             $requestData $data->toArray();
  252.         } elseif ($data instanceof Document) {
  253.             $requestData = ['doc' => $data->getData()];
  254.             if ($data->getDocAsUpsert()) {
  255.                 $requestData['doc_as_upsert'] = true;
  256.             }
  257.             $docOptions $data->getOptions(
  258.                 [
  259.                     'consistency',
  260.                     'parent',
  261.                     'percolate',
  262.                     'refresh',
  263.                     'replication',
  264.                     'retry_on_conflict',
  265.                     'routing',
  266.                     'timeout',
  267.                 ]
  268.             );
  269.             $options += $docOptions;
  270.         } else {
  271.             $requestData $data;
  272.         }
  273.         // If an upsert document exists
  274.         if ($data instanceof AbstractScript || $data instanceof Document) {
  275.             if ($data->hasUpsert()) {
  276.                 $requestData['upsert'] = $data->getUpsert()->getData();
  277.             }
  278.         }
  279.         $endpoint->setBody($requestData);
  280.         $endpoint->setParams($options);
  281.         $response $this->requestEndpoint($endpoint);
  282.         if ($response->isOk()
  283.             && $data instanceof Document
  284.             && ($data->isAutoPopulate() || $this->getConfigValue(['document''autoPopulate'], false))
  285.         ) {
  286.             $data->setVersionParams($response->getData());
  287.         }
  288.         return $response;
  289.     }
  290.     /**
  291.      * Bulk deletes documents.
  292.      *
  293.      * @param array|Document[] $docs
  294.      *
  295.      * @throws InvalidException
  296.      * @throws ClientException
  297.      * @throws ConnectionException
  298.      * @throws ResponseException
  299.      * @throws BulkResponseException
  300.      */
  301.     public function deleteDocuments(array $docs, array $requestParams = []): ResponseSet
  302.     {
  303.         if (!$docs) {
  304.             throw new InvalidException('Array has to consist of at least one element');
  305.         }
  306.         $bulk = new Bulk($this);
  307.         $bulk->addDocuments($docsAction::OP_TYPE_DELETE);
  308.         foreach ($requestParams as $key => $value) {
  309.             $bulk->setRequestParam($key$value);
  310.         }
  311.         return $bulk->send();
  312.     }
  313.     /**
  314.      * Returns the status object for all indices.
  315.      *
  316.      * @return Status
  317.      */
  318.     public function getStatus()
  319.     {
  320.         return new Status($this);
  321.     }
  322.     /**
  323.      * Returns the current cluster.
  324.      *
  325.      * @return Cluster
  326.      */
  327.     public function getCluster()
  328.     {
  329.         return new Cluster($this);
  330.     }
  331.     /**
  332.      * Establishes the client connections.
  333.      */
  334.     public function connect()
  335.     {
  336.         $this->_initConnections();
  337.     }
  338.     /**
  339.      * @return $this
  340.      */
  341.     public function addConnection(Connection $connection)
  342.     {
  343.         $this->_connectionPool->addConnection($connection);
  344.         return $this;
  345.     }
  346.     /**
  347.      * Determines whether a valid connection is available for use.
  348.      *
  349.      * @return bool
  350.      */
  351.     public function hasConnection()
  352.     {
  353.         return $this->_connectionPool->hasConnection();
  354.     }
  355.     /**
  356.      * @throws ClientException
  357.      *
  358.      * @return Connection
  359.      */
  360.     public function getConnection()
  361.     {
  362.         return $this->_connectionPool->getConnection();
  363.     }
  364.     /**
  365.      * @return Connection[]
  366.      */
  367.     public function getConnections()
  368.     {
  369.         return $this->_connectionPool->getConnections();
  370.     }
  371.     /**
  372.      * @return \Elastica\Connection\Strategy\StrategyInterface
  373.      */
  374.     public function getConnectionStrategy()
  375.     {
  376.         return $this->_connectionPool->getStrategy();
  377.     }
  378.     /**
  379.      * @param array|Connection[] $connections
  380.      *
  381.      * @return $this
  382.      */
  383.     public function setConnections(array $connections)
  384.     {
  385.         $this->_connectionPool->setConnections($connections);
  386.         return $this;
  387.     }
  388.     /**
  389.      * Deletes documents with the given ids, index, type from the index.
  390.      *
  391.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  392.      *
  393.      * @param array        $ids     Document ids
  394.      * @param Index|string $index   Index name
  395.      * @param bool|string  $routing Optional routing key for all ids
  396.      *
  397.      * @throws InvalidException
  398.      * @throws ClientException
  399.      * @throws ConnectionException
  400.      * @throws ResponseException
  401.      * @throws BulkResponseException
  402.      */
  403.     public function deleteIds(array $ids$index$routing false): ResponseSet
  404.     {
  405.         if (!$ids) {
  406.             throw new InvalidException('Array has to consist of at least one id');
  407.         }
  408.         $bulk = new Bulk($this);
  409.         $bulk->setIndex($index);
  410.         foreach ($ids as $id) {
  411.             $action = new Action(Action::OP_TYPE_DELETE);
  412.             $action->setId($id);
  413.             if (!empty($routing)) {
  414.                 $action->setRouting($routing);
  415.             }
  416.             $bulk->addAction($action);
  417.         }
  418.         return $bulk->send();
  419.     }
  420.     /**
  421.      * Bulk operation.
  422.      *
  423.      * Every entry in the params array has to exactly on array
  424.      * of the bulk operation. An example param array would be:
  425.      *
  426.      * array(
  427.      *         array('index' => array('_index' => 'test', '_id' => '1')),
  428.      *         array('field1' => 'value1'),
  429.      *         array('delete' => array('_index' => 'test', '_id' => '2')),
  430.      *         array('create' => array('_index' => 'test', '_id' => '3')),
  431.      *         array('field1' => 'value3'),
  432.      *         array('update' => array('_id' => '1', '_index' => 'test')),
  433.      *         array('doc' => array('field2' => 'value2')),
  434.      * );
  435.      *
  436.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  437.      *
  438.      * @throws ResponseException
  439.      * @throws InvalidException
  440.      * @throws ClientException
  441.      * @throws ConnectionException
  442.      * @throws BulkResponseException
  443.      */
  444.     public function bulk(array $params): ResponseSet
  445.     {
  446.         if (!$params) {
  447.             throw new InvalidException('Array has to consist of at least one param');
  448.         }
  449.         $bulk = new Bulk($this);
  450.         $bulk->addRawData($params);
  451.         return $bulk->send();
  452.     }
  453.     /**
  454.      * Makes calls to the elasticsearch server based on this index.
  455.      *
  456.      * It's possible to make any REST query directly over this method
  457.      *
  458.      * @param string       $path        Path to call
  459.      * @param string       $method      Rest method to use (GET, POST, DELETE, PUT)
  460.      * @param array|string $data        OPTIONAL Arguments as array or pre-encoded string
  461.      * @param array        $query       OPTIONAL Query params
  462.      * @param string       $contentType Content-Type sent with this request
  463.      *
  464.      * @throws ClientException
  465.      * @throws ConnectionException
  466.      * @throws ResponseException
  467.      */
  468.     public function request(string $pathstring $method Request::GET$data = [], array $query = [], string $contentType Request::DEFAULT_CONTENT_TYPE): Response
  469.     {
  470.         $connection $this->getConnection();
  471.         $request $this->_lastRequest = new Request($path$method$data$query$connection$contentType);
  472.         $this->_lastResponse null;
  473.         try {
  474.             $response $this->_lastResponse $request->send();
  475.         } catch (ConnectionException $e) {
  476.             $this->_connectionPool->onFail($connection$e$this);
  477.             $this->_logger->error('Elastica Request Failure', [
  478.                 'exception' => $e,
  479.                 'request' => $e->getRequest()->toArray(),
  480.                 'retry' => $this->hasConnection(),
  481.             ]);
  482.             // In case there is no valid connection left, throw exception which caused the disabling of the connection.
  483.             if (!$this->hasConnection()) {
  484.                 throw $e;
  485.             }
  486.             return $this->request($path$method$data$query);
  487.         }
  488.         $this->_logger->debug('Elastica Request', [
  489.             'request' => $request->toArray(),
  490.             'response' => $response->getData(),
  491.             'responseStatus' => $response->getStatus(),
  492.         ]);
  493.         return $response;
  494.     }
  495.     /**
  496.      * Makes calls to the elasticsearch server with usage official client Endpoint.
  497.      *
  498.      * @throws ClientException
  499.      * @throws ConnectionException
  500.      * @throws ResponseException
  501.      */
  502.     public function requestEndpoint(AbstractEndpoint $endpoint): Response
  503.     {
  504.         return $this->request(
  505.             \ltrim($endpoint->getURI(), '/'),
  506.             $endpoint->getMethod(),
  507.             $endpoint->getBody() ?? [],
  508.             $endpoint->getParams()
  509.         );
  510.     }
  511.     /**
  512.      * Force merges all search indices.
  513.      *
  514.      * @param array $args OPTIONAL Optional arguments
  515.      *
  516.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
  517.      *
  518.      * @throws ClientException
  519.      * @throws ConnectionException
  520.      * @throws ResponseException
  521.      */
  522.     public function forcemergeAll($args = []): Response
  523.     {
  524.         $endpoint = new ForceMerge();
  525.         $endpoint->setParams($args);
  526.         return $this->requestEndpoint($endpoint);
  527.     }
  528.     /**
  529.      * Closes the given PointInTime.
  530.      *
  531.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html#close-point-in-time-api
  532.      *
  533.      * @throws ClientException
  534.      * @throws ConnectionException
  535.      * @throws ResponseException
  536.      */
  537.     public function closePointInTime(string $pointInTimeId): Response
  538.     {
  539.         $endpoint = new ClosePointInTime();
  540.         $endpoint->setBody(['id' => $pointInTimeId]);
  541.         return $this->requestEndpoint($endpoint);
  542.     }
  543.     /**
  544.      * Refreshes all search indices.
  545.      *
  546.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
  547.      *
  548.      * @throws ClientException
  549.      * @throws ConnectionException
  550.      * @throws ResponseException
  551.      */
  552.     public function refreshAll(): Response
  553.     {
  554.         return $this->requestEndpoint(new Refresh());
  555.     }
  556.     public function getLastRequest(): ?Request
  557.     {
  558.         return $this->_lastRequest;
  559.     }
  560.     public function getLastResponse(): ?Response
  561.     {
  562.         return $this->_lastResponse;
  563.     }
  564.     /**
  565.      * Replace the existing logger.
  566.      *
  567.      * @return $this
  568.      */
  569.     public function setLogger(LoggerInterface $logger)
  570.     {
  571.         $this->_logger $logger;
  572.         return $this;
  573.     }
  574.     /**
  575.      * Inits the client connections.
  576.      */
  577.     protected function _initConnections(): void
  578.     {
  579.         $connections = [];
  580.         foreach ($this->getConfig('connections') as $connection) {
  581.             $connections[] = Connection::create($this->_prepareConnectionParams($connection));
  582.         }
  583.         if ($this->_config->has('servers')) {
  584.             $servers $this->_config->get('servers');
  585.             foreach ($servers as $server) {
  586.                 $connections[] = Connection::create($this->_prepareConnectionParams($server));
  587.             }
  588.         }
  589.         // If no connections set, create default connection
  590.         if (!$connections) {
  591.             $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
  592.         }
  593.         if (!$this->_config->has('connectionStrategy')) {
  594.             if (true === $this->getConfig('roundRobin')) {
  595.                 $this->setConfigValue('connectionStrategy''RoundRobin');
  596.             } else {
  597.                 $this->setConfigValue('connectionStrategy''Simple');
  598.             }
  599.         }
  600.         $strategy Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));
  601.         $this->_connectionPool = new Connection\ConnectionPool($connections$strategy$this->_callback);
  602.     }
  603.     /**
  604.      * Creates a Connection params array from a Client or server config array.
  605.      */
  606.     protected function _prepareConnectionParams(array $config): array
  607.     {
  608.         $params = [];
  609.         $params['config'] = [];
  610.         foreach ($config as $key => $value) {
  611.             if (\in_array($key, ['bigintConversion''curl''headers''url'])) {
  612.                 $params['config'][$key] = $value;
  613.             } else {
  614.                 $params[$key] = $value;
  615.             }
  616.         }
  617.         return $params;
  618.     }
  619. }