本文包含以下内容:
1、连接池的初始化
2、连接的创建
3、连接的获取
4、连接的销毁
5、连接的回收
6、连接池的日志监控
1、连接池的初始化
在我们的ssm项目中,我们会看到druid数据源的配置,定义数据源class类,指定初始化方法,指定销毁方法。如下图:
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
</bean>
初始化bean用的时init方法,如果不配置init-method,连接池将会在首次获取连接时初始化。
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();
......
}
正是因为有了在获取连接的同时初始化连接池逻辑,才有了初始化方法最开始出现的inited变量,防止重复初始。
inited默认值false,初始化结束后,inited = true, 重启项目时,inited = false;
public void init() throws SQLException {
if (inited) {
return;
}
try {
......
}catch(Exception e) {
......
}finally{
inited = true;
......
}
}
public void restart() throws SQLException {
lock.lock();
try {
......
this.close();
this.resetStat();
this.inited = false;
this.enable = true;
this.closed = false;
} finally {
lock.unlock();
}
}
初始化时,使用独占锁,如果出现并发情况,上面的inited没有拦住,独占锁可以保证同时只有一个线程进行初始化。
public void init() throws SQLException {
if (inited) {
return;
}
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
......
}
根据fairLock分为公平锁和非公平锁,设置了maxWait或者构造函数中传入true,则为公平锁,setMaxWait时设置lock为公平锁的代码这里没有贴出来,大家可以试想一下,如果设置了maxWait,而又使用的是非公平锁,会出现什么现象?
答:先请求连接的线程很可能会出现获取连接超时现象。
public DruidDataSource(boolean fairLock){
super(fairLock);
......
}
public abstract class DruidAbstractDataSource {
public DruidAbstractDataSource(boolean lockFair){
lock = new ReentrantLock(lockFair);
notEmpty = lock.newCondition();
empty = lock.newCondition();
}
}
线程池是一个DruidConnectionHolder数组,具体可以看文章开头类图
public void init() throws SQLException {
......
try {
// 连接池中可用的连接
connections = new DruidConnectionHolder[maxActive];
// 失效、过期的连接,会暂时放在这个数组里面
evictConnections = new DruidConnectionHolder[maxActive];
// 销毁线程会检测连接,如果检测存活的连接会暂时放在这里,然后统一放入connections中
keepAliveConnections = new DruidConnectionHolder[maxActive];
......
}catch(Exception e) {
......
}finally{
inited = true;
......
}
}
初始化连接
同步方式初始化连接时,当空闲连接数量小于初始化连接数量时,创建物理连接,并用连接初始holder,放入holder数组;
异步方式初始化连接时,需要定义调度任务createScheduler
/**
* An {@link ExecutorService} that can schedule commands to run after a given
* delay, or to execute periodically.
*/
public interface ScheduledExecutorService extends ExecutorService {
}
public void init() throws SQLException {
try {
......
if (createScheduler != null && asyncInit) {
for (int i = 0; i < initialSize; ++i) {
submitCreateTask(true);
}
} else if (!asyncInit) {
// init connections
while (poolingCount < initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
}
// 开启logger日志打印的线程
createAndLogThread();
// 开启创建连接的线程
createAndStartCreatorThread();
// 开启销毁过期连接的线程
createAndStartDestroyThread();
// initedLatch默认2,等待initedLatch变成0
initedLatch.await();
......
}catch(Exception e) {
......
}finally{
inited = true;
......
}
}
说到这里,大家可能对创建连接部分有些疑问,submitCreateTask(true)、createAndStartCreatorThread()都是创建连接的,怎么回事?
注:在往下说之前,先说下druid连接池使用的设计模式。它采用的是典型的生产者消费者模式,
生产者是连接的创建线程和回收线程,消费者是获取连接的线程。还记得上面构造函数中看到的
两个Condition变量吗,empty和notEmpty,empty监控连接池为空,notEmpty监控连接池不为空,
这是连接池的核心部分。
2、连接创建
异步方式的连接创建线程,这种方式需要定义调度任务createScheduler。
private void submitCreateTask(boolean initTask) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask(initTask);
......
this.createSchedulerFuture = createScheduler.submit(task);
}
public class CreateConnectionTask implements Runnable {
private int errorCount = 0;
private boolean initTask = false;
private final long taskId;
public CreateConnectionTask(boolean initTask) {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
this.initTask = initTask;
}
@Override
public void run() {
runInternal();
}
private void runInternal() {
for (;;) {
PhysicalConnectionInfo physicalConnection = null;
try {
physicalConnection = createPhysicalConnection();
} catch (Exception e) {
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
createSchedulerFuture = createScheduler.schedule(this,
timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
}
if (physicalConnection == null) {
continue;
}
physicalConnection.createTaskId = taskId;
boolean result = put(physicalConnection);
if (!result) {
JdbcUtils.close(physicalConnection.getPhysicalConnection());
}
break;
}
}
}
异步方式创建连接,但没有定义createScheduler调度任务,应该如何创建连接呢?
protected void createAndStartCreatorThread() {
if (createScheduler == null) {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.start();
return;
}
initedLatch.countDown();
}
public class CreateConnectionThread extends Thread {
public void run() {
int errorCount = 0;
for (;;) {
......
if (poolingCount >= notEmptyWaitThreadCount
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await();
}
// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
......
PhysicalConnectionInfo connection = null;
try {
connection = createPhysicalConnection();
} catch (SQLException e) {
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
}
if (connection == null) {
continue;
}
boolean result = put(connection);
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0; // reset errorCount
}
}
}
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
......
}
return put(holder, physicalConnectionInfo.createTaskId);
}
private boolean put(DruidConnectionHolder holder, long createTaskId) {
lock.lock();
try {
if (poolingCount >= maxActive) {
return false;
}
connections[poolingCount] = holder;
incrementPoolingCount();
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
if (poolingCount + createTaskCount < notEmptyWaitThreadCount
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}
private void emptySignal() {
if (createScheduler == null) {
empty.signal();
return;
}
if (createTaskCount >= maxCreateTaskCount) {
return;
}
if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}
submitCreateTask(false);
}
3、连接获取
通过责任链获取连接的方式,不在本文讨论之中。
说到这里,我们不得不说一下
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
return getConnectionDirect(maxWaitMillis);
}
}
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) {
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
......
}
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder);
continue;
}
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (idleMillis >= timeBetweenEvictionRunsMillis || idleMillis < 0) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
上来首先判断是否有空闲连接,如果没有,则直接创建,如果有连接,则直接获取
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
DruidConnectionHolder holder;
for (boolean createDirect = false;;) {
if (createDirect) {
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
}
}
......
try {
connectCount++;
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
......
} catch (Exception e) {
......
} finally {
lock.unlock();
}
break;
}
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
/**
* nanos=maxWait转纳秒
*/
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
for (;;) {
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
try {
// Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
estimate = notEmpty.awaitNanos(estimate); // signal by recycle or creator
// notEmpty.await(); // signal by recycle or creator // takeLast线程挂起方式,其它几乎一样
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
} finally {
notEmptyWaitThreadCount--;
}
}
......
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
}
4、连接销毁
init()方法中createAndStartDestroyThread()的调用会新增一个销毁线程,根据destroyScheduler,决定新增哪种销毁线程,timeBetweenEvictionRunsMillis是调用销毁任务的周期时间
protected void createAndStartDestroyThread() {
destroyTask = new DestroyTask();
if (destroyScheduler != null) {
long period = timeBetweenEvictionRunsMillis;
if (period <= 0) {
period = 1000;
}
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
}
String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
destroyConnectionThread = new DestroyConnectionThread(threadName);
destroyConnectionThread.start();
}
线程每隔timeBetweenEvictionRunsMillis时间,调用一次销毁线程
public class DestroyConnectionThread extends Thread {
public void run() {
for (;;) {
try {
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000);
}
if (Thread.interrupted()) {
break;
}
// 从这里可以看出,两种方式最终执行的都是DestroyTask回收任务
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
}
线程销毁任务
public class DestroyTask implements Runnable {
public DestroyTask() {
}
@Override
public void run() {
// 连接池瘦身
shrink(true, keepAlive);
// 每次连接使用完之后,没有正确的关闭,会导致连接泄露问题。
// removeAbandoned = true,验证长时间未归还的连接,是否强制回收。
// 当连接未执行任何sql,而且从借出到当前时间超出了可借时间removeAbandonedTimeout,则强制回收。
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
连接池瘦身,连接检查是否可用及抛弃多余连接。
public void shrink(boolean checkTime, boolean keepAlive) {
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
try {
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
if (checkTime) {
// phyTimeoutMillis 物理连接超时时间
if (phyTimeoutMillis > 0) {
// connectTimeMillis 连接创建时间
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
// 从连接创建到现在的时间超出连接超时时间,则抛弃连接
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// lastActiveTimeMillis 连接上次活跃时间
// minEvictableIdleTimeMillis 连接最小抛弃时间
// maxEvictableIdleTimeMillis 连接最大抛弃时间
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// keepAliveBetweenTimeMillis 保持活跃时间,空闲时间操作该时间,将会进行可用性判定
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
// 是否需要创建连接,以满足最小连接数量minIdle
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
// 需要抛弃的连接直接关闭
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
// 需要保活的连接进行可用性判定,判定可用则放进连接池中,不可用则抛弃
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
......
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L);
if (!putOk) {
discard = true;
}
}
// put失败,则直接关闭
if (discard) {
try {
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
// 关闭连接后,不能忘了一步,检查是否需要创建连接,如果需要,则激活创建连接线程
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
Arrays.fill(keepAliveConnections, null);
}
if (needFill) {
lock.lock();
try {
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
image.png
image.png
image.png
连接销毁,连接借出时间超出可借时间 removeAbandonedTimeoutMillis,则进行回收或抛弃
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
activeConnectionLock.lock();
try {
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext();) {
DruidPooledConnection pooledConnection = iter.next();
if (pooledConnection.isRunning()) {
continue;
}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
if (timeMillis >= removeAbandonedTimeoutMillis) {
iter.remove();
pooledConnection.setTraceEnable(false);
abandonedList.add(pooledConnection);
}
}
} finally {
activeConnectionLock.unlock();
}
if (abandonedList.size() > 0) {
for (DruidPooledConnection pooledConnection : abandonedList) {
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
}
}
return removeCount;
}
5、连接回收
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;
final Connection physicalConnection = holder.conn;
......
// 从被占用的连接数组中移出
activeConnections.remove(pooledConnection);
......
final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;
try {
// 如果在归还至连接池时发现此连接对象还有未处理完的事务,则直接回滚
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}
// reset holder, restore default settings, clear warnings
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
holder.reset();
} finally {
lock.unlock();
}
} else {
holder.reset();
}
//连接已被抛弃,则不作任何处理(不再归还)
if (holder.discard) {
return;
}
// 回收连接测试,默认不开启,开启会影响性能
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
if (!validate) {
JdbcUtils.close(physicalConnection);
......
return;
}
}
// 连接不可用,直接抛弃
if (!enable) {
discardConnection(holder);
return;
}
boolean result;
final long currentTimeMillis = System.currentTimeMillis();
if (phyTimeoutMillis > 0) {
// 连接从创建到现在的时间超出连接超时时间,则抛弃
long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
discardConnection(holder);
return;
}
}
lock.lock();
try {
......
// 回收的连接放回connections
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
// 回收失败,直接关闭
if (!result) {
JdbcUtils.close(holder.conn);
}
} catch (Throwable e) {
......
}
}
6、日志记录线程
private void createAndLogThread() {
if (this.timeBetweenLogStatsMillis <= 0) {
return;
}
String threadName = "Druid-ConnectionPool-Log-" + System.identityHashCode(this);
logStatsThread = new LogStatsThread(threadName);
logStatsThread.start();
this.resetStatEnable = false;
}
public class LogStatsThread extends Thread {
public void run() {
try {
for (;;) {
try {
logStats();
} catch (Exception e) {
LOG.error("logStats error", e);
}
Thread.sleep(timeBetweenLogStatsMillis);
}
} catch (InterruptedException e) {
// skip
}
}
}
public void logStats() {
final DruidDataSourceStatLogger statLogger = this.statLogger;
if (statLogger == null) {
return;
}
DruidDataSourceStatValue statValue = getStatValueAndReset();
statLogger.log(statValue);
}
注释:
testOnBorrow 获取连接后,是否进行连接测试
maxActive 允许的最大连接数
poolingCount 空闲连接数量
initialSize 初始化连接池连接数量
minEvictableIdleTimeMillis 空闲连接可以保持不会被废弃的最小时间
timeBetweenEvictionRunsMillis 每隔多久检查一次空闲连接 / 销毁连接的间隔时间
timeBetweenConnectErrorMillis 连接错误重试的时间间隔,默认500ms
timeBetweenLogStatsMillis 每个多长时间将监控日志记录输出到日志文件中
failFast 设置获取连接错误时,是否马上返回错误
notEmpty 获取连接时,发现没有可用连接,则notEmpty.wait()方法挂起当前线程,当通过调度任务创建连接成功后,创建任务会发出通知信号notEmpty.signal(),将挂起线程激活
phyTimeoutMillis 物理连接超时时间
connectTimeMillis 连接创建时间
lastActiveTimeMillis 最新活跃时间。刷新时机:
1、执行完sql之后就会更新
2、keepAlive检查通过之后刷新
3、初始化。创建物理连接时









网友评论