一.bigFileDownloader.class
import utils.Tools;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 大文件下载器
*/
public class BigFileDownloader {
protected final URL requestURL;
protected final long fileSize;
/**
* 负责已下载数据的存储
*/
protected final Storage storage;
protected final AtomicBoolean taskCanceled = new AtomicBoolean(false);
public BigFileDownloader(String strURL) throws Exception {
requestURL = new URL(strURL);
//获取待下载资源的大小(单位字节)
fileSize = retieveFileSize(requestURL);
System.out.println("file total size:"+Long.toString(fileSize));
String fileName = strURL.substring(strURL.lastIndexOf('/')+1);
//创建负责存储已下载数据的对象
storage = new Storage(fileSize,fileName);
}
public void download(int taskCount,long reportInterval) throws Exception{
long chunkSizePerThread = fileSize/taskCount;
// 下载数据段的起始字节
long lowerBound = 0;
// 下载数据段的结束字段
long upperBound = 0;
DownloadTask dt;
for(int i = taskCount -1 ; i>=0; i--){
lowerBound = i * chunkSizePerThread;
if (i == taskCount - 1){
upperBound = fileSize;
}else{
upperBound = lowerBound + chunkSizePerThread-1;
}
//创建下载任务
dt = new DownloadTask(lowerBound, upperBound ,requestURL,storage,taskCanceled);
dispatchWork(dt,i);
}
//定时报告下载进度
reportProgress(reportInterval);
//清理程序占用的资源
doCleanup();
}
protected void doCleanup(){
Tools.silentClose(storage);
}
protected void cancelDownload(){
if(taskCanceled.compareAndSet(false,true)){
doCleanup();
}
}
protected void dispatchWork(final DownloadTask dt,int workerIndex) {
//创建下载线程
Thread workerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
dt.run();
}catch (Exception e){
//取消整个文件的下载
cancelDownload();
}
}
});
workerThread.setName("downloader-" + workerIndex);
workerThread.start();
}
//根据指定的URL获取相应文件的大小
private static long retieveFileSize(URL requestURL) throws Exception{
long size = -1;
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) requestURL.openConnection();
conn.setRequestMethod("HEAD");
conn.setRequestProperty("Connection", "Keep-alive");
conn.connect();
int statusCode = conn.getResponseCode();
if (HttpURLConnection.HTTP_OK != statusCode) {
throw new Exception("Server exception,status code:" + statusCode);
}
String cl = conn.getHeaderField("Content-Length");
size = Long.valueOf(cl);
} finally {
if (null != conn) {
conn.disconnect();
}
}
return size;
}
//报告下载进度
private void reportProgress(long reportInterval) throws InterruptedException{
float lastCompletion;
int completion = 0;
while (!taskCanceled.get()) {
lastCompletion = completion;
completion = (int) (storage.getTotalWrites() * 100 / fileSize);
if (completion == 100) {
break;
} else if (completion - lastCompletion >= 1) {
System.out.println("Completion:%"+completion+"%%");
if (completion >= 90) {
reportInterval = 1000;
}
}
Thread.sleep(reportInterval);
}
System.out.println("Completion:%"+completion+"%%");
}
}
二.Storage.class
import utils.Tools;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicLong;
public class Storage implements Closeable,AutoCloseable {
private final RandomAccessFile storeFile;
private final FileChannel storeChannel;
protected final AtomicLong totalWrites = new AtomicLong(0);
public Storage(long fileSize,String fileShortName) throws IOException{
String fullFileName = System.getProperty("java.io.tmpdir" + "/"+ fileShortName);
String localFileName;
localFileName = createStoreFile(fileSize,fullFileName);
storeFile = new RandomAccessFile(localFileName,"rw");
storeChannel = storeFile.getChannel();
}
/**
* 将data中指定的数据写入文件
*
* @param offset
* 写入数据在整个文件中的起始偏移位置
* @param byteBuf
* byteBuf必须在该方法调用前执行 byteBuf.flip()
* @return 写入文件的数据长度
* @throws IOException
*/
public int store(long offset, ByteBuffer byteBuf) throws IOException{
int length;
storeChannel.write(byteBuf,offset);
length = byteBuf.limit();
totalWrites.addAndGet(length);
return length;
}
public long getTotalWrites(){
return totalWrites.get();
}
private String createStoreFile(final long fileSize,String fullFileName) throws IOException{
File file = new File(fullFileName);
System.out.println("create local file:"+fullFileName);
RandomAccessFile raf;
//以读、写方式打开,支持文件的读取或写入。若文件不存在,则创建之。
raf = new RandomAccessFile(file,"rw");
try {
raf.setLength(fileSize);
}finally {
Tools.silentClose(raf);
}
return fullFileName;
}
@Override
public synchronized void close() throws IOException {
if(storeChannel.isOpen()){
Tools.silentClose(storeChannel,storeFile);
}
}
}
1.NIO的channel
- IO流(同步、阻塞)
- NIO(同步、非阻塞)
- NIO2(异步、非阻塞)
| 区别 | stream | channel |
|---|---|---|
| 支持异步 | 不支持 | 支持 |
| 是否可双向传输数据 | 不能,只能单向 | 可以,既可以从通道读取数据,也可以向通道写入数据 |
| 是否结合Buffer使用 | 不 | 必须结合buffer使用 |
| 性能 | 较低 | 较高 |
2.channel简易文件复制
public static void copyFileUseNIO(String src,String dst) throws IOException{
//声明源文件和目标文件
FileInputStream fi=new FileInputStream(new File(src));
FileOutputStream fo=new FileOutputStream(new File(dst));
//获得传输通道channel
FileChannel inChannel=fi.getChannel();
FileChannel outChannel=fo.getChannel();
//获得容器buffer
ByteBuffer buffer=ByteBuffer.allocate(1024);
while(true){
//判断是否读完文件
int eof =inChannel.read(buffer);
if(eof==-1){
break;
}
//重设一下buffer的position=0,limit=position
buffer.flip();
//开始写
outChannel.write(buffer);
//写完要重置buffer,重设position=0,limit=capacity
buffer.clear();
}
inChannel.close();
outChannel.close();
fi.close();
fo.close();
}
三.DownloadTask.class
import utils.Tools;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.atomic.AtomicBoolean;
public class DownloadTask implements Runnable{
private final long lowerBound;
private final long upperBound;
private final DownloadBuffer xbuf;
private final URL requestURL;
private final AtomicBoolean cancelFlag;
public DownloadTask(long lowerBound,long upperBound,URL requestURL,
Storage storage, AtomicBoolean cancelFlag){
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.requestURL = requestURL;
this.xbuf = new DownloadBuffer(lowerBound,upperBound,storage);
this.cancelFlag = cancelFlag;
}
//对指定的URL发起HTTP分段下载请求
private static InputStream issueRequest(URL requestURL, long lowerBound,
long upperBound) throws IOException{
Thread me = Thread.currentThread();
System.out.println(me + "->[" + lowerBound + "," + upperBound + "]");
final HttpURLConnection conn;
InputStream in = null;
conn = (HttpURLConnection) requestURL.openConnection();
String strConnTimeout = System.getProperty("x.dt.conn.timeout");
int connTimeout = null == strConnTimeout ? 60000 : Integer
.valueOf(strConnTimeout);
conn.setConnectTimeout(connTimeout);
String strReadTimeout = System.getProperty("x.dt.read.timeout");
int readTimeout = null == strReadTimeout ? 60000 : Integer
.valueOf(strReadTimeout);
conn.setReadTimeout(readTimeout);
conn.setRequestMethod("GET");
conn.setRequestProperty("Connection", "Keep-alive");
// Range: bytes=0-1024
conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound);
conn.setDoInput(true);
conn.connect();
int statusCode = conn.getResponseCode();
if (HttpURLConnection.HTTP_PARTIAL != statusCode) {
conn.disconnect();
throw new IOException("Server exception,status code:" + statusCode);
}
System.out.println(me + "-Content-Range:" + conn.getHeaderField("Content-Range")
+ ",connection:" + conn.getHeaderField("connection"));
in = new BufferedInputStream(conn.getInputStream()) {
@Override
public void close() throws IOException {
try {
super.close();
} finally {
conn.disconnect();
}
}
};
return in;
}
@Override
public void run() {
if(cancelFlag.get()){
return;
}
ReadableByteChannel channel = null;
try {
channel = Channels.newChannel(issueRequest(requestURL,lowerBound,upperBound));
ByteBuffer buf = ByteBuffer.allocate(1024);
while(!cancelFlag.get()&&channel.read(buf)>0){
//将从网络读取的数据写入缓冲区
xbuf.write(buf);
buf.clear();
}
}catch (Exception e){
throw new RuntimeException(e);
}finally {
Tools.silentClose(channel,xbuf);
}
}
}
四.DownloadBuffer.class
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
public class DownloadBuffer implements Closeable {
/**
* 当前Buffer中缓冲的数据相对于整个存储文件的位置偏移
*/
private long globalOffset;
private long upperBound;
private int offset = 0;
public final ByteBuffer byteBuf;
private final Storage storage;
public DownloadBuffer(long globalOffset,long upperBound,final Storage storage){
this.globalOffset = globalOffset;
this.upperBound = upperBound;
this.byteBuf = ByteBuffer.allocate(1024*1024);
this.storage = storage;
}
public void write(ByteBuffer buf) throws IOException{
int length = buf.position();
final int capacity = byteBuf.capacity();
//当前缓冲区已满,或者剩余容量不够容纳新数据
if((offset + length) > capacity || length ==capacity){
// 将缓冲区中的数据写入文件
flush();
}
}
public void flush() throws IOException{
int length;
byteBuf.flip();
length = storage.store(globalOffset,byteBuf);
byteBuf.clear();
globalOffset += length;
offset = 0;
}
@Override
public void close() throws IOException {
if(globalOffset< upperBound){
flush();
}
}
}
五.silentClose
public static void silentClose(Closeable... closeable) {
if (null == closeable) {
return;
}
for (Closeable c : closeable) {
if (null == c) {
continue;
}
try {
c.close();
} catch (Exception ignored) {
}
}
}





网友评论