python學習筆記之paramiko和sqlalchemy (第九天)
參考銀角大王 :http://www.rzrgm.cn/wupeiqi/articles/5095821.html
http://www.rzrgm.cn/wupeiqi/articles/5713330.html(pymysql模塊)
金角大王:http://www.rzrgm.cn/alex3714/articles/5950372.html(python mysql)
http://www.rzrgm.cn/alex3714/articles/5978329.html(sqlalchemy ORM)
一、Python的paramiko模塊,該模塊機遇SSH用于連接遠程服務器并執行相關操作
1、SSHClient
用于連接遠程服務器并執行基本命令
基于用戶名密碼連接:
import paramiko ssh = paramiko.SSHClient() # 允許連接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname='192.168.4.*',port=22,username='ubuntu',password='******') stdin,stdout,stderr = ssh.exec_command('df -Th') for result in stdout.readlines(): #result = list(filter(lambda x:x is not None,[stdout.read(),stderr.read()]))[0] print(result,end='') ssh.close() transport = paramiko.Transport('192.168.4.*',22) transport.connect(username='ubuntu',password='*******') ssh1 = paramiko.SSHClient() ssh1._transport = transport stdin,stdout,stderr = ssh1.exec_command('ls -l') #result1 = list(filter(lambda x:x is not None,[stdout.read(),stderr.read()])) for result1 in stdout.readlines(): #result = list(filter(lambda x:x is not None,[stdout.read(),stderr.read()]))[0] print(result1,end='') transport.close
基于密鑰連接:
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/ubuntu/.ssh/id_rsa') ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname='192.168.4.*',port=22,username='ubuntu',pkey=private_key) stdin,stdout,stderr = ssh.exec_command('df -Th') result = list(filter(lambda x:x is not None,[stdout.read(),stderr.read()]))[0] print(result) ssh.close() transport = paramiko.Transport('192.168.4.*',22) transport.connect(username='ubuntu',pkey=private_key) ssh1 = paramiko.SSHClient() ssh1._transport = transport stdin,stdout,stderr = ssh1.exec_command('ls -l') result1 = list(filter(lambda x:x is not None,[stdout.read(),stderr.read()])) print(result1) transport.close
基于密鑰字符串:
import paramiko from io import StringIO private_str = """-----BEGIN RSA PRIVATE KEY----- MIIEpQIBAAKCAQEAzbQ+3wA47CEdTCSOiHL38PC4vckze2u/JXiw66uiQMsp9sDX CP6I4QppLaGqG1s1SjZf7H0sb7jNz7LqRZe24Scnum1Cm0YhBtVLSfy454Ugh7ec uHD/hM6EbKHm10jdZrd4rJezvSqJkpsc/T2MSbiYWz0I2dKb4sYbJlDB6CPtBoiZ 8mmSadekr67SsmygYpigBWrGyjcPJa1xpaY/dqXZkByGFBY/j4Ipm792T3ilTXd1 U1q0xa/+Axkv1doqEBAzqIukc56alSwvpPyWkTj7uH17BGIaAxjHVQM6BdWGiKbr AffDQuvZcOeHD/IKVY3qwU7i7kC0fjXkKj3+OwIDAQABAoIBAQCfwYa6hl+u86Df S9zul+t70liH/MTg67NOFMfCoE+o5qA2pVncAGKp8/3vlIiaKmHeuxAQiL6EHhCp aBiN59/+SPyPyt9Z3EM2HV0VnxKzrC6xeKZckFXB/OnXvH2dYVehuIgd8suC9JBX reP7wVs8vgKFiYdgNDMhEh5vyXRvJmMpwEF9T8h3BDA4Dzq/algGheX+fAPbsIV8 lTymzf3y4BZN+F1vBBUidV5IaDvHO8ojwvJmVw0PM/DIfssbruzqw96czeIhv2SN 1TK56aWHWBClq7CRVrSF8wHVcAuKFl4Bldm6OneT4fZFTmsdFgbT7N+Xd3d9T7O7 OhDgkctBAoGBAOjP/BAUv5dI/HBMyPEPrKa27nitlGn+SVXURdLNEIRAL5ayb0FD xcHG3DoJ68DLTX/E4okyySRERqT8MF3cfpuFC69rzN5ZPvlyLll8iH/y7r7jMsPi g0T8xN17cRgk3tzkpEU+RbXXZeGWvz7H/taZJ0IjRlrMVgpkX3Fhiv4RAoGBAOIx FGI0P5MPfZYU0fhL0ltDCbRnOmQKcgnY815dkOi6E/7lDAVKygwuVeFBnw68NhWS VXUMZHD6cL3PLD1nxMtnUCfjoxsgWed3I53Zr9+werQbp7uagXEkbXxC4fLAB5y2 8+n/Gt8lM2kS270fakuENgjF1TG1dgLcHViWFluLAoGBAMLmw51u0UpEltko3xw4 RBC5moC6yEHH2Psan22vsQEWfwCI1uzrYNYttdZ80bnwOSwa1o+HTa6n3gBnA9LA Mdnekv7Dn5oRWtAXj8foopmC9e4mZCxrJ/wMJH9KxU4yJ8UDQKabUF7AOZGW0vor EiPzyVLsFw0SfYFrsB9KSsMRAoGAAKb9lQ7rhAQOa6lhtaKaV7MIyyFlFLbG/2pF wWbprRIkTp7gev9tN73Gd6DV0ZgPW96RKoY/n+fI/XMkgITVF3UT0Rmh9ckRGU7J poHjNPTwVaaixDK83tOpESusNSQCoZwRdgJLVItp64qnYZM+njsiYMIZTExmq7lw yDmelOMCgYEA0zMbr8mivE+9eQ6gWL/u2Z6Dgpuzk2LZ25axvjU9rdt4lZEm1WIR tbfUPsVMsTEJAAUJDHQ73a0aODLMv03HVNcikiCkPg+1cfJYWqyRFVkfM7Txvqxj 1XN5Uv6Y33j3g3xjC73qQG3uqFXPE1NKh9f0Vr4P12H8hp91JjMPITE= -----END RSA PRIVATE KEY-----""" private_key = paramiko.RSAKey(file_obj=StringIO(private_str)) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname='192.168.4.*',port=22,username='ubuntu',pkey=private_key) stdin,stdout,stderr = ssh.exec_command('df -Th') result = list(filter(lambda x:x is not None,[stdout.read(),stderr.read()]))[0] print(result) ssh.close() transport = paramiko.Transport('192.168.4.*',22) transport.connect(username='ubuntu',pkey=private_key) ssh1 = paramiko.SSHClient() ssh1._transport = transport stdin,stdout,stderr = ssh1.exec_command('ls -l') result1 = list(filter(lambda x:x is not None,[stdout.read(),stderr.read()])) print(result1) transport.close
2、SFTPClient
用于連接遠程服務器并執行上傳下載
可單個文件上傳、下載,亦可批量上傳,下載(但無法保持目錄結構)
import paramiko,os,sys from stat import ST_MODE, S_ISREG, S_ISDIR, S_ISLNK # 定義一個類,表示一臺遠端linux主機 class Linux(object): # 通過IP, 用戶名,密碼,超時時間初始化一個遠程Linux主機 def __init__(self, ip, username, password, timeout=30): self.ip = ip self.username = username self.password = password self.timeout = timeout # transport和chanel self.t = '' self.chan = '' # 鏈接失敗的重試次數 self.try_times = 3 # 調用該方法連接遠程主機 def connect(self): pass # 斷開連接 def close(self): pass # 發送要執行的命令 def send(self, cmd): pass # get單個文件 def sftp_get(self, remotefile, localfile): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) sftp.get(remotefile, localfile) t.close() # put單個文件 def sftp_put(self, localfile, remotefile): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) sftp.put(localfile, remotefile) t.close() # ------獲取遠端linux主機上指定目錄及其子目錄下的所有文件------ def __get_all_files_in_remote_dir(self, sftp, remote_dir): # 保存所有文件的列表 all_files = list() # 去掉路徑字符串最后的字符'/',如果有的話 if remote_dir[-1] == '/': remote_dir = remote_dir[0:-1] # 獲取當前指定目錄下的所有目錄及文件,包含屬性值 files = sftp.listdir_attr(remote_dir) for x in files: # remote_dir目錄中每一個文件或目錄的完整路徑 print(x) print(x.st_mode) filename = remote_dir + '/' + x.filename # 如果是目錄,則遞歸處理該目錄,這里用到了stat庫中的S_ISDIR方法,與linux中的宏的名字完全一致 if S_ISDIR(x.st_mode): all_files.extend(self.__get_all_files_in_remote_dir(sftp, filename)) else: all_files.append(filename) return all_files def sftp_get_dir(self, remote_dir, local_dir): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) # 獲取遠端linux主機上指定目錄及其子目錄下的所有文件 all_files = self.__get_all_files_in_remote_dir(sftp, remote_dir) # 依次get每一個文件 for x in all_files: filename = x.split('/')[-1] local_filename = os.path.join(local_dir, filename) print('Get文件%s傳輸中...' % filename) sftp.get(x, local_filename) # ------獲取本地指定目錄及其子目錄下的所有文件------ def __get_all_files_in_local_dir(self, local_dir): # 保存所有文件的列表 all_files = list() # 獲取當前指定目錄下的所有目錄及文件,包含屬性值 files = os.listdir(local_dir) for x in files: # local_dir目錄中每一個文件或目錄的完整路徑 filename = os.path.join(local_dir, x) # 如果是目錄,則遞歸處理該目錄 if os.path.isdir(x): all_files.extend(self.__get_all_files_in_local_dir(filename)) else: all_files.append(filename) return all_files def sftp_put_dir(self, local_dir, remote_dir): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) # 去掉路徑字符穿最后的字符'/',如果有的話 if remote_dir[-1] == '/': remote_dir = remote_dir[0:-1] # 獲取本地指定目錄及其子目錄下的所有文件 all_files = self.__get_all_files_in_local_dir(local_dir) # 依次put每一個文件 for x in all_files: filename = os.path.split(x)[-1] remote_filename = remote_dir + '/' + filename print('Put文件%s傳輸中...' % filename) sftp.put(x, remote_filename) if __name__ == '__main__': remotefile = '/home/ubuntu/devops/currency_rate.sh' localfile = '/home/ubuntu/test/currency_rate.sh' remote_path = '/home/ubuntu/aa' local_path = '/home/ubuntu/test' host = Linux('192.168.4.*', 'ubuntu', '******') # 將遠端的xxoo.txt get到本地,并保存為ooxx.txt #host.sftp_get(remotefile, localfile) # # 將本地的xxoo.txt put到遠端,并保持為xxoo.txt # host.sftp_put(localfile, remotefile) # 將遠端remote_path目錄中的所有文件get到本地local_path目錄 host.sftp_get_dir(remote_path, local_path) # # 將本地local_path目錄中的所有文件put到遠端remote_path目錄 #host.sftp_put_dir(remote_path, local_path)
二、使用pymysql模塊進行mysql數據庫操作:
import pymysql
conn = pymysql.connect(host='192.168.4.*',port=3306,
user='hzfdt', passwd='****', db='test')
cur = conn.cursor()
cur.execute("USE test")
#cur.execute("insert into students(name,sex,age,tel) values('kai','feman',36,'18069859005')")
#conn.commit()
cur.execute("select * from students")
#cur.scroll(-1,mode='relative')
#cur.scroll(2,mode='absolute')
aa = cur.fetchall()
cur.close()
conn.close()
三、sqlalchemy使用
1、導入指定模塊:
import sqlalchemy
from sqlalchemy import create_engine,Column,Integer,String,func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
2、初始化數據庫連接:
###初始化數據庫連接:
###'數據庫類型+數據庫驅動名稱://用戶名:口令@機器地址:端口號/數據庫名'
engine = create_engine("mysql+pymysql://hzfdt:*****@192.168.4.208/test",encoding='utf-8',echo=False)
###生成orm基類
Base = declarative_base()
3、創建一個表:
###具體每個表的class定義
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True,autoincrement=True)
name = Column(String(32),unique=True,nullable=False)
password = Column(String(64),nullable=False)
###正常返回查詢的內存對象對址,要變的可讀,只需在定義表的類下面加上這樣的代碼
def __repr__(self):
return "<User(id = %d,name = %s,password = %s)>" % (self.id,self.name,self.password)
#######################創建表#######################
Base.metadata.create_all(engine)
4、添加數據:
###創建與數據庫的會話session class ,注意,這里返回給session的是個class,不是實例 Session_class = sessionmaker(bind=engine) ###生成session實例 Session = Session_class() #######################添加數據####################### user_obj = User(name='zhang',password='123') ###把要創建的數據對象添加到這個session里 Session.add(user_obj) ###提交,創建數據 Session.commit()
5、查詢數據
#######################查詢數據####################### ###all返回是一個查詢數據的列表,故需要循環 my_user = Session.query(User).filter_by(name = "zhang").all() ###first返回 my_user_1 = Session.query(User).filter_by(name = "zhang").first() ###one查詢只有唯一一條的記錄 my_user_2 = Session.query(User).filter_by(name = "kai").one() ''' ###表類中沒有__repr__函數下的輸出查詢結果 print(my_user) for user in my_user: print(user.id,user.name,user.password) print(my_user_1) print(my_user_1.id,my_user_1.name,my_user_1.password) print(my_user_2) print(my_user_2.id,my_user_2.name,my_user_2.password) ''' ###輸出結果如下### ''' [<__main__.User object at 0x103803eb8>, <__main__.User object at 0x103803f28>, <__main__.User object at 0x103803f98>] 5 zhang 123 6 zhang 123 7 zhang 123 <__main__.User object at 0x103803eb8> 5 zhang 123 <__main__.User object at 0x10390d400> 4 kai 123 ''' ''' ###表類中有__repr__函數下的輸出查詢結果 print(my_user) print(my_user_1) print(my_user_2) ###輸出結果如下### [<User(id = 5,name = zhang,password = 123)>, <User(id = 6,name = zhang,password = 123)>, <User(id = 7,name = zhang,password = 123)>] <User(id = 5,name = zhang,password = 123)> <User(id = 4,name = kai,password = 123)> '''
6、修改數據:
#######################修改數據####################### my_user = Session.query(User).filter_by(name = "zhang").first() print(my_user) my_user.password = '123456' Session.commit() print(my_user)
7、回滾數據:
#######################回滾數據###################### my_user = Session.query(User).filter_by(name = 'wang').first() my_user.name = 'zhou' fake_user = User(name = 'liu',password = 'abcd') Session.add(fake_user) ###這時看session里有你剛添加和修改的數據 print(Session.query(User).filter(User.name.in_(['zhou','liu'])).all() ) #Session.rollback() print(Session.query(User).filter(User.name.in_(['zhou','liu'])).all() ) Session.commit()
8、外鍵關聯:
relationship進行二表關聯,名字隨便取
relationship_populates二個表中的populates后面的命名必須互為對應
relationship_backref 在關聯表中定義,允許你在被關聯表里通過backref字段反向查出所有它在本表里的關聯項
#!/Library/Frameworks/Python.framework/Versions/3.6/bin/python3 # -*- coding: utf-8 -*- import sqlalchemy from sqlalchemy import create_engine,Column,Integer,String,func,ForeignKey,and_ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship ###初始化數據庫連接: ###'數據庫類型+數據庫驅動名稱://用戶名:口令@機器地址:端口號/數據庫名' engine = create_engine("mysql+pymysql://hzfdt:******@192.168.4.208/test",encoding='utf-8',echo=False) ###生成orm基類 Base = declarative_base() ###具體每個表的class定義 class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key=True,autoincrement=True) name = Column(String(32),unique=True,nullable=False) password = Column(String(64),nullable=False) #user_rs = relationship("Address") #user_list = relationship("Address",back_populates='addr') user_rs_backref = relationship("Address",backref="user_list_1") ###這個nb,允許你在user表里通過backref字段反向查出所有它在addresses表里的關聯項 ###正常返回查詢的內存對象對址,要變的可讀,只需在定義表的類下面加上這樣的代碼 #def __repr__(self): # return "<User(id = %d,name = %s,password = %s)>" % (self.id,self.name,self.password) class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True, autoincrement=True) email_address = Column(String(32), nullable=False) user_id = Column(Integer, ForeignKey('user.id')) #addr_rs = relationship("User") #addr = relationship("User",back_populates="user_list") addr_rs_backref = relationship("User",backref="addr_1") #def __repr__(self): # return "<Address(email_address='%s')>" % self.email_address #######################創建表####################### #Base.metadata.create_all(engine) ###創建與數據庫的會話session class ,注意,這里返回給session的是個class,不是實例 Session_class = sessionmaker(bind=engine) ###生成session實例 Session = Session_class() #obj = Session.query(User).filter(and_(User.name=='zhang',User.id=='6')).first() #obj = Session.query(User).filter(User.name=='zhang',User.id=='6').first() #print(obj.name,obj.id) ''' obj.addresses = [Address(email_address='aa@hzfdt.com'), Address(email_address='bb@hzfdt.com')] Session.commit() print(obj.addresses) ''' user_obj = Session.query(User).filter(User.name=='zhang').first() addr_obj = Session.query(Address).first() #########relationship################ ###用relationship進行二表關聯,名字隨便取### #print(user_obj.name,user_obj.password) #for i in user_obj.user_rs: # print(i.user_id,i.email_address) #print(addr_obj.addr_rs.name,addr_obj.addr_rs.id) ''' 結果如下: zhang 123456 5 aa@qq.com 5 bb@qq.com zhang 5 ''' #########relationship_populates################ ###二個表中的populates后面的命名必須互為對應### #print(user_obj.name,user_obj.password) #for i in user_obj.user_list: # print(i.user_id,i.email_address) #print(addr_obj.addr.name,addr_obj.addr.id) ''' 結果如下: zhang 123456 5 aa@qq.com 5 bb@qq.com zhang 5 ''' #############relationship_backref############### ####這個nb,允許你在user表里通過backref字段反向查出所有它在addresses表里的關聯項 print(user_obj.name,user_obj.password) for i in user_obj.addr_1: print(i.email_address) print(addr_obj.user_list_1.name,addr_obj.user_list_1.id) ''' 結果如下: zhang 123456 aa@qq.com bb@qq.com zhang 5 '''
9、多對多關系
處理中文
sqlalchemy設置編碼字符集一定要在數據庫訪問的URL上增加charset=utf8,否則數據庫的連接就不是utf8的編碼格式
eng = create_engine('mysql://root:root@localhost:3306/test2?charset=utf8',echo=True)
#一本書可以有多個作者,一個作者又可以出版多本書
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,func,and_,Table,text,DATE from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import func engine = create_engine("mysql+pymysql://hzfdt:*****@192.168.4.x/test?charset=utf8",echo=True) ###生成orm基類 Base = declarative_base() book_m2m_author = Table('book_m2m_author', Base.metadata, Column('book_id',Integer,ForeignKey('books.id')), Column('author_id',Integer,ForeignKey('authors.id')), ) class Book(Base): __tablename__ = 'books' id = Column(Integer,primary_key=True) name = Column(String(256)) pub_date = Column(DATE) authors = relationship('Author',secondary=book_m2m_author,backref='books') def __repr__(self): return "the book name is %s,the pub date is %s" % (self.name,self.pub_date) class Author(Base): __tablename__ = 'authors' id = Column(Integer, primary_key=True) name = Column(String(256)) #abc = relationship('Book', secondary=book_m2m_author, backref='auth') def __repr__(self): return "the author name is %s" % self.name #Base.metadata.create_all(engine) ###創建與數據庫的會話session class ,注意,這里返回給session的是個class,不是實例 Session_class = sessionmaker(bind=engine) s = Session_class() ###添加數據### ''' b1 = Book(name="跟Alex學Python") b2 = Book(name="跟Alex學把妹") b3 = Book(name="跟Alex學裝逼") b4 = Book(name="跟Alex學開車") a1 = Author(name="Alex") a2 = Author(name="Jack") a3 = Author(name="Rain") b1.authors = [a1, a2] b2.authors = [a1, a2, a3] s.add_all([b1, b2, b3, b4, a1, a2, a3]) s.commit() ''' ###查詢數據### print('--------通過書表查關聯的作者---------') book_obj = s.query(Book).filter_by(name="跟Alex學Python").first() print(book_obj.name, book_obj.authors) print('--------通過作者表查關聯的書---------') author_obj =s.query(Author).filter_by(name="Alex").first() print(author_obj.name , author_obj.books) ###多對多刪除### ###刪除數據時不用管boo_m2m_authors , sqlalchemy會自動幫你把對應的數據刪除### #通過書刪除作者 author_obj =s.query(Author).filter_by(name="Jack").first() book_obj = s.query(Book).filter_by(name="跟Alex學把妹").first() book_obj.authors.remove(author_obj) #從一本書里刪除一個作者 #通過作者刪除書# book_obj = s.query(Book).filter_by(name="跟Alex學Python").first() author_obj.books.remove(book_obj) #從一個作者里刪除一本書 s.commit() #直接刪除作者或書# author_obj =s.query(Author).filter_by(name="Alex").first() book_obj = s.query(Book).filter_by(name="跟Alex學把妹").first() s.delete(author_obj) s.delete(book_obj) s.commit()
10、使用sqlalchemy建立攜帶加密字段的ORM表
數據庫密碼明文是很危險的一件事情,所以需要進行對密碼進行加密。結werkzeug.security進行對用戶密碼加密。
from sqlalchemy.ext.hybrid import hybrid_property from werkzeug.security import generate_password_hash,check_password_hash class UserProfile(Base): __tablename__ = 'user_profile' id = Column(Integer,primary_key=True,autoincrement=True) username = Column(String(32),unique=True,nullable=False) password = Column(String(256),unique=True,nullable=False) groups = relationship('Group',secondary=Group2UserProfile) bind_hosts = relationship('BindHost',secondary=BindHost2UserProfile) audit_logs = relationship('AuditLog') @hybrid_property def passwd(self): return self.password @passwd.setter def hash_passwd(self,plaintext): self.password = generate_password_hash(plaintext) def verify_passwd(self,plaintext): return check_password_hash(self.password,plaintext)
#創建的時候還是使用password字段存入
obj = models.UserProfile(username=key,passwd=val.get('password'))
#直接使用方法,密碼正確就會返回True
user_obj = session.query(models.UserProfile).filter(models.UserProfile.username==username).first()
user_obj.verify_passwd(password)
故障排錯:
在python3.4下SQLAlchemy1.1.18下如下設置:
@hybrid_property
def passwd(self):
return self.password
@passwd.setter
def hash_passwd(self,plaintext):
self.password = generate_password_hash(plaintext)
二個函數名可以不一致;但在python3.5下SQLAlchemy1.2.6環境下,函數名稱一定要一樣,否則會報如下錯誤:
File "/usr/local/lib/python3.5/dist-packages/SQLAlchemy-1.2.6-py3.5-linux-x86_64.egg/sqlalchemy/ext/hybrid.py", line 873, in __set__
raise AttributeError("can't set attribute")
AttributeError: can't set attribute
浙公網安備 33010602011771號