python pexpectによるcisco / juniperのCLI automation
コンフィグバックアップだけではないマルチパーパス・スクリプト
expectライクなpython pexpectで、cisco / juniper 機器のログ採取と設定変更機能を兼ねたwrapperモジュール(pexpect_wrapper.py)を書いてみた(Linux上のpython2.7, pexpect4.0.1環境で確認)。
- telnet及びssh対応。また、username: を聞いてきたり、聞いてこなかったり、の応答の違いも自動処理。
- IOSの場合、"[confirm]"の文字列に対してEnterで自動応答(ただし、clear系コマンドのみ対象とし、reloadの自動応答は排除)。
- JUNOSの場合、コマンドのtypoが発生すると、正規表現によるprompt待ちを狂わせる。typoを検知したらCTRL+Uを送信し、誤ったコマンドの履歴をクリアしてprompt待ちを正常化すること。
- typoが発生したら、手入力に切り替えるモード(expectのinteract相当)を備えること。
- 複数コマンドを投入する際、任意の空行を挿入出来ること(これは趣味)。
メモ
- sshのオプションで、"-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"にしているので、不要なら削除。
- telnetクライアントがインストールされていないLinuxディストリビューションの場合、別途インストール。
- pexpect.spawn()のmaxread = 1024 * 2, searchwindowsize = 1024 * 4は、環境にあわせて要調整(自分の環境では"show tech"の取得が早くなったが・・、デフォルトでもよいかも)。
- ios_command()の正規表現r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$"は、ASAのマルチコンテキスト環境で現れる"/"を考慮。また、CatOSを考慮して\(enable\)も付けているが、不要なら削除。
- 接続失敗が考慮されていなかったり、exit処理が拙かったので修正(2017/1/27)。
pexpect_wrapper.py
#!/usr/bin/env python import sys import re import pexpect class Connect: def __init__( self, host = '', protocol = 'telnet', username = '', password = '', ): self.host = host self.protocol = protocol self.username = username self.password = password if 'tel' in self.protocol: cmd = "telnet {0}".format(self.host) elif 'ssh' in self.protocol: opt = "-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" cmd = "ssh {0} {1}@{2}".format(opt, self.username, self.host) self.session = pexpect.spawn( cmd, timeout = 30, maxread = 1024 * 2, searchwindowsize = 1024 * 4 ) def __auth_failed(self, message): self.session.terminate() print message sys.exit() def __command_error_reporter(self, command): print "\nCommand error detected at '{}'.".format(command), print "Press '^]' to resume." print self.session.match.group(0), self.session.interact() def __re_compile(self, patterns): return [re.compile(pattern, re.S|re.M) for pattern in patterns] def login(self, timeout = 10): prompt = self.__re_compile([ r"(?i)(?:username|login): *$", r"(?i)password: *$", r"[>#$%] *$", ]) try: index = self.session.expect_list(prompt, timeout = timeout) except: self.session.terminate() print "Connect error at '{}'.\n".format(self.host), print self.session.before sys.exit() if index == 0: self.session.sendline(self.username) index = self.session.expect_list(prompt, timeout = timeout) if index != 1: self.__auth_failed('login failed.') if index == 1: self.session.sendline(self.password) index = self.session.expect_list(prompt, timeout = timeout) if index != 2: self.__auth_failed('login failed.') def logout(self): self.session.sendline('exit') self.session.expect(pexpect.EOF) self.session.close() def log(self, logfile = ''): if logfile: self.session.logfile_read = open(logfile, 'w') else: self.session.logfile_read = sys.stdout def blank_lines(self, blanks): for i in range(blanks): self.session.sendline('') self.session.readline() def ios_enable(self, enable_password = '', timeout = 10): if enable_password == '': return prompt = self.__re_compile([ r"> *$", r"(?i)password: *$", r"(?:#|\(enable\)) *$", ]) self.session.sendline('') index = self.session.expect_list(prompt, timeout = timeout) if index == 0: self.session.sendline('enable') index = self.session.expect_list(prompt, timeout = timeout) if index == 1: self.session.sendline(enable_password) index = self.session.expect_list(prompt, timeout = timeout) if index == 1: self.__auth_failed('enable failed.') def ios_command( self, command_list, blanks = 0, error_reporting = False, timeout = 300, ): prompt = self.__re_compile([ r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$", r"\((?:config|cfg)[^\)]*\) ?# *$", r"(?i)^clear.*\[confirm\] *$", r"(?i)^% *(?:ambiguous|incomplete|invalid|unknown|\S+ overlaps).*$" ]) self.blank_lines(2) for command in command_list: self.session.sendline(command) self.session.readline() index = self.session.expect_list(prompt, timeout = timeout) if index == 0: if blanks > 0: self.blank_lines(blanks) elif index == 1: pass elif index == 2: self.session.sendline('') self.session.expect_list(prompt, timeout = timeout) if blanks > 0: self.blank_lines(blanks) elif index == 3: if error_reporting is True: self.__command_error_reporter(command) else: self.session.sendcontrol('u') self.session.sendline('') index = self.session.expect_list(prompt, timeout = timeout) if index == 0: if blanks > 0: self.blank_lines(blanks) self.blank_lines(2) def junos_command( self, command_list, blanks = 0, error_reporting = False, timeout = 300, ): prompt = self.__re_compile([ r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?[>%] *$", r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?# *$", r"^\s+\^", r"^(?i)error:" ]) self.blank_lines(2) for command in command_list: self.session.send(command) index = self.session.expect_list( [pexpect.TIMEOUT, prompt[2]], timeout = 0.1 ) if index == 1: self.session.sendcontrol('u') if error_reporting is True: self.__command_error_reporter(command) self.session.sendline('') if error_reporting is True: index = self.session.expect_list(prompt, timeout = timeout) else: index = self.session.expect_list( [prompt[0], prompt[1]], timeout = timeout ) if index == 0: if blanks > 0: self.blank_lines(blanks) elif index == 1: pass else: self.__command_error_reporter(command) self.blank_lines(2)
Example
- logfile を指定しなれけば、標準出力にログを書き出しながら実行。
- ios_command()/junos_command()でコマンド毎に2行の空行挿入("blanks = 2")。
- ios_command()/junos_command()で"error_reporting = True"を指定しているので、不正コマンド応答(例えば、"% invalid ~")が見られたら、pexpectをinteractモードに自動変更(CTRL+]の入力でinteractモードを終了し、スクリプトを継続する)。
#!/usr/bin/env python import pexpect_wrapper parameter = { 'host' : '192.168.0.1', 'protocol' : 'telnet', 'username' : 'admin', 'password' : 'fake_pass', } commands = [ 'clear counters', 'clear logging', 'conf t', 'interface lo0', 'ip address 192.0.2.1 255.255.255.255', 'end', 'wr mem', 'ter len 0', 'show run', 'show log', ] c = pexpect_wrapper.Connect(**parameter) c.login() c.ios_enable(enable_password = 'fake_pass') c.log(logfile = 'ios_router.log') c.ios_command(commands, blanks = 2, error_reporting = True,) c.logout() parameter = { 'host' : '192.168.0.2', 'protocol' : 'telnet', 'username' : 'admin', 'password' : 'fake_pass', } commands = [ 'set cli screen-length 0', 'set cli screen-width 0', 'show config | display set', 'config', 'set interface lo0 unit 0 family inet address 192.0.2.2/32', 'commit and-q', 'show config | display set', 'show log messages | last 20', ] c = pexpect_wrapper.Connect(**parameter) c.login() c.log(logfile = 'junos_router.log') c.junos_command(commands, blanks = 2, error_reporting = True,) c.logout()