python標準のstring.Templateを用いたcisco config作成スクリプト
jinja2もいいけれど...
jinja2の過去記事はこちら
- python jinja2(part1) cisco機器のconfig作成に便利なjinja2.Environment()設定 - 技術メモ
- python jinja2(part2) テンプレート継承を用いたcisco機器のconfig自動作成 - 技術メモ
単純な置換だけで完成する差分config程度なら、やはり簡単なのがよい(string.Templateは、perlのheredocくらいの手軽さ)。使い勝手のよさそうな以下の3種類のサンプルスクリプトを作成(Python3.4)。
#!/usr/bin/python3 from string import Template import yaml import ipaddress with open('./yml.txt', "r") as f: d = yaml.load(f) with open('./tmpl.txt', "r") as f: t = Template(f.read()) for k, v in d.items(): obj = ipaddress.ip_interface(v['ip']) config = t.substitute( v, host = k, ip_addr = obj.with_netmask.replace('/', ' '), net_addr = obj.network.with_hostmask.replace('/', ' '), ) with open(k + '.txt', "w") as f: f.write(config)
yml.txt(YAMLデータ)
tky-rt01: vlan : 2 ip : 192.168.2.1/24 tky-rt02: vlan : 2 ip : 192.168.2.2/24
#!/usr/bin/python3 from string import Template import csv import ipaddress with open('./data.csv', "r") as f: data = [d for d in csv.DictReader(f)] with open('./tmpl.txt', "r") as f: t = Template(f.read()) for d in data: obj = ipaddress.ip_interface(d['ip']) config = t.substitute( d, ip_addr = obj.with_netmask.replace('/', ' '), net_addr = obj.network.with_hostmask.replace('/', ' '), ) with open(d['host'] + '.txt', "w") as f: f.write(config)
host,vlan,ip tky-rt01,2,192.168.2.1/24 tky-rt02,2,192.168.2.2/24
#!/usr/bin/env python3 from string import Template from openpyxl import load_workbook import ipaddress wb = load_workbook('data.xlsx') ws = wb.active data = [] for i in range(1, ws.max_row + 1): values = [ws.cell(row = i, column = j).value for j in range(1, ws.max_column + 1)] if i == 1: keys = values else: data.append(dict(zip(keys, values))) with open('./tmpl.txt', "r") as f: t = Template(f.read()) for d in data: obj = ipaddress.ip_interface(d['ip']) config = t.substitute( d, ip_addr = obj.with_netmask.replace('/', ' '), net_addr = obj.network.with_hostmask.replace('/', ' '), ) with open(d['host'] + '.txt', "w") as f: f.write(config)
data.xlsx
tmpl.txt(共通テンプレート)
! ! $host config. ! vlan $vlan ! interface Vlan$vlan ip address $ip_addr ! router eigrp 1 network $net_addr !
tky-rt01.txt(出力結果)
! ! tky-rt01 config. ! vlan 2 ! interface Vlan2 ip address 192.168.2.1 255.255.255.0 ! router eigrp 1 network 192.168.2.0 0.0.0.255 !
tky-rt02.txt(出力結果)
! ! tky-rt02 config. ! vlan 2 ! interface Vlan2 ip address 192.168.2.2 255.255.255.0 ! router eigrp 1 network 192.168.2.0 0.0.0.255 !
PrettyGoodTerminal
win環境ならいいかも・・・メモ。
- 開発元
atom パッケージ - language-teraterm-macro
python, perlのスクリプト作成でお世話になっている、atomエディタ(https://atom.io/)。
中々珍しい、teraterm macro(ttl)を補助してくれるパッケージ(Atom用パッケージ | できる!TeraTermマクロ)をみつけ、数週間使ってみたが、very good!
CTRL+/によるコメントアウトは、Cスタイル(/* */)よりもセミコロン(;)の方が好み・・・
python hashlibによるMD5/SHA-1/SHA-256 File validation
MD5/SHA-1/SHA-256をまとめて計算、そしてCSV出力もサポートするPythonスクリプト(Windows版のPython3.4で作成)。
メモ
C:\>get_digest.py -h usage: get_digest.py [-h] [-c] glob_pattern [glob_pattern ...] positional arguments: glob_pattern glob pattern(eg. *.bin *.log) optional arguments: -h, --help show this help message and exit -c, --csv switch to csv output
get_digest.py
#!/usr/bin/env python3 import os import glob import argparse import hashlib def get_digest(file_, blocksize = 2047): md5 = hashlib.md5() sha1 = hashlib.sha1() sha256 = hashlib.sha256() with open(file_, 'rb') as f: while True: buf = f.read(blocksize) if not buf: break md5.update(buf) sha1.update(buf) sha256.update(buf) return { 'md5' : md5.hexdigest(), 'sha1' : sha1.hexdigest(), 'sha256': sha256.hexdigest(), } parser = argparse.ArgumentParser() parser.add_argument('glob_pattern', action = 'store', nargs = '+', help = 'glob pattern(eg. *.bin *.log)') parser.add_argument('-c', '--csv', dest = 'is_csv', action = 'store_true', help = 'switch to csv output') args = parser.parse_args() files = [] for pattern in args.glob_pattern: files.extend(glob.glob(pattern)) if args.is_csv: print('"FILE","SIZE","MD5","SHA-1","SHA-256"') template = '"{file}","{size}","{md5}","{sha1}","{sha256}"' else: template = """ FILE : {file} ({size} bytes) MD5 : {md5} SHA-1 : {sha1} SHA-256 : {sha256}""" for file_ in files: if os.path.isdir(file_): print("{} is a directory.".format(file_)) continue print( template.format( file = os.path.abspath(file_), size = os.path.getsize(file_), **get_digest(file_) ) )
python openpyxl - ネットワーク機器のExcelパラメータシートをpythonで読取
ネットワーク機器(cisco)のExcelパラメータシート
ホスト名とインターフェース名を含むExcelのパラメータシートを読み取って、データを加工(config作成、構築データ作成等)を行いたい時のサンプルスクリプトを作成(環境は、python 3.4 + openpyxl 2.3.0)。
ネストした辞書データ({"ホスト名" : {"インターフェース名": {} } })になるため、読み取り終了後、確認のため、YAMLへ変換して保存。
サンプルExcelデータ
サンプルデータとしたのは、以下の様なA列にホスト名、B列にインターフェース名を含むデータ。一部の列には、'[' ']'で囲まれたリストのデータを含んでいる。
サンプルスクリプト
#!/usr/bin/env python3 from openpyxl import load_workbook import yaml xls = 'interface.xlsx' # 入力Excelパラメータシート yml = 'interface.yaml' # 出力YAMLファイル wb = load_workbook(xls) ws = wb.get_sheet_by_name('interface') dict_ = {} for i in range(1, ws.max_row + 1): col = [ws.cell(row = i, column = j).value for j in range(ws.max_column + 1)] if i == 1: field = col continue dict_.setdefault(col[1], {}).setdefault(col[2], {}) for j in range(3, ws.max_column + 1): if col[j]: if '[' and ']' in str(col[j]): if eval(col[j]): dict_[col[1]][col[2]].update({field[j] : eval(col[j])}) else: dict_[col[1]][col[2]].update({field[j] : col[j]}) with open(yml, 'w') as f: f.write(yaml.dump(dict_))
出力例(YAML)
osk-rt-01: GigabitEthernet0: description: LAN switch tagged_vlan: ['1,32,33,1002-1005'] GigabitEthernet1: {status: shutdown} GigabitEthernet2: {status: shutdown} GigabitEthernet3: {status: shutdown} GigabitEthernet4: {status: shutdown} GigabitEthernet5: {status: shutdown} GigabitEthernet6: {status: shutdown} GigabitEthernet7: {status: shutdown} GigabitEthernet8: {duplex: auto, speed: auto, status: shutdown} GigabitEthernet9: {duplex: full, speed: 10} GigabitEthernet9.901: description: tagged WAN 1 primary_ip: 10.240.1.2 255.255.255.128 tagged_vlan: ['901'] Vlan1: {status: shutdown} Vlan32: description: Osaka office LAN primary_ip: 10.1.32.252 255.255.255.0 standby_group: 32 standby_ip: 10.1.32.254 standby_option: [preempt] standby_priority: 115 Vlan33: description: Osaka office LAN primary_ip: 10.1.33.252 255.255.255.0 standby_group: 33 standby_ip: 10.1.33.254 standby_option: [preempt, track 1 decrement 10] standby_priority: 120 osk-rt-02: GigabitEthernet0: description: LAN switch tagged_vlan: ['1,32,33,1002-1005'] GigabitEthernet1: {status: shutdown} GigabitEthernet2: {status: shutdown} GigabitEthernet3: {status: shutdown} GigabitEthernet4: {status: shutdown} GigabitEthernet5: {status: shutdown} GigabitEthernet6: {status: shutdown} GigabitEthernet7: {status: shutdown} GigabitEthernet8: {duplex: auto, speed: auto, status: shutdown} GigabitEthernet9: {duplex: full, speed: 10} GigabitEthernet9.902: description: tagged WAN 2 primary_ip: 10.240.2.2 255.255.255.128 tagged_vlan: ['902'] Vlan1: {status: shutdown} Vlan32: description: Osaka office LAN primary_ip: 10.1.32.253 255.255.255.0 standby_group: 32 standby_ip: 10.1.32.254 standby_option: [preempt, track 1 decrement 10] standby_priority: 120 Vlan33: description: Osaka office LAN primary_ip: 10.1.33.253 255.255.255.0 standby_group: 33 standby_ip: 10.1.33.254 standby_option: [preempt] standby_priority: 115
python pexpectによるcisco / juniperのCLI automation
コンフィグバックアップだけではないマルチパーパス・スクリプト
expectライクなpython pexpectで、cisco / juniper 機器のログ採取と設定変更機能を兼ねたwrapperモジュール(pexpect_wrapper.py)を書いてみた(Linux上のpython2.7, pexpect4.0.1環境で確認)。
- telnet及びssh対応。また、username: を聞いてきたり、聞いてこなかったり、の応答の違いも自動処理。
- IOSの場合、"[confirm]"の文字列に対してEnterで自動応答(ただし、clear系コマンドのみ対象とし、reloadの自動応答は排除)。
- JUNOSの場合、コマンドのtypoが発生すると、正規表現によるprompt待ちを狂わせる。typoを検知したらCTRL+Uを送信し、誤ったコマンドの履歴をクリアしてprompt待ちを正常化すること。
- typoが発生したら、手入力に切り替えるモード(expectのinteract相当)を備えること。
- 複数コマンドを投入する際、任意の空行を挿入出来ること(これは趣味)。
メモ
- sshのオプションで、"-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"にしているので、不要なら削除。
- telnetクライアントがインストールされていないLinuxディストリビューションの場合、別途インストール。
- pexpect.spawn()のmaxread = 1024 * 2, searchwindowsize = 1024 * 4は、環境にあわせて要調整(自分の環境では"show tech"の取得が早くなったが・・、デフォルトでもよいかも)。
- ios_command()の正規表現r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$"は、ASAのマルチコンテキスト環境で現れる"/"を考慮。また、CatOSを考慮して\(enable\)も付けているが、不要なら削除。
- 接続失敗が考慮されていなかったり、exit処理が拙かったので修正(2017/1/27)。
pexpect_wrapper.py
#!/usr/bin/env python import sys import re import pexpect class Connect: def __init__( self, host = '', protocol = 'telnet', username = '', password = '', ): self.host = host self.protocol = protocol self.username = username self.password = password if 'tel' in self.protocol: cmd = "telnet {0}".format(self.host) elif 'ssh' in self.protocol: opt = "-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" cmd = "ssh {0} {1}@{2}".format(opt, self.username, self.host) self.session = pexpect.spawn( cmd, timeout = 30, maxread = 1024 * 2, searchwindowsize = 1024 * 4 ) def __auth_failed(self, message): self.session.terminate() print message sys.exit() def __command_error_reporter(self, command): print "\nCommand error detected at '{}'.".format(command), print "Press '^]' to resume." print self.session.match.group(0), self.session.interact() def __re_compile(self, patterns): return [re.compile(pattern, re.S|re.M) for pattern in patterns] def login(self, timeout = 10): prompt = self.__re_compile([ r"(?i)(?:username|login): *$", r"(?i)password: *$", r"[>#$%] *$", ]) try: index = self.session.expect_list(prompt, timeout = timeout) except: self.session.terminate() print "Connect error at '{}'.\n".format(self.host), print self.session.before sys.exit() if index == 0: self.session.sendline(self.username) index = self.session.expect_list(prompt, timeout = timeout) if index != 1: self.__auth_failed('login failed.') if index == 1: self.session.sendline(self.password) index = self.session.expect_list(prompt, timeout = timeout) if index != 2: self.__auth_failed('login failed.') def logout(self): self.session.sendline('exit') self.session.expect(pexpect.EOF) self.session.close() def log(self, logfile = ''): if logfile: self.session.logfile_read = open(logfile, 'w') else: self.session.logfile_read = sys.stdout def blank_lines(self, blanks): for i in range(blanks): self.session.sendline('') self.session.readline() def ios_enable(self, enable_password = '', timeout = 10): if enable_password == '': return prompt = self.__re_compile([ r"> *$", r"(?i)password: *$", r"(?:#|\(enable\)) *$", ]) self.session.sendline('') index = self.session.expect_list(prompt, timeout = timeout) if index == 0: self.session.sendline('enable') index = self.session.expect_list(prompt, timeout = timeout) if index == 1: self.session.sendline(enable_password) index = self.session.expect_list(prompt, timeout = timeout) if index == 1: self.__auth_failed('enable failed.') def ios_command( self, command_list, blanks = 0, error_reporting = False, timeout = 300, ): prompt = self.__re_compile([ r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$", r"\((?:config|cfg)[^\)]*\) ?# *$", r"(?i)^clear.*\[confirm\] *$", r"(?i)^% *(?:ambiguous|incomplete|invalid|unknown|\S+ overlaps).*$" ]) self.blank_lines(2) for command in command_list: self.session.sendline(command) self.session.readline() index = self.session.expect_list(prompt, timeout = timeout) if index == 0: if blanks > 0: self.blank_lines(blanks) elif index == 1: pass elif index == 2: self.session.sendline('') self.session.expect_list(prompt, timeout = timeout) if blanks > 0: self.blank_lines(blanks) elif index == 3: if error_reporting is True: self.__command_error_reporter(command) else: self.session.sendcontrol('u') self.session.sendline('') index = self.session.expect_list(prompt, timeout = timeout) if index == 0: if blanks > 0: self.blank_lines(blanks) self.blank_lines(2) def junos_command( self, command_list, blanks = 0, error_reporting = False, timeout = 300, ): prompt = self.__re_compile([ r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?[>%] *$", r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?# *$", r"^\s+\^", r"^(?i)error:" ]) self.blank_lines(2) for command in command_list: self.session.send(command) index = self.session.expect_list( [pexpect.TIMEOUT, prompt[2]], timeout = 0.1 ) if index == 1: self.session.sendcontrol('u') if error_reporting is True: self.__command_error_reporter(command) self.session.sendline('') if error_reporting is True: index = self.session.expect_list(prompt, timeout = timeout) else: index = self.session.expect_list( [prompt[0], prompt[1]], timeout = timeout ) if index == 0: if blanks > 0: self.blank_lines(blanks) elif index == 1: pass else: self.__command_error_reporter(command) self.blank_lines(2)
Example
- logfile を指定しなれけば、標準出力にログを書き出しながら実行。
- ios_command()/junos_command()でコマンド毎に2行の空行挿入("blanks = 2")。
- ios_command()/junos_command()で"error_reporting = True"を指定しているので、不正コマンド応答(例えば、"% invalid ~")が見られたら、pexpectをinteractモードに自動変更(CTRL+]の入力でinteractモードを終了し、スクリプトを継続する)。
#!/usr/bin/env python import pexpect_wrapper parameter = { 'host' : '192.168.0.1', 'protocol' : 'telnet', 'username' : 'admin', 'password' : 'fake_pass', } commands = [ 'clear counters', 'clear logging', 'conf t', 'interface lo0', 'ip address 192.0.2.1 255.255.255.255', 'end', 'wr mem', 'ter len 0', 'show run', 'show log', ] c = pexpect_wrapper.Connect(**parameter) c.login() c.ios_enable(enable_password = 'fake_pass') c.log(logfile = 'ios_router.log') c.ios_command(commands, blanks = 2, error_reporting = True,) c.logout() parameter = { 'host' : '192.168.0.2', 'protocol' : 'telnet', 'username' : 'admin', 'password' : 'fake_pass', } commands = [ 'set cli screen-length 0', 'set cli screen-width 0', 'show config | display set', 'config', 'set interface lo0 unit 0 family inet address 192.0.2.2/32', 'commit and-q', 'show config | display set', 'show log messages | last 20', ] c = pexpect_wrapper.Connect(**parameter) c.login() c.log(logfile = 'junos_router.log') c.junos_command(commands, blanks = 2, error_reporting = True,) c.logout()