Skip to content

Commit a30f21e

Browse files
committed
[SDK-649] Refine code and add test case.
1 parent c4ea4a3 commit a30f21e

File tree

3 files changed

+119
-4
lines changed

3 files changed

+119
-4
lines changed

src/main/java/com/emc/object/s3/LargeFileUploader.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public static String getMpuETag(List<MultipartPartETag> partETags) {
8383

8484
private final String srcBucket;
8585
private final String srcKey;
86+
private String sourceVersionId;
8687

8788
private final InputStream stream;
8889
private final LargeFileMultipartSource multipartSource;
@@ -153,13 +154,12 @@ public LargeFileUploader(S3Client s3Client, String bucket, String key, LargeFile
153154
* from <code>srcBucket/srcKey</code> to <code>dstBucket/dstKey</code> without streaming
154155
* data between the client and ECS server.
155156
*/
156-
public LargeFileUploader(S3Client s3Client, String srcBucket, String srcKey, String dstBucket, String dstKey, long size) {
157+
public LargeFileUploader(S3Client s3Client, String srcBucket, String srcKey, String dstBucket, String dstKey) {
157158
this.s3Client = s3Client;
158159
this.bucket = dstBucket;
159160
this.key = dstKey;
160161
this.srcBucket = srcBucket;
161162
this.srcKey = srcKey;
162-
this.fullSize = size;
163163
this.multipartSource = null;
164164
this.stream = null;
165165
}
@@ -305,6 +305,8 @@ public void upload() {
305305

306306
if (fullSize >= mpuThreshold)
307307
doMultipartUpload();
308+
else if (srcKey != null && srcBucket != null)
309+
doSingleCopy();
308310
else
309311
doSinglePut();
310312
}
@@ -314,6 +316,8 @@ private InputStream getSourceCompleteDataStream() throws IOException {
314316
if (multipartSource != null) {
315317
is = multipartSource.getCompleteDataStream();
316318
} else {
319+
if (stream == null)
320+
throw new IOException("null source stream");
317321
// only close the source stream if configured to do so
318322
is = new FilterInputStream(stream) {
319323
@Override
@@ -359,6 +363,17 @@ public void doSinglePut() {
359363
}
360364
}
361365

366+
public void doSingleCopy() {
367+
configure();
368+
if (srcKey == null || srcBucket == null)
369+
throw new IllegalArgumentException("must specify source bucket and key for the copy");
370+
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(srcBucket, srcKey, bucket, key)
371+
.withSourceVersionId(sourceVersionId).withObjectMetadata(objectMetadata)
372+
.withAcl(acl).withCannedAcl(cannedAcl);
373+
CopyObjectResult result = s3Client.copyObject(copyObjectRequest);
374+
eTag = result.getRawETag();
375+
}
376+
362377
/*
363378
* get a map of existing MPU parts from which we can resume an MPU. we can only resume an MPU if the existing
364379
* part sizes and count are exactly the same as configured in this LFU instance
@@ -593,8 +608,9 @@ protected void configure() {
593608
executorService = null;
594609
threads = 1;
595610
} else if (srcKey != null && srcBucket != null) {
611+
// If resuming from copied parts, no need to verify the parts found in target
596612
if (resumeContext != null) resumeContext.setVerifyPartsFoundInTarget(false);
597-
executorService = null;
613+
fullSize = s3Client.getObjectMetadata(new GetObjectMetadataRequest(srcBucket, srcKey).withVersionId(sourceVersionId)).getContentLength();
598614
} else {
599615
throw new IllegalArgumentException("must specify a file, stream, or multipartSource to read");
600616
}
@@ -803,6 +819,14 @@ public void setAbortMpuOnFailure(boolean abortMpuOnFailure) {
803819
this.abortMpuOnFailure = abortMpuOnFailure;
804820
}
805821

822+
public String getSourceVersionId() {
823+
return sourceVersionId;
824+
}
825+
826+
public void setSourceVersionId(String sourceVersionId) {
827+
this.sourceVersionId = sourceVersionId;
828+
}
829+
806830
public LargeFileUploader withObjectMetadata(S3ObjectMetadata objectMetadata) {
807831
setObjectMetadata(objectMetadata);
808832
return this;
@@ -864,6 +888,11 @@ public LargeFileUploader withAbortMpuOnFailure(boolean abortMpuOnFailure) {
864888
return this;
865889
}
866890

891+
public LargeFileUploader withSourceVersionId(String sourceVersionId) {
892+
setSourceVersionId(sourceVersionId);
893+
return this;
894+
}
895+
867896
private class CopyPartTask implements Callable<MultipartPartETag> {
868897
private final String uploadId;
869898
private final int partNumber;
@@ -888,7 +917,7 @@ public MultipartPartETag call() {
888917
.withSourceRange(new Range(offset, offset + length - 1));
889918
try {
890919
CopyPartResult result = s3Client.copyPart(copyPartRequest);
891-
return new MultipartPartETag(result.getPartNumber(), result.getETag());
920+
return new MultipartPartETag(result.getPartNumber(), result.getRawETag());
892921
} catch (S3Exception e) {
893922
throw new RuntimeException(e);
894923
}

src/test/java/com/emc/object/s3/LargeFileUploaderTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.security.DigestInputStream;
5353
import java.security.MessageDigest;
5454
import java.util.*;
55+
import java.util.concurrent.ThreadPoolExecutor;
5556
import java.util.concurrent.TimeUnit;
5657
import java.util.concurrent.TimeoutException;
5758
import java.util.concurrent.atomic.AtomicLong;
@@ -168,6 +169,88 @@ public void testLargeFileUploaderProgressListener() throws Exception {
168169
client.deleteObject(getTestBucket(), key);
169170
}
170171

172+
@Test
173+
public void testLFUCopy() {
174+
String srcKey = "lfu-source.bin";
175+
String dstKey = "lfu-single-copy.bin";
176+
String dstKey2 = "lfu-multipart-copy.bin";
177+
178+
byte[] data = new byte[(int) MockMultipartSource.totalSize];
179+
new Random().nextBytes(data);
180+
client.putObject(getTestBucket(), srcKey, data, null);
181+
S3ObjectMetadata sourceMetadata = client.getObjectMetadata(getTestBucket(), srcKey);
182+
Assert.assertEquals(MockMultipartSource.totalSize, sourceMetadata.getContentLength().longValue());
183+
184+
//Single Copy
185+
S3ObjectMetadata objectMetadata = new S3ObjectMetadata().addUserMetadata("key", "value");
186+
LargeFileUploader uploader = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
187+
.withObjectMetadata(objectMetadata);
188+
uploader.upload();
189+
Assert.assertEquals(0, uploader.getBytesTransferred());
190+
Assert.assertEquals(sourceMetadata.getETag(), uploader.getETag());
191+
Assert.assertEquals(MockMultipartSource.totalSize, client.getObjectMetadata(getTestBucket(), dstKey).getContentLength().longValue());
192+
Assert.assertEquals(objectMetadata.getUserMetadata(), client.getObjectMetadata(getTestBucket(), dstKey).getUserMetadata());
193+
194+
//Multi Part Copy
195+
LargeFileUploader uploader2 = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey2)
196+
.withObjectMetadata(objectMetadata)
197+
.withPartSize(MockMultipartSource.partSize)
198+
.withMpuThreshold(MockMultipartSource.partSize);
199+
uploader2.upload();
200+
Assert.assertEquals(0, uploader2.getBytesTransferred());
201+
Assert.assertTrue(uploader2.getETag().contains("-")); // hyphen signifies multipart / updated object
202+
Assert.assertEquals(MockMultipartSource.totalSize, client.getObjectMetadata(getTestBucket(), dstKey2).getContentLength().longValue());
203+
Assert.assertEquals(objectMetadata.getUserMetadata(), client.getObjectMetadata(getTestBucket(), dstKey2).getUserMetadata());
204+
}
205+
206+
@Test
207+
public void testLFUCopyPauseResume() throws Exception {
208+
String srcKey = "mpu-copy-pause.source";
209+
String dstKey = "mpu-copy-pause.target";
210+
211+
MockMultipartSource mockMultipartSource = new MockMultipartSource();
212+
LargeFileUploader lfu = new TestLargeFileUploader(client, getTestBucket(), srcKey, mockMultipartSource)
213+
.withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize());
214+
lfu.doMultipartUpload();
215+
216+
lfu = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
217+
.withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize())
218+
.withThreads(1);
219+
LargeFileUpload upload = lfu.uploadAsync();
220+
221+
// wait for first a few parts to start
222+
while (lfu.getExecutorService() == null || ((ThreadPoolExecutor) lfu.getExecutorService()).getCompletedTaskCount() == 0) {
223+
Thread.sleep(100);
224+
}
225+
LargeFileUploaderResumeContext resumeContext = upload.pause();
226+
227+
// object should not exist
228+
try {
229+
Assert.assertNull(client.getObjectMetadata(getTestBucket(), dstKey));
230+
} catch (S3Exception e) {
231+
Assert.assertEquals(404, e.getHttpCode());
232+
Assert.assertEquals("NoSuchKey", e.getErrorCode());
233+
}
234+
235+
// check resume context accuracy
236+
Assert.assertNotNull(resumeContext.getUploadId());
237+
Assert.assertNotNull(resumeContext.getUploadedParts());
238+
Assert.assertFalse(resumeContext.getUploadedParts().isEmpty());
239+
List<MultipartPart> parts = client.listParts(getTestBucket(), dstKey, resumeContext.getUploadId()).getParts();
240+
Assert.assertNotNull(parts);
241+
242+
lfu = new TestLargeFileUploader(client, getTestBucket(), srcKey, getTestBucket(), dstKey)
243+
.withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize())
244+
.withResumeContext(resumeContext);
245+
lfu.doMultipartUpload();
246+
247+
// Unfortunately, MPU Etag is not preserved even if copy parts matches the original multipart upload.
248+
// So the content verification is done by calculating md5sum instead of checking ETag.
249+
//Assert.assertEquals(mockMultipartSource.getMpuETag(), client.getObjectMetadata(getTestBucket(), dstKey).getETag());
250+
Assert.assertEquals(DigestUtils.md5Hex(client.getObject(getTestBucket(), srcKey).getObject()), DigestUtils.md5Hex(client.getObject(getTestBucket(), dstKey).getObject()));
251+
Assert.assertEquals(mockMultipartSource.getTotalSize(), client.getObjectMetadata(getTestBucket(), dstKey).getContentLength().longValue());
252+
}
253+
171254
@Test
172255
public void testLargeFileUploaderStream() {
173256
String key = "large-file-uploader-stream.bin";

src/test/java/com/emc/object/s3/TestLargeFileUploader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public TestLargeFileUploader(S3Client s3Client, String bucket, String key, Large
4545
super(s3Client, bucket, key, multipartSource);
4646
}
4747

48+
public TestLargeFileUploader(S3Client s3Client, String srcBucket, String srcKey, String dstBucket, String dstKey){
49+
super(s3Client, srcBucket, srcKey, dstBucket, dstKey);
50+
}
4851
@Override
4952
protected long getMinPartSize() {
5053
return 100 * 1024; // 100 KiB for testing

0 commit comments

Comments
 (0)