Skip to content

Commit 7eba748

Browse files
authored
Merge pull request #112 from EMCECS/feature-SDK-649
[SDK-649] Initial commit for LargeFileUploader enhancement to support…
2 parents 776db1b + 48c4a51 commit 7eba748

File tree

4 files changed

+225
-9
lines changed

4 files changed

+225
-9
lines changed

src/main/java/com/emc/object/s3/LargeFileUploader.java

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ public static String getMpuETag(List<MultipartPartETag> partETags) {
8181
private final String bucket;
8282
private final String key;
8383

84+
private final String srcBucket;
85+
private final String srcKey;
86+
private String sourceVersionId;
87+
8488
private final InputStream stream;
8589
private final LargeFileMultipartSource multipartSource;
8690

@@ -125,6 +129,8 @@ public LargeFileUploader(S3Client s3Client, String bucket, String key, InputStre
125129
this.stream = stream;
126130
this.fullSize = size;
127131
this.multipartSource = null;
132+
this.srcBucket = null;
133+
this.srcKey = null;
128134
}
129135

130136
/**
@@ -139,6 +145,23 @@ public LargeFileUploader(S3Client s3Client, String bucket, String key, LargeFile
139145
this.key = key;
140146
this.multipartSource = multipartSource;
141147
this.stream = null;
148+
this.srcBucket = null;
149+
this.srcKey = null;
150+
}
151+
152+
/**
153+
* Creates a new LargeFileUpload instance using the specified <code>s3Client</code> to copy
154+
* from <code>srcBucket/srcKey</code> to <code>dstBucket/dstKey</code> without streaming
155+
* data between the client and ECS server.
156+
*/
157+
public LargeFileUploader(S3Client s3Client, String srcBucket, String srcKey, String dstBucket, String dstKey) {
158+
this.s3Client = s3Client;
159+
this.bucket = dstBucket;
160+
this.key = dstKey;
161+
this.srcBucket = srcBucket;
162+
this.srcKey = srcKey;
163+
this.multipartSource = null;
164+
this.stream = null;
142165
}
143166

144167
@Override
@@ -282,6 +305,8 @@ public void upload() {
282305

283306
if (fullSize >= mpuThreshold)
284307
doMultipartUpload();
308+
else if (srcKey != null && srcBucket != null)
309+
doSingleCopy();
285310
else
286311
doSinglePut();
287312
}
@@ -291,6 +316,8 @@ private InputStream getSourceCompleteDataStream() throws IOException {
291316
if (multipartSource != null) {
292317
is = multipartSource.getCompleteDataStream();
293318
} else {
319+
if (stream == null)
320+
throw new IOException("null source stream");
294321
// only close the source stream if configured to do so
295322
is = new FilterInputStream(stream) {
296323
@Override
@@ -336,6 +363,17 @@ public void doSinglePut() {
336363
}
337364
}
338365

366+
public void doSingleCopy() {
367+
configure();
368+
if (srcKey == null || srcBucket == null)
369+
throw new IllegalArgumentException("must specify source bucket and key for the copy");
370+
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(srcBucket, srcKey, bucket, key)
371+
.withSourceVersionId(sourceVersionId).withObjectMetadata(objectMetadata)
372+
.withAcl(acl).withCannedAcl(cannedAcl);
373+
CopyObjectResult result = s3Client.copyObject(copyObjectRequest);
374+
eTag = result.getRawETag();
375+
}
376+
339377
/*
340378
* get a map of existing MPU parts from which we can resume an MPU. we can only resume an MPU if the existing
341379
* part sizes and count are exactly the same as configured in this LFU instance
@@ -426,7 +464,11 @@ public void doMultipartUpload() {
426464

427465
// no existing part to use, so upload this part
428466
} else {
429-
futures.add(executorService.submit(new UploadPartTask(resumeContext.getUploadId(), partNumber, offset, length)));
467+
if (srcKey != null && srcBucket != null) {
468+
futures.add(executorService.submit(new CopyPartTask(resumeContext.getUploadId(), partNumber, offset, length)));
469+
}else {
470+
futures.add(executorService.submit(new UploadPartTask(resumeContext.getUploadId(), partNumber, offset, length)));
471+
}
430472
}
431473
}
432474

@@ -565,6 +607,10 @@ protected void configure() {
565607
// must read stream sequentially
566608
executorService = null;
567609
threads = 1;
610+
} else if (srcKey != null && srcBucket != null) {
611+
// If resuming from copied parts, no need to verify the parts found in target
612+
if (resumeContext != null) resumeContext.setVerifyPartsFoundInTarget(false);
613+
fullSize = s3Client.getObjectMetadata(new GetObjectMetadataRequest(srcBucket, srcKey).withVersionId(sourceVersionId)).getContentLength();
568614
} else {
569615
throw new IllegalArgumentException("must specify a file, stream, or multipartSource to read");
570616
}
@@ -773,6 +819,14 @@ public void setAbortMpuOnFailure(boolean abortMpuOnFailure) {
773819
this.abortMpuOnFailure = abortMpuOnFailure;
774820
}
775821

822+
public String getSourceVersionId() {
823+
return sourceVersionId;
824+
}
825+
826+
public void setSourceVersionId(String sourceVersionId) {
827+
this.sourceVersionId = sourceVersionId;
828+
}
829+
776830
public LargeFileUploader withObjectMetadata(S3ObjectMetadata objectMetadata) {
777831
setObjectMetadata(objectMetadata);
778832
return this;
@@ -834,6 +888,44 @@ public LargeFileUploader withAbortMpuOnFailure(boolean abortMpuOnFailure) {
834888
return this;
835889
}
836890

891+
public LargeFileUploader withSourceVersionId(String sourceVersionId) {
892+
setSourceVersionId(sourceVersionId);
893+
return this;
894+
}
895+
896+
private class CopyPartTask implements Callable<MultipartPartETag> {
897+
private final String uploadId;
898+
private final int partNumber;
899+
private final long offset;
900+
private final long length;
901+
public CopyPartTask(String uploadId, int partNumber, long offset, long length) {
902+
this.uploadId = uploadId;
903+
this.partNumber = partNumber;
904+
this.offset = offset;
905+
this.length = length;
906+
}
907+
908+
@Override
909+
public MultipartPartETag call() {
910+
if (!active.get()) {
911+
// we were paused or aborted, so should not start any more tasks
912+
throw new CancellationException();
913+
} else {
914+
log.debug("copying {}/{}, uploadId: {}, partNumber {} (offset: {}, length: {}), versionId: {}",
915+
bucket, key, uploadId, partNumber, offset, length, sourceVersionId);
916+
CopyPartRequest copyPartRequest = new CopyPartRequest(srcBucket, srcKey, bucket, key, resumeContext.getUploadId(), partNumber)
917+
.withSourceRange(new Range(offset, offset + length - 1))
918+
.withSourceVersionId(sourceVersionId);
919+
try {
920+
CopyPartResult result = s3Client.copyPart(copyPartRequest);
921+
return new MultipartPartETag(result.getPartNumber(), result.getRawETag());
922+
} catch (S3Exception e) {
923+
throw new RuntimeException(e);
924+
}
925+
}
926+
}
927+
}
928+
837929
private class UploadPartTask implements Callable<MultipartPartETag> {
838930
private final String uploadId;
839931
private final int partNumber;

src/main/java/com/emc/object/s3/request/CopyPartRequest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,10 @@ public CopyPartRequest withSourceRange(Range sourceRange) {
7979
setSourceRange(sourceRange);
8080
return this;
8181
}
82+
83+
@Override
84+
public CopyPartRequest withSourceVersionId(String sourceVersionId) {
85+
super.setSourceVersionId(sourceVersionId);
86+
return this;
87+
}
8288
}

src/test/java/com/emc/object/s3/LargeFileUploaderTest.java

Lines changed: 123 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,13 @@
2626
*/
2727
package com.emc.object.s3;
2828

29-
import com.emc.object.s3.bean.GetObjectResult;
30-
import com.emc.object.s3.bean.MultipartPart;
31-
import com.emc.object.s3.bean.MultipartPartETag;
32-
import com.emc.object.s3.bean.Upload;
29+
import com.emc.object.s3.bean.*;
3330
import com.emc.object.s3.jersey.S3JerseyClient;
3431
import com.emc.object.s3.lfu.LargeFileMultipartSource;
3532
import com.emc.object.s3.lfu.LargeFileUpload;
3633
import com.emc.object.s3.lfu.LargeFileUploaderResumeContext;
3734
import com.emc.object.s3.lfu.PartMismatchException;
38-
import com.emc.object.s3.request.AbortMultipartUploadRequest;
39-
import com.emc.object.s3.request.GetObjectRequest;
40-
import com.emc.object.s3.request.ListMultipartUploadsRequest;
41-
import com.emc.object.s3.request.UploadPartRequest;
35+
import com.emc.object.s3.request.*;
4236
import com.emc.object.util.ProgressListener;
4337
import com.emc.rest.util.StreamUtil;
4438
import com.emc.util.RandomInputStream;
@@ -52,6 +46,7 @@
5246
import java.security.DigestInputStream;
5347
import java.security.MessageDigest;
5448
import java.util.*;
49+
import java.util.concurrent.ThreadPoolExecutor;
5550
import java.util.concurrent.TimeUnit;
5651
import java.util.concurrent.TimeoutException;
5752
import java.util.concurrent.atomic.AtomicLong;
@@ -168,6 +163,126 @@ public void testLargeFileUploaderProgressListener() throws Exception {
168163
client.deleteObject(getTestBucket(), key);
169164
}
170165

166+
@Test
167+
public void testLFUCopy() throws Exception {
168+
// enable versioning on the bucket
169+
client.setBucketVersioning(getTestBucket(), new VersioningConfiguration().withStatus(VersioningConfiguration.Status.Enabled));
170+
171+
String srcKey = "lfu-source.bin";
172+
String dstKey = "lfu-single-copy.bin";
173+
byte[] data = new byte[(int) MockMultipartSource.totalSize];
174+
new Random().nextBytes(data);
175+
String srcMD5Sum = DigestUtils.md5Hex(data);
176+
client.putObject(getTestBucket(), srcKey, data, null);
177+
String versionId0 = client.listVersions(getTestBucket(), null).getVersions().get(0).getVersionId();
178+
S3ObjectMetadata sourceMetadata0 = client.getObjectMetadata(new GetObjectMetadataRequest(getTestBucket(), srcKey).withVersionId(versionId0));
179+
Assert.assertEquals(MockMultipartSource.totalSize, sourceMetadata0.getContentLength().longValue());
180+
Assert.assertEquals(srcMD5Sum, sourceMetadata0.getETag());
181+
182+
//Upload new Version Object
183+
new Random().nextBytes(data);
184+
String srcMD5SumNew = DigestUtils.md5Hex(data);
185+
client.putObject(getTestBucket(), srcKey, data, null);
186+
String versionId1 = client.listVersions(getTestBucket(), null).getVersions().get(0).getVersionId();
187+
S3ObjectMetadata sourceMetadata1 = client.getObjectMetadata(new GetObjectMetadataRequest(getTestBucket(), srcKey).withVersionId(versionId1));
188+
Assert.assertEquals(MockMultipartSource.totalSize, sourceMetadata1.getContentLength().longValue());
189+
Assert.assertEquals(srcMD5SumNew, sourceMetadata1.getETag());
190+
191+
//Single Copy from old version object
192+
S3ObjectMetadata objectMetadata = new S3ObjectMetadata().addUserMetadata("key", "value");
193+
LargeFileUploader uploader = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
194+
.withObjectMetadata(objectMetadata)
195+
.withSourceVersionId(versionId0);
196+
uploader.upload();
197+
Assert.assertEquals(0, uploader.getBytesTransferred());
198+
Assert.assertEquals(srcMD5Sum, uploader.getETag());
199+
Assert.assertEquals(MockMultipartSource.totalSize, client.getObjectMetadata(getTestBucket(), dstKey).getContentLength().longValue());
200+
Assert.assertEquals(objectMetadata.getUserMetadata(), client.getObjectMetadata(getTestBucket(), dstKey).getUserMetadata());
201+
202+
//Single Copy from new version object (without providing version Id)
203+
uploader = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
204+
.withObjectMetadata(objectMetadata);
205+
uploader.upload();
206+
Assert.assertEquals(0, uploader.getBytesTransferred());
207+
Assert.assertEquals(srcMD5SumNew, uploader.getETag());
208+
Assert.assertEquals(MockMultipartSource.totalSize, client.getObjectMetadata(getTestBucket(), dstKey).getContentLength().longValue());
209+
Assert.assertEquals(objectMetadata.getUserMetadata(), client.getObjectMetadata(getTestBucket(), dstKey).getUserMetadata());
210+
211+
//Multi Part Copy from old version object
212+
uploader = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
213+
.withObjectMetadata(objectMetadata)
214+
.withPartSize(MockMultipartSource.partSize)
215+
.withMpuThreshold(MockMultipartSource.partSize)
216+
.withSourceVersionId(versionId0);
217+
uploader.upload();
218+
Assert.assertEquals(0, uploader.getBytesTransferred());
219+
Assert.assertTrue(uploader.getETag().contains("-")); // hyphen signifies multipart / updated object
220+
Assert.assertEquals(MockMultipartSource.totalSize, client.getObjectMetadata(getTestBucket(), dstKey).getContentLength().longValue());
221+
Assert.assertEquals(objectMetadata.getUserMetadata(), client.getObjectMetadata(getTestBucket(), dstKey).getUserMetadata());
222+
Assert.assertEquals(srcMD5Sum, DigestUtils.md5Hex(client.getObject(getTestBucket(), dstKey).getObject()));
223+
224+
//Multi Part Copy from new version object
225+
uploader = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
226+
.withObjectMetadata(objectMetadata)
227+
.withPartSize(MockMultipartSource.partSize)
228+
.withMpuThreshold(MockMultipartSource.partSize)
229+
.withSourceVersionId(versionId1);
230+
uploader.upload();
231+
Assert.assertEquals(0, uploader.getBytesTransferred());
232+
Assert.assertTrue(uploader.getETag().contains("-")); // hyphen signifies multipart / updated object
233+
Assert.assertEquals(MockMultipartSource.totalSize, client.getObjectMetadata(getTestBucket(), dstKey).getContentLength().longValue());
234+
Assert.assertEquals(objectMetadata.getUserMetadata(), client.getObjectMetadata(getTestBucket(), dstKey).getUserMetadata());
235+
Assert.assertEquals(srcMD5SumNew, DigestUtils.md5Hex(client.getObject(getTestBucket(), dstKey).getObject()));
236+
}
237+
238+
@Test
239+
public void testLFUCopyPauseResume() throws Exception {
240+
String srcKey = "mpu-copy-pause.source";
241+
String dstKey = "mpu-copy-pause.target";
242+
243+
MockMultipartSource mockMultipartSource = new MockMultipartSource();
244+
LargeFileUploader lfu = new TestLargeFileUploader(client, getTestBucket(), srcKey, mockMultipartSource)
245+
.withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize());
246+
lfu.doMultipartUpload();
247+
248+
lfu = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
249+
.withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize())
250+
.withThreads(1);
251+
LargeFileUpload upload = lfu.uploadAsync();
252+
253+
// wait for first a few parts to start
254+
while (lfu.getExecutorService() == null || ((ThreadPoolExecutor) lfu.getExecutorService()).getCompletedTaskCount() == 0) {
255+
Thread.sleep(100);
256+
}
257+
LargeFileUploaderResumeContext resumeContext = upload.pause();
258+
259+
// object should not exist
260+
try {
261+
Assert.assertNull(client.getObjectMetadata(getTestBucket(), dstKey));
262+
} catch (S3Exception e) {
263+
Assert.assertEquals(404, e.getHttpCode());
264+
Assert.assertEquals("NoSuchKey", e.getErrorCode());
265+
}
266+
267+
// check resume context accuracy
268+
Assert.assertNotNull(resumeContext.getUploadId());
269+
Assert.assertNotNull(resumeContext.getUploadedParts());
270+
Assert.assertFalse(resumeContext.getUploadedParts().isEmpty());
271+
List<MultipartPart> parts = client.listParts(getTestBucket(), dstKey, resumeContext.getUploadId()).getParts();
272+
Assert.assertNotNull(parts);
273+
274+
lfu = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
275+
.withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize())
276+
.withResumeContext(resumeContext);
277+
lfu.doMultipartUpload();
278+
279+
// Unfortunately, MPU ETag is not preserved even if copy parts matches the original multipart upload.
280+
// So the content verification is done by calculating md5sum instead of checking ETag.
281+
//Assert.assertEquals(mockMultipartSource.getMpuETag(), client.getObjectMetadata(getTestBucket(), dstKey).getETag());
282+
Assert.assertEquals(DigestUtils.md5Hex(client.getObject(getTestBucket(), srcKey).getObject()), DigestUtils.md5Hex(client.getObject(getTestBucket(), dstKey).getObject()));
283+
Assert.assertEquals(mockMultipartSource.getTotalSize(), client.getObjectMetadata(getTestBucket(), dstKey).getContentLength().longValue());
284+
}
285+
171286
@Test
172287
public void testLargeFileUploaderStream() {
173288
String key = "large-file-uploader-stream.bin";

src/test/java/com/emc/object/s3/TestLargeFileUploader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public TestLargeFileUploader(S3Client s3Client, String bucket, String key, Large
4545
super(s3Client, bucket, key, multipartSource);
4646
}
4747

48+
public TestLargeFileUploader(S3Client s3Client, String srcBucket, String srcKey, String dstBucket, String dstKey){
49+
super(s3Client, srcBucket, srcKey, dstBucket, dstKey);
50+
}
4851
@Override
4952
protected long getMinPartSize() {
5053
return 100 * 1024; // 100 KiB for testing

0 commit comments

Comments
 (0)