技術メモ

ネットワークエンジニアがpython, perl等々を気楽に使うための覚え書き

python pexpectによるcisco / juniperのCLI automation

コンフィグバックアップだけではないマルチパーパス・スクリプト

expectライクなpython pexpectで、cisco / juniper 機器のログ採取と設定変更機能を兼ねたwrapperモジュール(pexpect_wrapper.py)を書いてみた(Linux上のpython2.7, pexpect4.0.1環境で確認)。

  • telnet及びssh対応。また、username: を聞いてきたり、聞いてこなかったり、の応答の違いも自動処理。
  • IOSの場合、"[confirm]"の文字列に対してEnterで自動応答(ただし、clear系コマンドのみ対象とし、reloadの自動応答は排除)。
  • JUNOSの場合、コマンドのtypoが発生すると、正規表現によるprompt待ちを狂わせる。typoを検知したらCTRL+Uを送信し、誤ったコマンドの履歴をクリアしてprompt待ちを正常化すること。
  • typoが発生したら、手入力に切り替えるモード(expectのinteract相当)を備えること。
  • 複数コマンドを投入する際、任意の空行を挿入出来ること(これは趣味)。

メモ

  • sshのオプションで、"-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"にしているので、不要なら削除。
  • telnetクライアントがインストールされていないLinuxディストリビューションの場合、別途インストール。
  • pexpect.spawn()のmaxread = 1024 * 2, searchwindowsize = 1024 * 4は、環境にあわせて要調整(自分の環境では"show tech"の取得が早くなったが・・、デフォルトでもよいかも)。
  • ios_command()の正規表現r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$"は、ASAのマルチコンテキスト環境で現れる"/"を考慮。また、CatOSを考慮して\(enable\)も付けているが、不要なら削除。
  • 接続失敗が考慮されていなかったり、exit処理が拙かったので修正(2017/1/27)。

pexpect_wrapper.py

#!/usr/bin/env python
import sys
import re
import pexpect

class Connect:

    def __init__(
            self, host = '', protocol = 'telnet', username = '', password = '',
        ):
        self.host = host
        self.protocol = protocol
        self.username = username
        self.password = password

        if 'tel' in self.protocol:
            cmd = "telnet {0}".format(self.host)
        elif 'ssh' in self.protocol:
            opt = "-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
            cmd = "ssh {0} {1}@{2}".format(opt, self.username, self.host)
        self.session = pexpect.spawn(
            cmd, timeout = 30, maxread = 1024 * 2, searchwindowsize = 1024 * 4
        )

    def __auth_failed(self, message):
        self.session.terminate()
        print message
        sys.exit()

    def __command_error_reporter(self, command):
        print "\nCommand error detected at '{}'.".format(command),
        print "Press '^]' to resume."
        print self.session.match.group(0),
        self.session.interact()

    def __re_compile(self, patterns):
        return [re.compile(pattern, re.S|re.M) for pattern in patterns]

    def login(self, timeout = 10):
        prompt = self.__re_compile([
            r"(?i)(?:username|login): *$",
            r"(?i)password: *$",
            r"[>#$%] *$",
        ])
        try:
            index = self.session.expect_list(prompt, timeout = timeout)
        except:
            self.session.terminate()
            print "Connect error at '{}'.\n".format(self.host),
            print self.session.before
            sys.exit()
        if index == 0:
            self.session.sendline(self.username)
            index = self.session.expect_list(prompt, timeout = timeout)
            if index != 1:
                self.__auth_failed('login failed.')
        if index == 1:
            self.session.sendline(self.password)
            index = self.session.expect_list(prompt, timeout = timeout)
            if index != 2:
                self.__auth_failed('login failed.')

    def logout(self):
        self.session.sendline('exit')
        self.session.expect(pexpect.EOF)
        self.session.close()

    def log(self, logfile = ''):
        if logfile:
            self.session.logfile_read = open(logfile, 'w')
        else:
            self.session.logfile_read = sys.stdout

    def blank_lines(self, blanks):
        for i in range(blanks):
            self.session.sendline('')
            self.session.readline()

    def ios_enable(self, enable_password = '', timeout = 10):
        if enable_password == '': return
        prompt = self.__re_compile([
            r"> *$",
            r"(?i)password: *$",
            r"(?:#|\(enable\)) *$",
        ])
        self.session.sendline('')
        index = self.session.expect_list(prompt, timeout = timeout)
        if index == 0:
            self.session.sendline('enable')
            index = self.session.expect_list(prompt, timeout = timeout)
            if index == 1:
                self.session.sendline(enable_password)
                index = self.session.expect_list(prompt, timeout = timeout)
                if index == 1:
                    self.__auth_failed('enable failed.')

    def ios_command(
            self, command_list, blanks = 0, error_reporting = False,
            timeout = 300,
        ):
        prompt = self.__re_compile([
            r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$",
            r"\((?:config|cfg)[^\)]*\) ?# *$",
            r"(?i)^clear.*\[confirm\] *$",
            r"(?i)^% *(?:ambiguous|incomplete|invalid|unknown|\S+ overlaps).*$"
        ])
        self.blank_lines(2)
        for command in command_list:
            self.session.sendline(command)
            self.session.readline()
            index = self.session.expect_list(prompt, timeout = timeout)
            if index == 0:
                if blanks > 0: self.blank_lines(blanks)
            elif index == 1:
                pass
            elif index == 2:
                self.session.sendline('')
                self.session.expect_list(prompt, timeout = timeout)
                if blanks > 0: self.blank_lines(blanks)
            elif index == 3:
                if error_reporting is True:
                    self.__command_error_reporter(command)
                else:
                    self.session.sendcontrol('u')
                    self.session.sendline('')
                    index = self.session.expect_list(prompt, timeout = timeout)
                    if index == 0:
                        if blanks > 0: self.blank_lines(blanks)
        self.blank_lines(2)

    def junos_command(
            self, command_list, blanks = 0, error_reporting = False,
            timeout = 300,
        ):
        prompt = self.__re_compile([
            r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?[>%] *$",
            r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?# *$",
            r"^\s+\^",
            r"^(?i)error:"
        ])
        self.blank_lines(2)
        for command in command_list:
            self.session.send(command)
            index = self.session.expect_list(
                [pexpect.TIMEOUT, prompt[2]], timeout = 0.1
            )
            if index == 1:
                self.session.sendcontrol('u')
                if error_reporting is True:
                    self.__command_error_reporter(command)

            self.session.sendline('')
            if error_reporting is True:
                index = self.session.expect_list(prompt, timeout = timeout)
            else:
                index = self.session.expect_list(
                    [prompt[0], prompt[1]], timeout = timeout
                )

            if index == 0:
                if blanks > 0: self.blank_lines(blanks)
            elif index == 1:
                pass
            else:
                self.__command_error_reporter(command)
        self.blank_lines(2)

Example

  • logfile を指定しなれけば、標準出力にログを書き出しながら実行。
  • ios_command()/junos_command()でコマンド毎に2行の空行挿入("blanks = 2")。
  • ios_command()/junos_command()で"error_reporting = True"を指定しているので、不正コマンド応答(例えば、"% invalid ~")が見られたら、pexpectをinteractモードに自動変更(CTRL+]の入力でinteractモードを終了し、スクリプトを継続する)。
#!/usr/bin/env python
import pexpect_wrapper

parameter = {
    'host' : '192.168.0.1',
    'protocol' : 'telnet',
    'username' : 'admin',
    'password' : 'fake_pass',
}

commands = [
    'clear counters',
    'clear logging',
    'conf t',
    'interface lo0',
    'ip address 192.0.2.1 255.255.255.255',
    'end',
    'wr mem',
    'ter len 0',
    'show run',
    'show log',
]

c = pexpect_wrapper.Connect(**parameter)
c.login()
c.ios_enable(enable_password = 'fake_pass')
c.log(logfile = 'ios_router.log')
c.ios_command(commands, blanks = 2, error_reporting = True,)
c.logout()

parameter = {
    'host' : '192.168.0.2',
    'protocol' : 'telnet',
    'username' : 'admin',
    'password' : 'fake_pass',
}
commands = [
    'set cli screen-length 0',
    'set cli screen-width 0',
    'show config | display set',
    'config',
    'set interface lo0 unit 0 family inet address 192.0.2.2/32',
    'commit and-q',
    'show config | display set',
    'show log messages | last 20',
]

c = pexpect_wrapper.Connect(**parameter)
c.login()
c.log(logfile = 'junos_router.log')
c.junos_command(commands, blanks = 2, error_reporting = True,)
c.logout()