ResumeUploader.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. <?php
  2. namespace Qiniu\Storage;
  3. use Qiniu\Config;
  4. use Qiniu\Http\Client;
  5. use Qiniu\Http\Error;
  6. use Qiniu\Enum\SplitUploadVersion;
  7. /**
  8. * 断点续上传类, 该类主要实现了断点续上传中的分块上传,
  9. * 以及相应地创建块和创建文件过程.
  10. *
  11. * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkblk.html
  12. * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkfile.html
  13. */
  14. final class ResumeUploader
  15. {
  16. private $upToken;
  17. private $key;
  18. private $inputStream;
  19. private $size;
  20. private $params;
  21. private $mime;
  22. private $contexts;
  23. private $finishedEtags;
  24. private $host;
  25. private $bucket;
  26. private $currentUrl;
  27. private $config;
  28. private $resumeRecordFile;
  29. private $version;
  30. private $partSize;
  31. /**
  32. * 上传二进制流到七牛
  33. *
  34. * @param string $upToken 上传凭证
  35. * @param string $key 上传文件名
  36. * @param string $inputStream 上传二进制流
  37. * @param string $size 上传流的大小
  38. * @param string $params 自定义变量
  39. * @param string $mime 上传数据的mimeType
  40. * @param Config $config
  41. * @param string $resumeRecordFile 断点续传的已上传的部分信息记录文件
  42. * @param string $version 分片上传版本 目前支持v1/v2版本 默认v1
  43. * @param string $partSize 分片上传v2字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
  44. *
  45. * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
  46. */
  47. public function __construct(
  48. $upToken,
  49. $key,
  50. $inputStream,
  51. $size,
  52. $params,
  53. $mime,
  54. $config,
  55. $resumeRecordFile = null,
  56. $version = 'v1',
  57. $partSize = config::BLOCK_SIZE
  58. ) {
  59. $this->upToken = $upToken;
  60. $this->key = $key;
  61. $this->inputStream = $inputStream;
  62. $this->size = $size;
  63. $this->params = $params;
  64. $this->mime = $mime;
  65. $this->contexts = array();
  66. $this->finishedEtags = array("etags"=>array(), "uploadId"=>"", "expiredAt"=>0, "uploaded"=>0);
  67. $this->config = $config;
  68. $this->resumeRecordFile = $resumeRecordFile ? $resumeRecordFile : null;
  69. $this->partSize = $partSize ? $partSize : config::BLOCK_SIZE;
  70. try {
  71. $this->version = SplitUploadVersion::from($version ? $version : 'v1');
  72. } catch (\Exception $e) {
  73. throw new \Exception("only support v1/v2 now!", 0, $e);
  74. }
  75. list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($upToken);
  76. $this->bucket = $bucket;
  77. if ($err != null) {
  78. return array(null, $err);
  79. }
  80. $upHost = $config->getUpHost($accessKey, $bucket);
  81. if ($err != null) {
  82. throw new \Exception($err->message(), 1);
  83. }
  84. $this->host = $upHost;
  85. }
  86. /**
  87. * 上传操作
  88. */
  89. public function upload($fname)
  90. {
  91. $uploaded = 0;
  92. if ($this->version == SplitUploadVersion::V2) {
  93. $partNumber = 1;
  94. $encodedObjectName = $this->key? \Qiniu\base64_urlSafeEncode($this->key) : '~';
  95. };
  96. // get upload record from resumeRecordFile
  97. if ($this->resumeRecordFile != null) {
  98. $blkputRets = null;
  99. if (file_exists($this->resumeRecordFile)) {
  100. $stream = fopen($this->resumeRecordFile, 'r');
  101. if ($stream) {
  102. $streamLen = filesize($this->resumeRecordFile);
  103. if ($streamLen > 0) {
  104. $contents = fread($stream, $streamLen);
  105. fclose($stream);
  106. if ($contents) {
  107. $blkputRets = json_decode($contents, true);
  108. if ($blkputRets === null) {
  109. error_log("resumeFile contents decode error");
  110. }
  111. } else {
  112. error_log("read resumeFile failed");
  113. }
  114. } else {
  115. error_log("resumeFile is empty");
  116. }
  117. } else {
  118. error_log("resumeFile open failed");
  119. }
  120. } else {
  121. error_log("resumeFile not exists");
  122. }
  123. if ($blkputRets) {
  124. if ($this->version == SplitUploadVersion::V1) {
  125. if (isset($blkputRets['contexts']) && isset($blkputRets['uploaded']) &&
  126. is_array($blkputRets['contexts']) && is_int($blkputRets['uploaded'])) {
  127. $this->contexts = $blkputRets['contexts'];
  128. $uploaded = $blkputRets['uploaded'];
  129. }
  130. } elseif ($this->version == SplitUploadVersion::V2) {
  131. if (isset($blkputRets["etags"]) && isset($blkputRets["uploadId"]) &&
  132. isset($blkputRets["expiredAt"]) && $blkputRets["expiredAt"] > time()
  133. && $blkputRets["uploaded"] > 0 && is_array($blkputRets["etags"]) &&
  134. is_string($blkputRets["uploadId"]) && is_int($blkputRets["expiredAt"])) {
  135. $this->finishedEtags['etags'] = $blkputRets["etags"];
  136. $this->finishedEtags["uploadId"] = $blkputRets["uploadId"];
  137. $this->finishedEtags["expiredAt"] = $blkputRets["expiredAt"];
  138. $this->finishedEtags["uploaded"] = $blkputRets["uploaded"];
  139. $uploaded = $blkputRets["uploaded"];
  140. $partNumber = count($this->finishedEtags["etags"]) + 1;
  141. } else {
  142. $this->makeInitReq($encodedObjectName);
  143. }
  144. } else {
  145. throw new \Exception("only support v1/v2 now!");
  146. }
  147. } else {
  148. if ($this->version == SplitUploadVersion::V2) {
  149. $this->makeInitReq($encodedObjectName);
  150. }
  151. }
  152. } else {
  153. // init a Multipart Upload task if choose v2
  154. if ($this->version == SplitUploadVersion::V2) {
  155. $this->makeInitReq($encodedObjectName);
  156. }
  157. }
  158. while ($uploaded < $this->size) {
  159. $blockSize = $this->blockSize($uploaded);
  160. $data = fread($this->inputStream, $blockSize);
  161. if ($data === false) {
  162. throw new \Exception("file read failed", 1);
  163. }
  164. if ($this->version == SplitUploadVersion::V1) {
  165. $crc = \Qiniu\crc32_data($data);
  166. $response = $this->makeBlock($data, $blockSize);
  167. } elseif ($this->version == SplitUploadVersion::V2) {
  168. $md5 = md5($data);
  169. $response = $this->uploadPart(
  170. $data,
  171. $partNumber,
  172. $this->finishedEtags["uploadId"],
  173. $encodedObjectName,
  174. $md5
  175. );
  176. } else {
  177. throw new \Exception("only support v1/v2 now!");
  178. }
  179. $ret = null;
  180. if ($response->ok() && $response->json() != null) {
  181. $ret = $response->json();
  182. }
  183. if ($response->statusCode < 0) {
  184. list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($this->upToken);
  185. if ($err != null) {
  186. return array(null, $err);
  187. }
  188. $upHostBackup = $this->config->getUpBackupHost($accessKey, $bucket);
  189. $this->host = $upHostBackup;
  190. }
  191. if ($this->version == SplitUploadVersion::V1) {
  192. if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
  193. $response = $this->makeBlock($data, $blockSize);
  194. $ret = $response->json();
  195. }
  196. if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
  197. return array(null, new Error($this->currentUrl, $response));
  198. }
  199. array_push($this->contexts, $ret['ctx']);
  200. } elseif ($this->version == SplitUploadVersion::V2) {
  201. if ($response->needRetry() || !isset($ret['md5']) || $md5 != $ret['md5']) {
  202. $response = $this->uploadPart(
  203. $data,
  204. $partNumber,
  205. $this->finishedEtags["uploadId"],
  206. $encodedObjectName,
  207. $md5
  208. );
  209. $ret = $response->json();
  210. }
  211. if (!$response->ok() || !isset($ret['md5']) || $md5 != $ret['md5']) {
  212. return array(null, new Error($this->currentUrl, $response));
  213. }
  214. $blockStatus = array('etag' => $ret['etag'], 'partNumber' => $partNumber);
  215. array_push($this->finishedEtags['etags'], $blockStatus);
  216. $partNumber += 1;
  217. } else {
  218. throw new \Exception("only support v1/v2 now!");
  219. }
  220. $uploaded += $blockSize;
  221. if ($this->version == SplitUploadVersion::V2) {
  222. $this->finishedEtags['uploaded'] = $uploaded;
  223. }
  224. if ($this->resumeRecordFile !== null) {
  225. if ($this->version == SplitUploadVersion::V1) {
  226. $recordData = array(
  227. 'contexts' => $this->contexts,
  228. 'uploaded' => $uploaded
  229. );
  230. $recordData = json_encode($recordData);
  231. } elseif ($this->version == SplitUploadVersion::V2) {
  232. $recordData = json_encode($this->finishedEtags);
  233. } else {
  234. throw new \Exception("only support v1/v2 now!");
  235. }
  236. if ($recordData) {
  237. $isWritten = file_put_contents($this->resumeRecordFile, $recordData);
  238. if ($isWritten === false) {
  239. error_log("write resumeRecordFile failed");
  240. }
  241. } else {
  242. error_log('resumeRecordData encode failed');
  243. }
  244. }
  245. }
  246. if ($this->version == SplitUploadVersion::V1) {
  247. return $this->makeFile($fname);
  248. } elseif ($this->version == SplitUploadVersion::V2) {
  249. return $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName);
  250. } else {
  251. throw new \Exception("only support v1/v2 now!");
  252. }
  253. }
  254. /**
  255. * 创建块
  256. */
  257. private function makeBlock($block, $blockSize)
  258. {
  259. $url = $this->host . '/mkblk/' . $blockSize;
  260. return $this->post($url, $block);
  261. }
  262. private function fileUrl($fname)
  263. {
  264. $url = $this->host . '/mkfile/' . $this->size;
  265. $url .= '/mimeType/' . \Qiniu\base64_urlSafeEncode($this->mime);
  266. if ($this->key != null) {
  267. $url .= '/key/' . \Qiniu\base64_urlSafeEncode($this->key);
  268. }
  269. $url .= '/fname/' . \Qiniu\base64_urlSafeEncode($fname);
  270. if (!empty($this->params)) {
  271. foreach ($this->params as $key => $value) {
  272. $val = \Qiniu\base64_urlSafeEncode($value);
  273. $url .= "/$key/$val";
  274. }
  275. }
  276. return $url;
  277. }
  278. /**
  279. * 创建文件
  280. */
  281. private function makeFile($fname)
  282. {
  283. $url = $this->fileUrl($fname);
  284. $body = implode(',', $this->contexts);
  285. $response = $this->post($url, $body);
  286. if ($response->needRetry()) {
  287. $response = $this->post($url, $body);
  288. }
  289. if (!$response->ok()) {
  290. return array(null, new Error($this->currentUrl, $response));
  291. }
  292. return array($response->json(), null);
  293. }
  294. private function post($url, $data)
  295. {
  296. $this->currentUrl = $url;
  297. $headers = array('Authorization' => 'UpToken ' . $this->upToken);
  298. return Client::post($url, $data, $headers);
  299. }
  300. private function blockSize($uploaded)
  301. {
  302. if ($this->size < $uploaded + $this->partSize) {
  303. return $this->size - $uploaded;
  304. }
  305. return $this->partSize;
  306. }
  307. private function makeInitReq($encodedObjectName)
  308. {
  309. $res = $this->initReq($encodedObjectName);
  310. $this->finishedEtags["uploadId"] = $res['uploadId'];
  311. $this->finishedEtags["expiredAt"] = $res['expireAt'];
  312. }
  313. /**
  314. * 初始化上传任务
  315. */
  316. private function initReq($encodedObjectName)
  317. {
  318. $url = $this->host.'/buckets/'.$this->bucket.'/objects/'.$encodedObjectName.'/uploads';
  319. $headers = array(
  320. 'Authorization' => 'UpToken ' . $this->upToken,
  321. 'Content-Type' => 'application/json'
  322. );
  323. $response = $this->postWithHeaders($url, null, $headers);
  324. return $response->json();
  325. }
  326. /**
  327. * 分块上传v2
  328. */
  329. private function uploadPart($block, $partNumber, $uploadId, $encodedObjectName, $md5)
  330. {
  331. $headers = array(
  332. 'Authorization' => 'UpToken ' . $this->upToken,
  333. 'Content-Type' => 'application/octet-stream',
  334. 'Content-MD5' => $md5
  335. );
  336. $url = $this->host.'/buckets/'.$this->bucket.'/objects/'.$encodedObjectName.
  337. '/uploads/'.$uploadId.'/'.$partNumber;
  338. $response = $this->put($url, $block, $headers);
  339. return $response;
  340. }
  341. private function completeParts($fname, $uploadId, $encodedObjectName)
  342. {
  343. $headers = array(
  344. 'Authorization' => 'UpToken '.$this->upToken,
  345. 'Content-Type' => 'application/json'
  346. );
  347. $etags = $this->finishedEtags['etags'];
  348. $sortedEtags = \Qiniu\arraySort($etags, 'partNumber');
  349. $metadata = array();
  350. $customVars = array();
  351. if ($this->params) {
  352. foreach ($this->params as $k => $v) {
  353. if (strpos($k, 'x:') === 0) {
  354. $customVars[$k] = $v;
  355. } elseif (strpos($k, 'x-qn-meta-') === 0) {
  356. $metadata[$k] = $v;
  357. }
  358. }
  359. }
  360. if (empty($metadata)) {
  361. $metadata = null;
  362. }
  363. if (empty($customVars)) {
  364. $customVars = null;
  365. }
  366. $body = array(
  367. 'fname' => $fname,
  368. 'mimeType' => $this->mime,
  369. 'metadata' => $metadata,
  370. 'customVars' => $customVars,
  371. 'parts' => $sortedEtags
  372. );
  373. $jsonBody = json_encode($body);
  374. $url = $this->host.'/buckets/'.$this->bucket.'/objects/'.$encodedObjectName.'/uploads/'.$uploadId;
  375. $response = $this->postWithHeaders($url, $jsonBody, $headers);
  376. if ($response->needRetry()) {
  377. $response = $this->postWithHeaders($url, $jsonBody, $headers);
  378. }
  379. if (!$response->ok()) {
  380. return array(null, new Error($this->currentUrl, $response));
  381. }
  382. return array($response->json(), null);
  383. }
  384. private function put($url, $data, $headers)
  385. {
  386. $this->currentUrl = $url;
  387. return Client::put($url, $data, $headers);
  388. }
  389. private function postWithHeaders($url, $data, $headers)
  390. {
  391. $this->currentUrl = $url;
  392. return Client::post($url, $data, $headers);
  393. }
  394. }