<?php
namespace Elastica;
use Elastica\Bulk\Action;
use Elastica\Bulk\ResponseSet;
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
use Elastica\Exception\ClientException;
use Elastica\Exception\ConnectionException;
use Elastica\Exception\InvalidException;
use Elastica\Exception\ResponseException;
use Elastica\Script\AbstractScript;
use Elasticsearch\Endpoints\AbstractEndpoint;
use Elasticsearch\Endpoints\ClosePointInTime;
use Elasticsearch\Endpoints\Indices\ForceMerge;
use Elasticsearch\Endpoints\Indices\Refresh;
use Elasticsearch\Endpoints\Update;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
/**
* Client to connect the the elasticsearch server.
*
* @author Nicolas Ruflin <spam@ruflin.com>
*/
class Client
{
/**
* @var ClientConfiguration
*/
protected $_config;
/**
* @var callable
*/
protected $_callback;
/**
* @var Connection\ConnectionPool
*/
protected $_connectionPool;
/**
* @var Request|null
*/
protected $_lastRequest;
/**
* @var Response|null
*/
protected $_lastResponse;
/**
* @var LoggerInterface
*/
protected $_logger;
/**
* @var string
*/
protected $_version;
/**
* Creates a new Elastica client.
*
* @param array|string $config OPTIONAL Additional config or DSN of options
* @param callable|null $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
*
* @throws InvalidException
*/
public function __construct($config = [], ?callable $callback = null, ?LoggerInterface $logger = null)
{
if (\is_string($config)) {
$configuration = ClientConfiguration::fromDsn($config);
} elseif (\is_array($config)) {
$configuration = ClientConfiguration::fromArray($config);
} else {
throw new InvalidException('Config parameter must be an array or a string.');
}
$this->_config = $configuration;
$this->_callback = $callback;
$this->_logger = $logger ?? new NullLogger();
$this->_initConnections();
}
/**
* Get current version.
*
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
*/
public function getVersion(): string
{
if ($this->_version) {
return $this->_version;
}
$data = $this->request('/')->getData();
return $this->_version = $data['version']['number'];
}
/**
* Sets specific config values (updates and keeps default values).
*
* @param array $config Params
*/
public function setConfig(array $config): self
{
foreach ($config as $key => $value) {
$this->_config->set($key, $value);
}
return $this;
}
/**
* Returns a specific config key or the whole config array if not set.
*
* @throws InvalidException if the given key is not found in the configuration
*
* @return array|bool|string
*/
public function getConfig(string $key = '')
{
return $this->_config->get($key);
}
/**
* Sets / overwrites a specific config value.
*
* @param mixed $value Value
*/
public function setConfigValue(string $key, $value): self
{
return $this->setConfig([$key => $value]);
}
/**
* @param array|string $keys config key or path of config keys
* @param mixed $default default value will be returned if key was not found
*
* @return mixed
*/
public function getConfigValue($keys, $default = null)
{
$value = $this->_config->getAll();
foreach ((array) $keys as $key) {
if (isset($value[$key])) {
$value = $value[$key];
} else {
return $default;
}
}
return $value;
}
/**
* Returns the index for the given connection.
*/
public function getIndex(string $name): Index
{
return new Index($this, $name);
}
/**
* Adds a HTTP Header.
*/
public function addHeader(string $header, string $value): self
{
if ($this->_config->has('headers')) {
$headers = $this->_config->get('headers');
} else {
$headers = [];
}
$headers[$header] = $value;
$this->_config->set('headers', $headers);
return $this;
}
/**
* Remove a HTTP Header.
*/
public function removeHeader(string $header): self
{
if ($this->_config->has('headers')) {
$headers = $this->_config->get('headers');
unset($headers[$header]);
$this->_config->set('headers', $headers);
}
return $this;
}
/**
* Uses _bulk to send documents to the server.
*
* Array of \Elastica\Document as input. Index has to be set inside the
* document, because for bulk settings documents, documents can belong to
* any index
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
*
* @param array|Document[] $docs Array of Elastica\Document
*
* @throws InvalidException If docs is empty
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
* @throws BulkResponseException
*/
public function updateDocuments(array $docs, array $requestParams = []): ResponseSet
{
if (!$docs) {
throw new InvalidException('Array has to consist of at least one element');
}
$bulk = new Bulk($this);
$bulk->addDocuments($docs, Action::OP_TYPE_UPDATE);
foreach ($requestParams as $key => $value) {
$bulk->setRequestParam($key, $value);
}
return $bulk->send();
}
/**
* Uses _bulk to send documents to the server.
*
* Array of \Elastica\Document as input. Index has to be set inside the
* document, because for bulk settings documents, documents can belong to
* any index
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
*
* @param array|Document[] $docs Array of Elastica\Document
*
* @throws InvalidException If docs is empty
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
* @throws BulkResponseException
*/
public function addDocuments(array $docs, array $requestParams = []): ResponseSet
{
if (!$docs) {
throw new InvalidException('Array has to consist of at least one element');
}
$bulk = new Bulk($this);
$bulk->addDocuments($docs);
foreach ($requestParams as $key => $value) {
$bulk->setRequestParam($key, $value);
}
return $bulk->send();
}
/**
* Update document, using update script. Requires elasticsearch >= 0.19.0.
*
* @param int|string $id document id
* @param AbstractScript|array|Document $data raw data for request body
* @param string $index index to update
* @param array $options array of query params to use for query. For possible options check es api
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
*
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
*/
public function updateDocument($id, $data, $index, array $options = []): Response
{
$endpoint = new Update();
$endpoint->setId($id);
$endpoint->setIndex($index);
if ($data instanceof AbstractScript) {
$requestData = $data->toArray();
} elseif ($data instanceof Document) {
$requestData = ['doc' => $data->getData()];
if ($data->getDocAsUpsert()) {
$requestData['doc_as_upsert'] = true;
}
$docOptions = $data->getOptions(
[
'consistency',
'parent',
'percolate',
'refresh',
'replication',
'retry_on_conflict',
'routing',
'timeout',
]
);
$options += $docOptions;
} else {
$requestData = $data;
}
// If an upsert document exists
if ($data instanceof AbstractScript || $data instanceof Document) {
if ($data->hasUpsert()) {
$requestData['upsert'] = $data->getUpsert()->getData();
}
}
$endpoint->setBody($requestData);
$endpoint->setParams($options);
$response = $this->requestEndpoint($endpoint);
if ($response->isOk()
&& $data instanceof Document
&& ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false))
) {
$data->setVersionParams($response->getData());
}
return $response;
}
/**
* Bulk deletes documents.
*
* @param array|Document[] $docs
*
* @throws InvalidException
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
* @throws BulkResponseException
*/
public function deleteDocuments(array $docs, array $requestParams = []): ResponseSet
{
if (!$docs) {
throw new InvalidException('Array has to consist of at least one element');
}
$bulk = new Bulk($this);
$bulk->addDocuments($docs, Action::OP_TYPE_DELETE);
foreach ($requestParams as $key => $value) {
$bulk->setRequestParam($key, $value);
}
return $bulk->send();
}
/**
* Returns the status object for all indices.
*
* @return Status
*/
public function getStatus()
{
return new Status($this);
}
/**
* Returns the current cluster.
*
* @return Cluster
*/
public function getCluster()
{
return new Cluster($this);
}
/**
* Establishes the client connections.
*/
public function connect()
{
$this->_initConnections();
}
/**
* @return $this
*/
public function addConnection(Connection $connection)
{
$this->_connectionPool->addConnection($connection);
return $this;
}
/**
* Determines whether a valid connection is available for use.
*
* @return bool
*/
public function hasConnection()
{
return $this->_connectionPool->hasConnection();
}
/**
* @throws ClientException
*
* @return Connection
*/
public function getConnection()
{
return $this->_connectionPool->getConnection();
}
/**
* @return Connection[]
*/
public function getConnections()
{
return $this->_connectionPool->getConnections();
}
/**
* @return \Elastica\Connection\Strategy\StrategyInterface
*/
public function getConnectionStrategy()
{
return $this->_connectionPool->getStrategy();
}
/**
* @param array|Connection[] $connections
*
* @return $this
*/
public function setConnections(array $connections)
{
$this->_connectionPool->setConnections($connections);
return $this;
}
/**
* Deletes documents with the given ids, index, type from the index.
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
*
* @param array $ids Document ids
* @param Index|string $index Index name
* @param bool|string $routing Optional routing key for all ids
*
* @throws InvalidException
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
* @throws BulkResponseException
*/
public function deleteIds(array $ids, $index, $routing = false): ResponseSet
{
if (!$ids) {
throw new InvalidException('Array has to consist of at least one id');
}
$bulk = new Bulk($this);
$bulk->setIndex($index);
foreach ($ids as $id) {
$action = new Action(Action::OP_TYPE_DELETE);
$action->setId($id);
if (!empty($routing)) {
$action->setRouting($routing);
}
$bulk->addAction($action);
}
return $bulk->send();
}
/**
* Bulk operation.
*
* Every entry in the params array has to exactly on array
* of the bulk operation. An example param array would be:
*
* array(
* array('index' => array('_index' => 'test', '_id' => '1')),
* array('field1' => 'value1'),
* array('delete' => array('_index' => 'test', '_id' => '2')),
* array('create' => array('_index' => 'test', '_id' => '3')),
* array('field1' => 'value3'),
* array('update' => array('_id' => '1', '_index' => 'test')),
* array('doc' => array('field2' => 'value2')),
* );
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
*
* @throws ResponseException
* @throws InvalidException
* @throws ClientException
* @throws ConnectionException
* @throws BulkResponseException
*/
public function bulk(array $params): ResponseSet
{
if (!$params) {
throw new InvalidException('Array has to consist of at least one param');
}
$bulk = new Bulk($this);
$bulk->addRawData($params);
return $bulk->send();
}
/**
* Makes calls to the elasticsearch server based on this index.
*
* It's possible to make any REST query directly over this method
*
* @param string $path Path to call
* @param string $method Rest method to use (GET, POST, DELETE, PUT)
* @param array|string $data OPTIONAL Arguments as array or pre-encoded string
* @param array $query OPTIONAL Query params
* @param string $contentType Content-Type sent with this request
*
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
*/
public function request(string $path, string $method = Request::GET, $data = [], array $query = [], string $contentType = Request::DEFAULT_CONTENT_TYPE): Response
{
$connection = $this->getConnection();
$request = $this->_lastRequest = new Request($path, $method, $data, $query, $connection, $contentType);
$this->_lastResponse = null;
try {
$response = $this->_lastResponse = $request->send();
} catch (ConnectionException $e) {
$this->_connectionPool->onFail($connection, $e, $this);
$this->_logger->error('Elastica Request Failure', [
'exception' => $e,
'request' => $e->getRequest()->toArray(),
'retry' => $this->hasConnection(),
]);
// In case there is no valid connection left, throw exception which caused the disabling of the connection.
if (!$this->hasConnection()) {
throw $e;
}
return $this->request($path, $method, $data, $query);
}
$this->_logger->debug('Elastica Request', [
'request' => $request->toArray(),
'response' => $response->getData(),
'responseStatus' => $response->getStatus(),
]);
return $response;
}
/**
* Makes calls to the elasticsearch server with usage official client Endpoint.
*
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
*/
public function requestEndpoint(AbstractEndpoint $endpoint): Response
{
return $this->request(
\ltrim($endpoint->getURI(), '/'),
$endpoint->getMethod(),
$endpoint->getBody() ?? [],
$endpoint->getParams()
);
}
/**
* Force merges all search indices.
*
* @param array $args OPTIONAL Optional arguments
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
*
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
*/
public function forcemergeAll($args = []): Response
{
$endpoint = new ForceMerge();
$endpoint->setParams($args);
return $this->requestEndpoint($endpoint);
}
/**
* Closes the given PointInTime.
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html#close-point-in-time-api
*
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
*/
public function closePointInTime(string $pointInTimeId): Response
{
$endpoint = new ClosePointInTime();
$endpoint->setBody(['id' => $pointInTimeId]);
return $this->requestEndpoint($endpoint);
}
/**
* Refreshes all search indices.
*
* @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
*
* @throws ClientException
* @throws ConnectionException
* @throws ResponseException
*/
public function refreshAll(): Response
{
return $this->requestEndpoint(new Refresh());
}
public function getLastRequest(): ?Request
{
return $this->_lastRequest;
}
public function getLastResponse(): ?Response
{
return $this->_lastResponse;
}
/**
* Replace the existing logger.
*
* @return $this
*/
public function setLogger(LoggerInterface $logger)
{
$this->_logger = $logger;
return $this;
}
/**
* Inits the client connections.
*/
protected function _initConnections(): void
{
$connections = [];
foreach ($this->getConfig('connections') as $connection) {
$connections[] = Connection::create($this->_prepareConnectionParams($connection));
}
if ($this->_config->has('servers')) {
$servers = $this->_config->get('servers');
foreach ($servers as $server) {
$connections[] = Connection::create($this->_prepareConnectionParams($server));
}
}
// If no connections set, create default connection
if (!$connections) {
$connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
}
if (!$this->_config->has('connectionStrategy')) {
if (true === $this->getConfig('roundRobin')) {
$this->setConfigValue('connectionStrategy', 'RoundRobin');
} else {
$this->setConfigValue('connectionStrategy', 'Simple');
}
}
$strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));
$this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback);
}
/**
* Creates a Connection params array from a Client or server config array.
*/
protected function _prepareConnectionParams(array $config): array
{
$params = [];
$params['config'] = [];
foreach ($config as $key => $value) {
if (\in_array($key, ['bigintConversion', 'curl', 'headers', 'url'])) {
$params['config'][$key] = $value;
} else {
$params[$key] = $value;
}
}
return $params;
}
}