`
heidian
  • 浏览: 99105 次
  • 性别: Icon_minigender_1
  • 来自: 湖南
文章分类
社区版块
存档分类

DBPool

阅读更多
package com.huawei.utils.db;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;

import javax.sql.ConnectionEvent;
import javax.sql.ConnectionEventListener;
import javax.sql.DataSource;
import javax.sql.PooledConnection;

import org.apache.log4j.Logger;

import sun.jdbc.rowset.CachedRowSet;

import com.huawei.insa2.util.Args;

/**
 *
 * 数据库连接池.
 * 1,内建线程管理连接的连续性;
 * 2,设置有最大连接数目,如果超过就等待,知道超时,抛出一个异常;
 * 文件名:DBPool.java
 * 版权:Copyright 2001-2002 Huawei Tech. Co. Ltd. All Rights Reserved.
 * 描述: plat 1.0 统一平台
 */
/**
 * 数据库连接池,该连接池只是用来取得Connection对象,之后的使用和JDBC没有区别。所有内部处
 * 理过程均对调用者透明。该连接池实现了DataSource接口,用法可参见DataSource,并提供一些
 * 附加特性,例如离线结果集等。连接池参数的修改实时生效,通过setArgs()来完成。没有传入的参
 * 数保持原来的值不变。如果url、帐号、密码参数变化,则对于未分配出去的空闲连接将立即关闭,
 * 而对于已被申请出去的连接,仍可以继续使用,直到调用close()时关闭。一旦调用连接池的
 * close()方法,则该连接池将不可用。每个连接池会定期检查其维护的所有空闲连接是否还可用,如
 * 果发现连接不可用则将其剔除。重连间隔由watch-interval参数指定,如果设置有test-sql参数
 * 则通过执行该查询是否成功来判断连接是否可用(比较准确但开销较大),否则用连接的isClosed()
 * 方法来判断连接是否可用。连接池参数可在创建连接池时通过构造方法的Args参数来传入,可参见
 * Args类的说明。如果args里有多余的参数则被忽略。需要小心参数名不要写错,大小写敏感,否则
 * 该参数将被忽略。可用参数如下:
 * <table>
 * <tr><td>名称</td><td>含义</td><td>缺省值</td></tr>
 * <tr><td>name</td><td>连接池名称</td><td>dbpool</td></tr>
 * <tr><td>driver</td><td>驱动程序类名</td><td>不可缺省</td></tr>
 * <tr><td>url</td><td>JDBC连接的URL</td><td>不可缺省</td></tr>
 * <tr><td>login-timeout</td><td>数据库登录超时时间。单位:秒</td>
 *     <td>驱动程序自己决定</td></tr>
 * <tr><td>connection-timeout</td><td>getConnection()取得连接的最长等待时间,
 *     超过该时间仍然等不到空闲连接则抛出SQLException登录超时时间。0表示不等待,
 *     -1表示永远等待,其他值表示等待的秒数</td><td>0,即不等待。</td></tr>
 * <tr><td>user</td><td>登录数据库的用户名</td><td>null</td></tr>
 * <tr><td>password</td><td>登录数据库的密码</td><td>null</td></tr>
 * <tr><td>max-connection</td><td>连接池允许创建的最大连接数</td><td>100</td></tr>
 * <tr><td>max-free-connection</td><td>连接池允许维持的最大空闲连接数,
 *     超过该数量的空闲连接将被释放</td><td>100</td></tr>
 * <tr><td>watch-interval</td><td>定期检查空闲连接是否可用的间隔时间。单位:秒</td>
 *     <td>-1,表示不进行定期检查</td></tr>
 * <tr><td>correct-charset</td><td>是否需要做字符集更正[true|false]。
 * 如果数据库操作汉字遇到问题可试着改变该参数。</td><td>false</td></tr>
 * </table>
 */
public class DBPool
    implements DataSource, ConnectionEventListener
{
  // 同步锁。
  private final Object lock = new Object();

  // 数据库连接池名称。
  private String name = "dbpool";

  // 数据库驱动程序类名。
  private String driver;

  // 数据库URL。
  private String url;

  // 数据库登录用户名。
  private String user;

  // 数据库登录用户密码。
  private String password;

  //是否锁定请求,默认是锁定的。
  private boolean lockGetConnectionThread = true;

  // 日志记录的时间格式。
  private static DateFormat df = new SimpleDateFormat(
      "yyyy-MM-dd HH:mm:ss.SSS");

  // 当前连接池维护的所有空闲连接。里面放的是被缓冲的原始Connection对象。
  private LinkedList freeConns = new LinkedList();

  // 当前连接池已分配出去的连接。里面放的是被缓冲的原始Connection对象。
  private LinkedList allocatedConns = new LinkedList();

  // 设定允许缓冲的最大连接个数,缺省100.
  //设定最大连接数目;
  private int maxConnNum = 10;

  // 最大空闲连接个数,超过这个就杀死;
  private int maxFreeConnNum = (int) (maxConnNum * 0.8);

  // 申请连接最大超时时间。单位:毫秒,0表示不等待,-1表示永远等待,缺省为不等待。
  private int connTimeout = 0;

  //一个连接占用的最大时间,增加超时限定,<=0不限制,>0限制,单位秒。
  private int maxUsedTimeAConnection = 20000;

  // 内部输出日志流。
  private static final Logger logger = Logger.getLogger(DBPool.class);

  // 输出错误日志的流。
  private PrintWriter printWriter = new PrintWriter(System.out);

  // 监视间隔时间,单位毫秒。
  private long watchInterval = -5000;

  //****连接超时时间,当连接超时,直接清理掉所有连接,进行重连,默认5小时重连一次。
//    private long connectionReshTimeLimit = 1000*3600*5;

   //****第一次连接时间,整个连接池第一次建立连接的时间。
//    private long firstConnTime = 1000*3600*5;



    // 是否更正字符集。被PooledResultSet使用。
    private boolean correctCharset;

  // 用来测试数据库连接是否正常的sql语句。
  private String testSQL = "";

  // tidy检查连接的位置。
  private int tidyPosition = 0;

  // 监视数据库连接中连接是否还好使的线程。
  private TidyThread watchThread;

  //当前连接池的状态。
  private int POOL_STATE = 0; //0,ok,-1,不可用。

  //获得连接的限制lock。
  private byte[] getConLock = new byte[0];

  /**
   * 连接池监视线程,定义成嵌入类可使生命周期与DBPool保持一致。
   */
  class TidyThread
      extends Thread
  {
    /** 该线程存活标志,kill()方法将该标志置为false。*/
    private boolean alive = true;

    /**
     * 监视线程的构造方法。
     * @param name 线程名称。
     */
    public TidyThread(String name)
    {
      super(name);
    }

    /**
     * 杀死这个线程。
     */
    public void kill()
    {
      alive = false;
    }

    /**
     * 线程主体,循环运行task()方法,直到调用了kill()方法。
     */
    public final void run()
    {
      //无论出现什么异常都不能使该线程终止!
      while (alive)
      {
        try
        {
          Thread.sleep(watchInterval);
        }
        catch (Exception ex)
        {
        }
        try
        {
          tidy();
        }
        catch (Throwable t)
        {
          //出现严重错误,搞不好系统会死掉
          logger.debug("TidyThread tidy fatall error.", t);
        }
      }
    }
  };

  /**
   * 创建数据库连接池。有效参数列表参见类说明。必选参数包括driver、url,
   * 其他参数都有缺省值。
   *
   * @param args 连接池参数。
   * @throws SQLException
   */
  public DBPool(Args args)
      throws SQLException
  {
    setArgs(args);
  }

  /**
   * 设置连接池参数。实时生效。具体可用参数列表参见类说明。
   * args里不存在的参数保持原值不变。
   *
   * @param args 参数列表。
   * @throws SQLException
   */
  public void setArgs(Args args)
      throws SQLException
  {

    //取得JDBC驱动程序类名
    driver = args.get("driver", driver);

    if (driver == null || driver.trim().length() == 0)
    {
      logger.debug("driver parameter absent");
      throw new IllegalArgumentException("driver parameter absent");
    }

    driver = driver.trim();
    //加载数据库驱动程序
    try
    {
      Class.forName(driver);
    }
    catch (ClassNotFoundException ex)
    {
      logger.debug("can't find JDBC driver: " + driver);
      throw new SQLException("can't find JDBC driver: " + driver);
    }
    catch (Exception ex)
    {
      logger.debug("load JDBC driver fail: " + ex.getMessage(),ex);
      throw new SQLException("load JDBC driver fail: " + ex.getMessage());
    }

    //取得数据库URL
    String oldURL = url; 
    url = args.get("url", url);
    if (url != null)
    {
      url = url.trim();
    }

    if (url == null || url.length() == 0)
    {
      logger.debug("url parameter absent");
      throw new IllegalArgumentException("url parameter absent");
    }

    //取得数据库连接池名称
    name = args.get("name", name);

    //是否需要做字符集更正
    correctCharset = args.get("correct-charset", correctCharset);

    //设置用来测试数据库连接是否正常的SQL语句
    testSQL = args.get("test-sql", testSQL);
    if (testSQL != null && testSQL.trim().length() == 0)
    {
      testSQL = null;
    }

    //取得连接等待时长
    connTimeout = args.get("connection-timeout", 60);
    if (connTimeout < -1)
    {
      logger.debug("connection-timeout parameter error.connTimeout:"+connTimeout);
      throw new IllegalArgumentException("connection-timeout parameter error.");
    }
    connTimeout *= 1000; //变成毫秒

    //取得登录超时时间,缺省值是系统默认值
    int init_timeout = DriverManager.getLoginTimeout();
    init_timeout = (init_timeout > 60) ? init_timeout : 60;
    int login_timeout = args.get("login-timeout",init_timeout);
    if(login_timeout > 0)
    {
        DriverManager.setLoginTimeout(login_timeout);
    }
    //取得数据库登录用户帐号(可选参数)
    String oldUser = user;
    user = args.get("user", user);

    //取得数据库登录用户密码(可选参数)
    String oldPassword = password;
    password = (String) args.get("password", password);

    //取得数据库最大连接数(可选参数)
    maxConnNum = args.get("max-connection", maxConnNum);

    //取得数据库最大空闲连接数(可选参数)
    maxFreeConnNum = args.get("max-free-connection", maxFreeConnNum);

    //监视周期
    watchInterval = args.get("watch-interval", watchInterval / 1000) *
        1000;

    //获得锁的情况。
    String tempStr = args.get("lockGetConnectionThread", "true");
    if (tempStr.equalsIgnoreCase("false"))
    {
      lockGetConnectionThread = false; //args.get("lockGetConnectionThread", true);
    }
    else
    {
      lockGetConnectionThread = true;
    }

    if (watchThread != null)
    {
      watchThread.kill();
      watchThread = null;
    }

    //根据情况决定是否重新启动监视线程。
    if (watchInterval > 0)
    {
      watchThread = new TidyThread(url + "-watcher"); //创建线程name
      watchThread.start(); //启动线程
    }

    /********** deleted by zhw 41915 20050211 修改隐患,防止异常。
             if (watchInterval != 0 && watchThread == null) //需要启动监视线程
             {
        watchThread = new TidyThread(name + "-watcher"); //创建线程
        watchThread.start(); //启动线程
             }
             else if (watchInterval == 0 && watchThread != null) //需要停止监视线程
             {
        watchThread.kill();
        watchThread = null;
             }
             else if (watchThread != null)
             {
        watchThread.setName(name);//不稳定的,如果线程正在运行,将抛出异常。
             }
             else
             {
        //其他情况不管.
             }
     **********/
    if (oldURL == null || !oldURL.equals(url) ||
        (oldUser == null && user != null)
        || (oldUser != null && !oldUser.equals(user))
        || (oldPassword == null && password != null)
        || (oldPassword != null && !oldPassword.equals(password)))
    {
      reset();
    }
  }

  /**
   * 执行查询的快捷方式。输入一个select语句,返回查询结果。
   * 该操作返回的结果集是离线结果集,
   * 对于大数据量的查询不推荐使用该方式。
   * @param sql SQL查询语句。
   * @return 查询到的结果集。
   * @throws SQLException 如果数据库存取出错或者该SQL语句返回结果集。
   */
  public ResultSet query(String sql)
      throws SQLException
  {
    //最大只能查找到1000条,因为是离线结果集合.
    int maxRow = 100000;
    Connection conn = null;
    Statement stmt = null;
    ResultSet rs =null;
    try
    {
      conn = getConnection();
      stmt = conn.createStatement();
      rs = stmt.executeQuery(sql);

      //将数据取出放入离线结果集
      CachedRowSet crs = new CachedRowSet();
      crs.setType(ResultSet.TYPE_SCROLL_INSENSITIVE);
      crs.setMaxRows(maxRow);
      crs.populate(rs);
      return crs;
    }
    finally
    {
      //关闭ResultSet
      closeResultSetQuietly(rs);
      //关闭Statament
      closeStatamentQuietly(stmt);
      //关闭connection
      closeConnectionQuietly(conn);
    }
  }

  /**
   * 执行查询的快捷方式。输入一个select语句,返回查询结果。
   * 该操作返回的结果集是离线结果集,
   * 对于大数据量的查询不推荐使用该方式。
   * @param sql SQL查询语句。
   * @return 查询到的结果集。
   * @throws SQLException 如果数据库存取出错或者该SQL语句返回结果集。
   */
  public ResultSet[] queryMultiRS(String[] sql)
      throws SQLException
  {
    ResultSet[] rs = new ResultSet[sql.length];
    int maxRow = 1000;

    Connection conn = this.getConnection();
    Statement stat = null;
    ResultSet rsxx = null;
    try
    {
      conn.setAutoCommit(false);
      stat = conn.createStatement();
      for (int index = 0; index < sql.length; index++)
      {
        rsxx = stat.executeQuery(sql[index]);
        //将数据取出放入离线结果集
        CachedRowSet crs = new CachedRowSet();
        crs.setType(ResultSet.TYPE_SCROLL_INSENSITIVE);
        crs.setMaxRows(maxRow);
        crs.populate(rsxx);
        rs[index] = crs;
      }
      conn.commit();
    }
    catch (SQLException ex)
    {
        throw ex;
    }
    finally
    {
      setAutoCommit(conn,true);
      //关闭resultset
      closeResultSetQuietly(rsxx);
      //关闭statament
      closeStatamentQuietly(stat);
      //关闭connection
      closeConnectionQuietly(conn);
    }
    return rs;
  }

  /**
   * 执行批处理并不返回结果集合。
   * @param sql SQL查询语句。
   * @return 查询到的结果集。
   * @throws SQLException 如果数据库存取出错或者该SQL语句返回结果集。
   */
  public int[] executeSqlBatch(String[] sql)
      throws SQLException
  {
    int[] result = null;

    Connection conn = this.getConnection();
    Statement stat = null;
    try
    {
      conn.setAutoCommit(false);
      stat = conn.createStatement();

      //执行批处理。
      for (int index = 0; index < sql.length; index++)
      {
        stat.addBatch(sql[index]);
      }
      result = new int[sql.length];
      result = stat.executeBatch();
      conn.commit();
    }
    catch (SQLException ex)
    {
      throw ex;
    }
    finally
    {
      setAutoCommit(conn,true);
      //关闭statament
      closeStatamentQuietly(stat);
      //关闭connection
      closeConnectionQuietly(conn);
    }
    return result;
  }

  /**
   * 执行不返回结果集的SQL语句。例如:UPDATE、DELETE、INSERT语句或建表等DDL操作。
   *
   * @return 插入、删除、修改的记录条数,或DDL操作则返回0。
   * @param sql String
   * @throws SQLException
   */
  public int update(String sql)
      throws SQLException
  {
    Connection conn = null;
    Statement stmt = null;
    int n = 0;
    try
    {
      conn = getConnection();
      stmt = conn.createStatement();
      n = stmt.executeUpdate(sql);
    }
    finally
    {
      //关闭statament
      closeStatamentQuietly(stmt);
      //关闭connection
      closeConnectionQuietly(conn);
    }
    return n;
  }

  /**
   * 批量执行sql的statement,直接传递参数进去
   * 目前支持只使用一个sql语句,多种类型不同参数组的方式。
   *
   * @throws SQLException
   * @param sql String
   * @param stat PreStatement[]
   */
  public void executePreparedBatch(String sql, PreStatement[] stat)
      throws
      SQLException
  {
    Connection con = null;
    PreparedStatement pstat = null;
    try
    {
      con = this.getConnection();
      con.setAutoCommit(false);
      pstat = con.prepareStatement(sql);

      for (int index = 0; index < stat.length; index++)
      {
        //填充参数。
        for (int mm = 0; mm < stat[index].getParametersCount(); mm++)
        {
          switch (stat[index].getParams().getParameters()[mm].
              type)
          {
            case java.sql.Types.INTEGER:
              pstat.setInt(mm + 1,
                  ( (Integer) stat[index].getParams().
                  getParameters()[
                  mm].data).intValue());
              break;
            case java.sql.Types.VARCHAR:
              pstat.setString(mm + 1,
                  (stat[index].getParams().
                  getParameters()[mm].data).
                  toString());
              break;
            case java.sql.Types.TIMESTAMP:
              pstat.setTimestamp(mm + 1,
                  ( (java.sql.Timestamp) stat[
                  index].
                  getParams().getParameters()[
                  mm].data));
              break;
            case java.sql.Types.FLOAT:
              pstat.setFloat(mm + 1,
                  ( (Float) stat[index].getParams().
                  getParameters()[
                  mm].data).floatValue());
              break;
            case java.sql.Types.DOUBLE:
              pstat.setDouble(mm + 1,
                  ( (Double) stat[index].
                  getParams().getParameters()[
                  mm].data).doubleValue());
              break;
            case java.sql.Types.JAVA_OBJECT:
              pstat.setObject(mm + 1,
                  stat[index].getParams().getParameters()[
                  mm].data);
              break;
            case java.sql.Types.NUMERIC:
              pstat.setInt(mm + 1,
                  ( (Integer) stat[index].getParams().
                  getParameters()[
                  mm].data).intValue());
              break;
            default: //缺省都按照字符串处理
              pstat.setString(mm + 1,
                  (stat[index].getParams().
                  getParameters()[mm].data).
                  toString());
              break;
          }
        }
        pstat.addBatch();
      }
      pstat.executeBatch();
      con.commit();
    }
    catch (SQLException ex)
    {
      con.rollback();
      throw ex;
    }
    finally
    {
      //设置自动提交
      setAutoCommit(con,true);
      //关闭statament
      closeStatamentQuietly(pstat);
       //关闭connection
      closeConnectionQuietly(con);
    }
  }

  /**
   * 批量执行sql的statement,直接传递参数进去
   * 目前支持只使用一个sql语句,多种类型不同参数组的方式。
   *
   * @throws SQLException
   * @param stat PreStatement[]
   */
  public void executePrepared(PreStatement stat)
      throws
      SQLException
  {
    Connection con = null;
    PreparedStatement pstat = null;
    try
    {
      con = this.getConnection();
      con.setAutoCommit(false);
      pstat = con.prepareStatement(stat.sql);
      //填充参数。
      for (int mm = 0; mm < stat.getParametersCount(); mm++)
      {
        switch (stat.getParams().getParameters()[mm].type)
        {
          case java.sql.Types.INTEGER:
            pstat.setInt(mm + 1,
                ( (Integer) stat.getParams().
                getParameters()[
                mm].data).intValue());
            break;
          case java.sql.Types.VARCHAR:
            pstat.setString(mm + 1,
                (stat.getParams().getParameters()[mm].
                data).
                toString());
            break;
          case java.sql.Types.TIMESTAMP:
            pstat.setTimestamp(mm + 1,
                ( (java.sql.Timestamp) stat.
                getParams().getParameters()[mm].
                data));
            break;
          case java.sql.Types.FLOAT:
            pstat.setFloat(mm + 1,
                ( (Float) stat.getParams().
                getParameters()[
                mm].data).floatValue());
            break;
          case java.sql.Types.DOUBLE:
            pstat.setDouble(mm + 1,
                ( (Double) stat.getParams().
                getParameters()[
                mm].data).doubleValue());
            break;
          case java.sql.Types.JAVA_OBJECT:
            pstat.setObject(mm + 1,
                stat.getParams().getParameters()[mm].
                data);
            break;
          case java.sql.Types.NUMERIC:
            pstat.setInt(mm + 1,
                ( (Integer) stat.getParams().
                getParameters()[
                mm].data).intValue());
            break;
          default: //缺省都按照字符串处理
            pstat.setString(mm + 1,
                (stat.getParams().getParameters()[mm]).
                toString());
            break;
        }
      }
      pstat.execute();
      con.commit();
    }
    catch (SQLException ex)
    {
      con.rollback();
      throw ex;
    }
    finally
    {
      //设置自动提交
      setAutoCommit(con,true);
      //关闭statament
      closeStatamentQuietly(pstat);
      //关闭connection
      closeConnectionQuietly(con);
    }
  }

  /**
   * 费batch方式调用callable。
   * @throws SQLException
   * @param stat PreStatement[]
   */
  public void executeCallable(PreStatement stat)
      throws
      SQLException
  {
    Connection con = null;
    CallableStatement callstat = null;
    try
    {
      con = this.getConnection();
      callstat = con.prepareCall(stat.sql);
      stat.execSQLCallableStat(callstat);
    }
    catch (SQLException ex)
    {
      throw ex;
    }
    finally
    {
      closeStatamentQuietly(callstat);
      closeConnectionQuietly(con);
    }
  }

  /**
   * 取得一个连接。如果连接数达到缓冲极限后再申请的连接将会阻塞,直到指定的超时时间到达。
   * 若仍没有空闲连接则抛SQLException。在任何情况下该方法均不返回null。
   * @return 返回申请到的连接对象。
   * @throws SQLException 数据库操作异常
   */
  public Connection getConnection()
      throws SQLException
  {
    //首先判断是否可用连接池。
    this.checkConState();

    return getInnerConnection();
  }

  /**
   * 取得一个连接。如果连接数达到缓冲极限后再申请的连接将会阻塞,直到指定的超时时间到达。
   * 若仍没有空闲连接则抛SQLException。在任何情况下该方法均不返回null。
   * @return 返回申请到的连接对象。
   * @throws SQLException 数据库操作异常
   */
  private Connection getInnerConnection()
      throws SQLException
  {
    synchronized (lock)
    {
      //判断连接池是否已被关闭
      if (freeConns == null)
      {
        logger.debug("this pool is closed");
        throw new IllegalStateException("this pool is closed");
      }
      int freeConnNum = freeConns.size();
      int allocatedConnNum = allocatedConns.size();

      logger.debug("**********freeConns.size():" + freeConns.size()
          + "   allocatedConns.size():" +
          allocatedConns.size() + " Thread:"
          + Thread.currentThread().getName());

      if (freeConnNum > 0) //缓冲池中有空闲连接。
      {
        //xxx 空闲连接轮流使用效率高还是尽量用同一个效率高?
        //当然是使用一个高.
        //Connection c = (Connection) freeConns.removeFirst(); //轮流用
       Connection c = (Connection) freeConns.removeLast(); //尽量同一个
        PooledDBConnection pdb = new PooledDBConnection(c, this);
        allocatedConns.add(pdb); //在使用的池子.
        return pdb;
        //test... return c;
      }
      else //缓冲池中无空闲连接
      {
        //没有到达最大连接数量
        if (freeConnNum + allocatedConnNum < maxConnNum)
        {
          Connection c =
              DriverManager.getConnection(url, user, password);
          PooledDBConnection pdb = new PooledDBConnection(c, this);
          allocatedConns.add(pdb);
          return pdb;
          //return c;
        }
        else //达到最大连接个数
        {
          try
          {
            long costTime = 0;
            long beforeWait = System.currentTimeMillis();

            //循环的原因是有可能notify通知时仍可能没有空闲连接。
            //从这里进入循环等待..........
            while (true)
            {
              if (connTimeout <= 0)
              {
                lock.wait(); //永远等待
              }
              else if (connTimeout > 0)
              {
                lock.wait(connTimeout - costTime); //等待指定时间
              }

              //判断连接池是否已被关闭
              if (freeConns == null)
              {
                logger.debug("this pool is closed");
                throw new IllegalStateException("this pool is closed");
              }
              freeConnNum = freeConns.size();

              //等到一个空闲连接
              if (freeConnNum > 0)
              { //从池子中取出一个连接来.
                Connection c = (Connection) freeConns.
                    removeLast();
                PooledDBConnection pdb = new PooledDBConnection(
                    c, this);
                allocatedConns.add(pdb);
                return pdb;
                //return c;
              }

              //没等到空闲连接
              costTime = System.currentTimeMillis() - beforeWait;
              if (costTime >= connTimeout) //超时时间到,没机会了
              {
                logger.debug("connection number reachs the upper limit("
                  + maxConnNum + "). Timeout while waiting a connection.");
                throw new SQLException(
                    "connection number reachs the upper limit("
                    + maxConnNum + "). Timeout while waiting a connection.");
              }
            }
          }
          catch (InterruptedException ex) //等待被人为打断
          {
            logger.debug("interrupted while waiting connection."+ex.getMessage(),ex);
            throw new SQLException("interrupted while waiting connection.");
          }
        }
      }
    }
  }

  /**
   * 用指定的用户名和密码创建数据库连接,只有当用户名和密码和连接池配置匹配时才返回连接池
   * 中的连接,否则建立新连接并返回。
   *
   * @param dbUser 登录数据库用的帐号。
   * @param dbPswd 登录数据库用的密码。空密码请用""而不是null。
   * @throws SQLException
   * @return Connection
   */
  public Connection getConnection(String dbUser, String dbPswd)
      throws
      SQLException
  {
    if (dbUser == null)
    {
      logger.debug("parameter user is null");
      throw new NullPointerException("parameter user is null");
    }
    if (dbPswd == null)
    {
      logger.debug("parameter password is null");
      throw new NullPointerException("parameter password is null");
    }
    if (dbUser.equals(user) && dbPswd.equals(password))
    {
      return getConnection();
    }
    else
    {	
      // 修正一个隐患,无法使用内部连接池。
      Connection c =
          DriverManager.getConnection(url, user, password);
      PooledDBConnection pdb = new PooledDBConnection(c, this);
      allocatedConns.add(pdb);
      return pdb;

    }
  }

  /**
   * 设置登录超时时间。
   * @param time 以秒为单位的登录超时时长。
   */
  public void setLoginTimeout(int time)
  {
    DriverManager.setLoginTimeout(time);
  }

  /**
   * 取得登录超时时间。
   * @return 以秒为单位的登录超时时长。
   */
  public int getLoginTimeout()
  {
    return DriverManager.getLoginTimeout();
  }

  /**
   * 设置日志输出流。
   * @param pw 新的日志输出流。
   */
  public void setLogWriter(PrintWriter pw)
  {
    this.printWriter = pw; //        pw ;
  }

  /**
   * 取得当前记录日志的输出流。
   * @return 单前日志使用的输出流。
   */
  public PrintWriter getLogWriter()
  {
    return this.printWriter;
  }

  /**
   * 取得连接池空闲连接个数。
   * @return 空闲连接个数。
   */
  public int getIdle()
  {
    if (freeConns == null)
    {
      throw new IllegalStateException("DBPool closed");
    }
    return freeConns.size();
  }

  /**
   * 整理连接池中的连接。该方法被定期执行。该操作测试池中连接是否可用。为减少开销,每次执行
   * tidy时只检查一个连接(但尽量做到每次调用tidy时检查的连接都不同),如果发现有一个坏掉
   * 则清除所有空闲连接。注意并不会影响已经分配出去的连接的使用,若DBPool被tidy()清理过,
   * 则当之前分配出去的连接被归还时将会被直接关闭,而不会被重复使用。
   *
   * 希望能够增加特性,能够同时检查allocatedConns,如果其中某个连接超过某个时间仍然为空
   * 张伟修改,直接从连接池拿一个连接,使用getConnection
   */
  private void tidy()
  {
    synchronized (lock)
    {
      //从这里自动清除多余分撇的连接.
      for (int i = maxFreeConnNum; i < freeConns.size(); i++)
      {
    	  freeConns.removeFirst(); 	
      }
    }
    innerTidy(); //执行状态检查,同时争取连接,检查同时决定连接是否可用。
  }

  /**
   * 内部的清理方法。
   */
  private void innerTidy()
  {
    //added by zhw 41915 20050211 防止test-sql 为空的情况发生。
    if ( (this.testSQL != null) && (testSQL.trim().length() >= 0))
    {
      //直接选一个连接进行连接测试,如果失败,直接清除 .
      PooledConnection pc = null;
      boolean badConnectionFlag = false;
      try
      {
        // 检查连接,若不可用将抛出SQLException
        pc = (PooledConnection) getInnerConnection();
        test(pc.getConnection());

        //测试成功,直接打开锁。
        logger.debug("-----------------------DBPool test OK.");
        if (this.getConLockState() != 0)
        {
          //锁定getconnection,不允许其他人使用了。
          this.setConLockState(0);
          this.notifyGetConRequest();
        }
        return;
      }
      catch (Exception ex) //抛出异常也将连接剔除,一旦发生异常把所有连接断掉,清理。
      {
        logger.error("Test db connection error:"+ex.getMessage(),ex);
        if (ex.getMessage().indexOf("reachs the upper limit(") == -1)
        {
          //锁定getconnection,不允许其他人使用了。
          this.setConLockState( -1);

          badConnectionFlag = true;
          logger.debug("Tidy connection for " + ex.getMessage());
          logger.debug(
              "------------------Find dbpool disconnect ,begin clear connection.");
          synchronized (lock)
          {
            //释放所有已分配连接
            allocatedConns.clear();

            //关闭所有空闲连接
            while (!freeConns.isEmpty())
            {
              try
              {
                ( (Connection) freeConns.removeFirst()).close();
              }
              catch (Exception e)
              {
                //尽量尝试关闭连接,若非要抛出异常也没办法
              }
            }
          }
        }
        else
        {
          //否则对于超限使用可以不理会。
        }
      }
      finally
      {
        if (pc != null)
        {
          try
          {
            if (!badConnectionFlag)
            {
              pc.close(); //放回连接。
            }
            else
            {
              pc.getConnection().close(); //直接关闭,不用回收。
            }
          }
          catch (Exception ex)
          {
            //忽略.
          }
        }
      }
    }
  }

  /**
   * 旧有的tidy方法。
   */
  private void oldTidy()
  {
    synchronized (lock)
    {
      if (freeConns == null || freeConns.size() == 0)
      {
        return;
      }

      //added by zhw 41915 20050211 防止test-sql 为空的情况发生。
      if ( (this.testSQL != null) && (testSQL.trim().length() >= 0))
      {
        //轮流选取连接进行检查
        Connection c = null;
        try
        {
          // 检查连接,若不可用将抛出SQLException
          //使用的很巧妙!!
          test( (Connection) freeConns.get( (tidyPosition++) %
              freeConns.size()));
          return;
        }
        catch (SQLException ex) //抛出异常也将连接剔除,一旦发生异常把所有连接断掉,清理。
        {
          logger.debug("tidy connection for " + ex.getMessage());
          //关闭所有空闲连接
          while (!freeConns.isEmpty())
          {
            try
            {
              ( (Connection) freeConns.removeFirst()).close();
            }
            catch (Exception e)
            {
              //尽量尝试关闭连接,若非要抛出异常也没办法
            }
          }
          //释放所有已分配连接
          allocatedConns.clear();
        }
        //从这里自动清除多余分撇的连接.
        for (int i = maxFreeConnNum; i < freeConns.size(); i++)
        {
          freeConns.removeFirst();
        }
      }

      //added by zhw 41915 20050211 增加对
      //增加对allocatedConns的清理,增加时间标识,如果真的超时,直接清理掉。
      //关闭的是已经分配的connection。
//            tidyTimeoutConnection();
    }

  }

  /**
   * 清理超时的连接。
   *
   */
  private void tidyTimeoutConnection()
  {
    long nowTime = System.currentTimeMillis();
    Iterator iter = allocatedConns.iterator();
    PooledDBConnection pdb = null;

    String info = "Time:" + this.df.format(new Date(nowTime)) + "\n\r"
        + "DBPool:" + url + "\n\r"
        + "Connection state:" + "free:" + this.freeConns.size() +
        "  used:" + this.allocatedConns.size() + "\n\r";
    logger.debug(info);

    while (iter.hasNext())
    {
      pdb = (PooledDBConnection) iter.next();
      if ( (nowTime - pdb.lastUpdateTime) >=
          this.maxUsedTimeAConnection)
      {
        try
        {
          logger.debug(
              "Find a connection executing timeOut,system will force to close it.");
          pdb.close(); //强制close,归还连接。
        }
        catch (Exception ex)
        {
          //忽略
        }
      }
    }
  }

  /**
   * 关闭连接池,释放所有资源。调用之后该连接池不能再使用。
   * 从该连接池获得的所有连接将被关闭。
   */
  public void close()
  {
    synchronized (lock)
    {
      if(watchThread != null)
      {
        watchThread.kill();
        watchThread = null;
      }

      //关闭所有空闲连接
      while (!freeConns.isEmpty())
      {
        try
        {
          ( (Connection) freeConns.removeFirst()).close();
        }
        catch (Exception ex)
        {
        }
      }
      freeConns = null;

      //释放所有已分配连接
      allocatedConns.clear();
    }
  }

  /**
   * 字符串描述。
   * @return 连接池的字符串描述。
   */
  public String toString()
  {
    synchronized (lock)
    {
      int free = freeConns.size();
      int total = freeConns.size() + allocatedConns.size();
      return "DBPool: name=" + name + ", driver=" + driver
          + ", url=" + url + ", user=" + user + ", password="
          + "********" + ", max-connection=" + maxConnNum
          + ", max-free-connection" + maxFreeConnNum
          + ",correct-charset=" + correctCharset
          + ", current free/total connection is "
          + free + '/' + total;
    }
  }

//////////////////////////////////////////////////////////////////////////////////////////////
//ConnectionEventListener..............实现

  /**
   * 内部阻塞getConnection();请求。 以下四个函数使用了pool_state和getConLock。
   *
   * @throws SQLException
   */
  private void checkConState()
      throws SQLException
  {
    long waitInterval = 10000;

    if (!lockGetConnectionThread)
    {
      if (this.getConLockState() == 0)
      {
        return;
      }
      else
      {
        throw new SQLException("connection cannot available.poolstate:" +
            this.getConLockState());
      }
    }

    while (this.getConLockState() != 0)
    {
      //连接池异常,阻塞。
      try
      {
        synchronized (this.getConLock)
        {
          this.getConLock.wait(waitInterval);
        }
      }
      catch (Exception ex)
      {
        //忽略。
      }
    }
  }

  /**
   * 内部唤醒getConnection();请求。
   */
  private void notifyGetConRequest()
  {
    synchronized (this.getConLock)
    {
      this.getConLock.notifyAll();
    }
  }

  /**
   * 获得连接状态。
   *
   * @return int
   */
  private int getConLockState()
  {
    synchronized (this.getConLock)
    {
      return this.POOL_STATE;
    }
  }

  /**
   * 内部设置连接状态。
   *
   * @param state ConnectionEvent
   */
  private void setConLockState(int state)
  {
    synchronized (this.getConLock)
    {
      this.POOL_STATE = state;
    }
  }

  /**
   * 连接上出现异常事件响应方法。
   * @param e 事件细节
   */
  public void connectionErrorOccurred(ConnectionEvent e)
  {
    logger.debug(e.getSQLException());
    try
    {
      Connection conn = ( (PooledConnection) e.getSource()).
          getConnection();
      //freeConnection(conn);
      kill(conn);
    }
    catch (SQLException ex)
    {
      logger.error(ex.getMessage(),ex);
    }
  }

  /**
   * 缓冲连接被关闭。
   * @param e 事件细节
   */
  public void connectionClosed(ConnectionEvent e)
  {
//        Connection conn = null;
//        try
//        {
//            conn = ( (PooledConnection) e.getSource()).getConnection();

    //在这里接收到需要被回收的connection;
    PooledDBConnection conn = (PooledDBConnection) e.getSource();
    freeConnection(conn);
//        }
//        catch (SQLException ex)
//        {
//            kill(conn);
//            log(ex);
//        }
  }

//ConnectionEventListener..............实现.
/////////////////////////////////////////////////////////////

  /**
   * 记录日志。
   * @param message 日志信息。
   * @return 在头部添加了连接池信息的日志信息,用于抛异常时填充的信息。
   */
  protected String log(Object message)
  {
    this.logger.debug(df.format(new Date())
        + locate() + name + ':' + message);
    return message.toString();
  }

  /**
   * 检查一个数据库连接是否正常。
   * @param c 一个真正的JDBC连接对象。
   * @throws SQLException 如果连接异常则抛出。
   */
  public void test(Connection c)
      throws SQLException
  {
    Statement stmt = null;
    ResultSet rs = null;
    try
    {
      //如果底层socket断掉了,我们如何重新连接?
      stmt = c.createStatement();

      //有testSQL参数,则通过SQL语句能否成功执行来判断连接是否正常(最保险的办法)
      if (testSQL != null)
      {
        rs = stmt.executeQuery(testSQL);
        rs.close();
      }
      stmt.close();
    }
    finally //若抛异常则认为连接坏掉
    {
      closeResultSetQuietly(rs);
      closeStatamentQuietly(stmt);
    }
  }

  /**
   * 清除当前所有连接,关闭空闲连接,释放已分配连接(但不关闭)。
   */
  protected void reset()
  {
    Connection c;
    synchronized (lock)
    {
      allocatedConns.clear();
      while (!freeConns.isEmpty())
      {
        c = (Connection) freeConns.getFirst();
        try
        {
          c.close();
        }
        catch (SQLException ex)
        {
          //忽略
        }
      }
    }
  }

  /**
   * 根据correct-charset参数决定是否做字符集更正,用于从DB取出的字符串。
   * @param str 从数据库中取出的字符串。
   * @return 转换成正确字符集的字符串。
   */
  protected String fromDB(String str)
  {
    if (correctCharset)
    {
      if (str == null)
      {
        return null;
      }
      try
      {
        return new String(str.getBytes("iso-8859-1"));
      }
      catch (UnsupportedEncodingException ex)
      {
        return str;
      }
    }
    else
    {
      return str;
    }
  }

  /**
   * 根据correct-charset参数决定是否做字符集更正,用于将要放入DB的字符串。
   * @param str 打算传递给数据库操作的字符串。
   * @return 转换成数据库支持的字符串。
   */
  protected String toDB(String str)
  {
    if (correctCharset)
    {
      if (str == null)
      {
        return null;
      }
      try
      {
        return new String(str.getBytes(), "iso-8859-1");
      }
      catch (UnsupportedEncodingException ex)
      {
        return str;
      }
    }
    else
    {
      return str;
    }
  }

  /**
   * 用于定位调用堆栈层次中调用本类某个方法(或者更高层次)时的位置。
   * @return 位置信息(类、函数、代码行)。
   */
  private static String locate()
  {
    final StringWriter sw = new StringWriter(32);

    new Exception().printStackTrace(
        new PrintWriter(sw)
    {
      private int i = 1; //输出堆栈第几层
      public void println(Object o)
      {
        //跳过描述信息
      }

      public void println(char[] c)
      {
        println(new String(c));
      }

      public void println(String str)
      {
        if (str.indexOf("com.huawei.utils.db") < 0) //跳过堆栈最里层
        {
          sw.getBuffer().setLength(0);
          str = str.substring(str.indexOf('('));
          if (!str.equals("(Unknown Source)"))
          {
            super.println(str);
          }
        }
      }
    }
    );
    return sw.toString().trim();
  }

  /**
   * 归还连接,将连接从繁忙列表移动到空闲列表。
   *
   * @param connxx 被归还的原始连接。
   */
  private void freeConnection(PooledDBConnection connxx)
  {
    Connection conn = connxx.getConnection();
    synchronized (lock)
    {
      try
      {
        //保障一定要提交到,不会出现内存泄漏 。
        try
        {
          //恢复连接状态到初始状态
          if (!conn.getAutoCommit())
          {
            //无论如何都先提交一把。
            try
            {
              conn.commit();
            }
            catch (Exception ex)
            {
              logger.error(ex);
            }
            conn.setAutoCommit(true);
          }
          conn.clearWarnings();
        }
        catch (Exception ex)
        {

        }

        if (freeConns == null) //连接池已经被关闭,不能再使用了
        {
          try
          {
            conn.close(); //直接关闭原始连接
          }
          catch (Exception ex)
          {
            //忽略
          }
        }

        //如果连接池重连,还回的连接可能不再属于本连接池,则直接关闭。
        //正常现象,不纪录日志
        if (!allocatedConns.remove(connxx))
        {
          try
          {
            conn.close();
          }
          catch (Exception ex)
          {
            //忽略
          }
        }
        if (conn.isClosed())
        {
          logger.debug("connection closed when it released");
          return;
        }

        //如果空闲连接的数量过多,则直接关闭
        if (freeConns.size() >= maxFreeConnNum)
        {
          try
          {
            conn.close();
          }
          catch (Exception ex)
          {
            logger.error("error occurs while release connection:"
                + conn + ' ' + ex);
          }
          return ;
        }

        //xxx informix 不支持readonly模式
        try
        {
          if (conn.isReadOnly())
          {
            conn.setReadOnly(false);
          }
        }
        catch (Exception ex)
        {
          //忽略
        }

        freeConns.add(conn);

        //通知某个阻塞在申请连接过程中的线程
        lock.notify();
      }
      catch (Exception ex) //归还过程出错将剔除这个连接
      {
        logger.error("error occurs while take back connection:"
            + conn + ' ' + ex);
        try
        {
          conn.close(); //尽量尝试关闭数据库连接
        }
        catch (Exception e)
        {
          //忽略
        }
      }
    }
  }

  /**
   * 杀死一个连接。不再重复使用。
   * @param conn 真实连接。
   */
  private synchronized void kill(Connection conn)
  {
    synchronized (lock)
    {
      allocatedConns.remove(conn);
      try
      {
        conn.close();
      }
      catch (Exception ex)
      {
        logger.error(ex);
      }
    }
  }

  /**
   * 批量执行sql语句,如要返回ResultSet就是必须全部返回
   * 如果不要返回,则返回的是null数组
   * @param sql String[]输入的sql语句数组
   * @param isReturnRS boolean,是否返回ResultSet
   * @return ResultSet[]
   * add by 孙泽飞 2005-10-25
   */
  public synchronized ResultSet[] getMulRS(String[] sql, boolean isReturnRS)
    throws SQLException
  {
    //定义并取得连接
    Connection conn = null;
    try
    {
      //取得连接
      conn = this.getConnection();
    }
    catch (SQLException ex2)
    {
      throw ex2;
    }
    //初始化参数
    Statement stat = null;

    //初始化离线结果集
    ResultSet[] rs = new ResultSet[sql.length];
    ResultSet rsxx = null;
    try
    {
      stat = conn.createStatement();
      //将提交设置为非自动
      conn.setAutoCommit(false);
      int maxRow = 1000;

      //批量执行sql语句
      for (int index = 0; index < sql.length; index++)
      {
        if (isReturnRS)
        {
          //需要返回结果集,执行并取得结果集
          rsxx = stat.executeQuery(sql[index]);

          //将数据取出放入离线结果集,并返回离线结果集
          CachedRowSet crs = new CachedRowSet();
          crs.setType(ResultSet.TYPE_SCROLL_INSENSITIVE);
          crs.setMaxRows(maxRow);
          crs.populate(rsxx);

          rs[index] = crs;
        }
        else
        {
          //不需要返回结果集
          stat.execute(sql[index]);
        }
      }
      //将执行提交
      conn.commit();
    }
    catch (SQLException ex)
    {
      throw ex;
    }
    finally
    {
      //设置自动提交
      setAutoCommit(conn,true);
      //关闭resultSet
      closeResultSetQuietly(rsxx);
      //关闭statament
      closeStatamentQuietly(stat);
      //关闭connection
      closeConnectionQuietly(conn);
    }
    return rs;
  }

  /**
   * 关闭连接
   * @param conn Connection
   */
  public final static void closeConnectionQuietly(Connection conn)
  {
    if(conn != null)
    {
      try
      {
        conn.close();
      }
      catch (Exception ex)
      {
        logger.error("close connection error:" + ex.getMessage(), ex);
      }
    }
  }

  /**
   * 关闭statament
   * @param stat Statement
   */
  public final static void closeStatamentQuietly(Statement stat)
  {
    if(stat != null)
    {
      try
      {
        stat.close();
      }
      catch (Exception ex)
      {
        logger.error("close statament error:" + ex.getMessage(), ex);
      }
    }
  }

  /**
   * 关闭ResultSet
   * @param rs ResultSet
   */
  public final static void closeResultSetQuietly(ResultSet rs)
  {
    if(rs != null)
    {
      try
      {
        rs.close();
      }
      catch (Exception ex)
      {
        logger.error("close Resultset error:" + ex.getMessage(), ex);
      }
    }
  }

  /**
   * 设置connection自动提交
   * @param conn Connection
   */
  public final static void setAutoCommit(Connection conn,boolean flag)
  {
    if(conn != null)
    {
      try
      {
        conn.setAutoCommit(flag);
      }
      catch(Exception ex)
      {
        logger.error("set connection autoCommit error:"+ex.getMessage(),ex);
      }
    }
  }
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics