DataCollectionClient.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. <?php
  2. /*
  3. * Licensed to the Apache Software Foundation (ASF) under one
  4. * or more contributor license agreements. See the NOTICE file
  5. * distributed with this work for additional information
  6. * regarding copyright ownership. The ASF licenses this file
  7. * to you under the Apache License, Version 2.0 (the
  8. * "License"); you may not use this file except in compliance
  9. * with the License. You may obtain a copy of the License at
  10. *
  11. * http://www.apache.org/licenses/LICENSE-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing,
  14. * software distributed under the License is distributed on an
  15. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. * KIND, either express or implied. See the License for the
  17. * specific language governing permissions and limitations
  18. * under the License.
  19. */
  20. namespace OpenSearch\Client;
  21. use OpenSearch\Generated\DataCollection\Command;
  22. use OpenSearch\Generated\DataCollection\Constant;
  23. use OpenSearch\Generated\DataCollection\DataCollectionServiceIf;
  24. use OpenSearch\Client\OpenSearchClient;
  /**
   * Pushes documents to an OpenSearch data collection.
   *
   * Documents can either be queued locally with add() and sent together with
   * commit(), or sent directly as a pre-built JSON batch with push().
   */
  class DataCollectionClient implements DataCollectionServiceIf
  {
      /**
       * Base client that computes request signatures, talks to the server and
       * returns results.
       *
       * @var \OpenSearch\Client\OpenSearchClient
       */
      private $openSearchClient;

      /**
       * Documents queued by add() that have not yet been sent by commit().
       *
       * @var array
       */
      private $docBuffer = [];

      /**
       * @param \OpenSearch\Client\OpenSearchClient $openSearchClient Base client that computes
       *        request signatures, talks to the server and returns results.
       */
      public function __construct($openSearchClient)
      {
          $this->openSearchClient = $openSearchClient;
      }

      /**
       * Queues one document in this client's local buffer.
       *
       * > Note:
       * >
       * > The document is only added to the SDK client buffer; nothing is sent to the
       * > server until commit() is called. You may call add() several times and then
       * > commit() once to send them all.
       *
       * The SDK bookkeeping fields sdk_type, sdk_version and reach_time are added to the
       * document. A field with the same name in $fields takes precedence over them.
       *
       * @param array $fields All fields of one behavior (or user / item) document, e.g.
       *                      array("user_id" => "1021468", "bhv_type" => "click").
       * @return void
       */
      public function add(array $fields = [])
      {
          $this->docBuffer[] = [
              Constant::get('DOC_KEY_CMD') => Command::$__names[Command::ADD],
              Constant::get('DOC_KEY_FIELDS') => self::withInternalFields($fields),
          ];
      }

      /**
       * Sends every document in the SDK client buffer to the server.
       *
       * > Note:
       * >
       * > The buffer is cleared before the request is sent. If the server returns an
       * > error and you need to retry, rebuild the documents and commit again, or data
       * > may be lost.
       *
       * If the buffered documents cannot be JSON-encoded, an exception is thrown and the
       * buffer is left intact, so nothing is dropped silently.
       *
       * @param string $searchAppName      Name of the associated search application.
       * @param string $dataCollectionName Data collection name, as returned by the console
       *                                   when the collection was created.
       * @param string $dataCollectionType Data collection type: user, item_info, behavior,
       *                                   industry_specific.
       * @return \OpenSearch\Generated\Common\OpenSearchResult
       * @throws \InvalidArgumentException If the buffered documents cannot be JSON-encoded.
       */
      public function commit($searchAppName, $dataCollectionName, $dataCollectionType)
      {
          // Encode first; the buffer is cleared only once a valid payload exists.
          $payload = self::encodeJson($this->docBuffer);
          $this->docBuffer = [];

          return $this->doPush($payload, $searchAppName, $dataCollectionName, $dataCollectionType);
      }

      /**
       * Sends a batch of documents to the server synchronously.
       *
       * Each document's fields object receives the SDK bookkeeping fields (sdk_type,
       * sdk_version, reach_time). Values already present in the document take precedence.
       *
       * @param string $docJson            JSON array of documents. Each document must contain
       *                                   a fields object under the DOC_KEY_FIELDS key.
       * @param string $searchAppName      Name of the associated search application.
       * @param string $dataCollectionName Data collection name, as returned by the console
       *                                   when the collection was created.
       * @param string $dataCollectionType Data collection type: user, item_info, behavior,
       *                                   industry_specific.
       * @return \OpenSearch\Generated\Common\OpenSearchResult
       * @throws \InvalidArgumentException If $docJson is not a JSON array/object, if a
       *                                   document lacks a fields object, or if the result
       *                                   cannot be re-encoded.
       */
      public function push($docJson, $searchAppName, $dataCollectionName, $dataCollectionType)
      {
          $docs = json_decode($docJson, true);
          if (!is_array($docs)) {
              // Previously invalid JSON decoded to null and the literal "null" was posted.
              throw new \InvalidArgumentException('$docJson must be a JSON array of documents.');
          }

          $fieldsKey = Constant::get('DOC_KEY_FIELDS');
          foreach ($docs as $index => $doc) {
              if (!is_array($doc) || !isset($doc[$fieldsKey]) || !is_array($doc[$fieldsKey])) {
                  throw new \InvalidArgumentException(sprintf(
                      'Document #%s in $docJson has no "%s" object.',
                      $index,
                      $fieldsKey
                  ));
              }
              // Write back by key rather than through a foreach reference.
              $docs[$index][$fieldsKey] = self::withInternalFields($doc[$fieldsKey]);
          }

          return $this->doPush(self::encodeJson($docs), $searchAppName, $dataCollectionName, $dataCollectionType);
      }

      /**
       * POSTs an already-encoded JSON payload to the bulk endpoint of a data collection.
       */
      private function doPush($docJson, $searchAppName, $dataCollectionName, $dataCollectionType)
      {
          $path = self::createPushPath($searchAppName, $dataCollectionName, $dataCollectionType);
          return $this->openSearchClient->post($path, $docJson);
      }

      /**
       * Builds the request path of the bulk endpoint for a data collection.
       */
      private static function createPushPath($searchAppName, $dataCollectionName, $dataCollectionType)
      {
          return sprintf("/app-groups/%s/data-collections/%s/data-collection-type/%s/actions/bulk", $searchAppName, $dataCollectionName, $dataCollectionType);
      }

      /**
       * Returns $fields with the SDK bookkeeping fields added.
       *
       * array_replace() is used instead of array_merge() so that numeric field names
       * (e.g. a JSON key "123", which PHP stores as int 123) are kept rather than
       * renumbered from 0. Caller-supplied values still override the internal ones.
       *
       * @param array $fields Document fields.
       * @return array
       */
      private static function withInternalFields(array $fields)
      {
          return array_replace(self::getInternalFields(), $fields);
      }

      /**
       * JSON-encodes $value, failing loudly instead of returning false.
       *
       * @param mixed $value Value to encode.
       * @return string
       * @throws \InvalidArgumentException If json_encode() fails, e.g. on invalid UTF-8.
       */
      private static function encodeJson($value)
      {
          $json = json_encode($value);
          if ($json === false) {
              throw new \InvalidArgumentException(
                  'Failed to encode documents as JSON (json_last_error = ' . json_last_error() . ').'
              );
          }
          return $json;
      }

      /**
       * SDK bookkeeping fields attached to every pushed document.
       *
       * reach_time is the current time formatted as YmdHis in PHP's default timezone.
       */
      private static function getInternalFields()
      {
          return [
              'sdk_type' => OpenSearchClient::SDK_TYPE,
              'sdk_version' => OpenSearchClient::SDK_VERSION,
              'reach_time' => date("YmdHis", time()),
          ];
      }
  }