import psycopg2
from sshtunnel import SSHTunnelForwarder
# SSH 隧道配置
ssh_host = '10.29.12.222' # 代理服务器地址
ssh_port = 22 # SSH 端口(默认 22)
ssh_user = 'dbviewer' # SSH 用户名
ssh_password = 'Carizon@1234' # SSH 密码(如果使用密钥认证,可以省略)
# 数据库配置
db_host = '10.35.1.84' # 数据库所在的远程服务器地址(通过代理访问)
db_port = 5432 # PostgreSQL 默认端口
db_user = 'xx' # 数据库用户名
db_password = 'xxx@2024' # 数据库密码
db_name = 'xxx' # 数据库名称
try:
# 创建 SSH 隧道
with SSHTunnelForwarder(
(ssh_host, ssh_port), # SSH 代理地址和端口
ssh_username=ssh_user,
ssh_password=ssh_password, # 如果使用私钥认证,可以用 `ssh_pkey` 参数
remote_bind_address=(db_host, db_port), # 远程数据库地址和端口
local_bind_address=('127.0.0.1', 6543) # 本地绑定的地址和端口(可自定义)
) as tunnel:
print(f"SSH 隧道已建立:127.0.0.1:{tunnel.local_bind_port}")
# 使用 psycopg2 连接数据库
conn = psycopg2.connect(
host='127.0.0.1', # 通过本地端口连接
port=tunnel.local_bind_port,
user=db_user,
password=db_password,
dbname=db_name
)
print("成功连接到数据库!")
# 示例查询
with conn.cursor() as cursor:
cursor.execute("SELECT version();")
cursor.execute("select * from had_lane_link limit 2;")
print(cursor.fetchone())
print("PostgreSQL 版本:", cursor.fetchone())
# 关闭数据库连接
conn.close()
except Exception as e:
print(f"发生错误:{e}")
网友评论