技術メモ

ネットワークエンジニアがpython, perl等々を気楽に使うための覚え書き

python標準のstring.Templateを用いたcisco config作成スクリプト

jinja2もいいけれど...

jinja2の過去記事はこちら


単純な置換だけで完成する差分config程度なら、やはり簡単なのがよい(string.Templateは、perlのheredocくらいの手軽さ)。使い勝手のよさそうな以下の3種類のサンプルスクリプトを作成(Python3.4)。

  • YAMLデータ読み込み版(PyYAML3.11)。
  • CSVデータ読み込み版(Python3.4標準のcsvモジュール)。
  • Excelデータ読み込み版(openpyxl 2.3.0)。

サンプルスクリプトYAML版)

#!/usr/bin/python3
from string import Template
import yaml
import ipaddress

with open('./yml.txt', "r") as f:
    d = yaml.load(f)
with open('./tmpl.txt', "r") as f:
    t = Template(f.read())

for k, v in d.items():
    obj = ipaddress.ip_interface(v['ip'])
    config = t.substitute(
        v,
        host = k,
        ip_addr = obj.with_netmask.replace('/', ' '),
        net_addr = obj.network.with_hostmask.replace('/', ' '),
    )
    with open(k + '.txt', "w") as f:
        f.write(config)

yml.txt(YAMLデータ)

tky-rt01:
    vlan : 2
    ip : 192.168.2.1/24
tky-rt02:
    vlan : 2
    ip : 192.168.2.2/24

サンプルスクリプトCSV版)

#!/usr/bin/python3
from string import Template
import csv
import ipaddress

with open('./data.csv', "r") as f:
    data = [d for d in csv.DictReader(f)]
with open('./tmpl.txt', "r") as f:
    t = Template(f.read())

for d in data:
    obj = ipaddress.ip_interface(d['ip'])
    config = t.substitute(
        d,
        ip_addr = obj.with_netmask.replace('/', ' '),
        net_addr = obj.network.with_hostmask.replace('/', ' '),
    )
    with open(d['host'] + '.txt', "w") as f:
        f.write(config)

data.csvCSVファイル)

host,vlan,ip
tky-rt01,2,192.168.2.1/24
tky-rt02,2,192.168.2.2/24

サンプルスクリプトExcel版)

#!/usr/bin/env python3
from string import Template
from openpyxl import load_workbook
import ipaddress

wb = load_workbook('data.xlsx')
ws = wb.active
data = []
for i in range(1, ws.max_row + 1):
    values = [ws.cell(row = i, column = j).value for j in range(1, ws.max_column + 1)]
    if i == 1:
        keys = values
    else:
        data.append(dict(zip(keys, values)))
with open('./tmpl.txt', "r") as f:
    t = Template(f.read())

for d in data:
    obj = ipaddress.ip_interface(d['ip'])
    config = t.substitute(
        d,
        ip_addr = obj.with_netmask.replace('/', ' '),
        net_addr = obj.network.with_hostmask.replace('/', ' '),
    )
    with open(d['host'] + '.txt', "w") as f:
        f.write(config)

data.xlsx
f:id:mocas:20170207120948j:plain

tmpl.txt(共通テンプレート)

!
! $host config.
!
vlan $vlan
!
interface Vlan$vlan
 ip address $ip_addr
!
router eigrp 1
 network $net_addr
!

tky-rt01.txt(出力結果)

!
! tky-rt01 config.
!
vlan 2
!
interface Vlan2
 ip address 192.168.2.1 255.255.255.0
!
router eigrp 1
 network 192.168.2.0 0.0.0.255
!

tky-rt02.txt(出力結果)

!
! tky-rt02 config.
!
vlan 2
!
interface Vlan2
 ip address 192.168.2.2 255.255.255.0
!
router eigrp 1
 network 192.168.2.0 0.0.0.255
!

atom パッケージ - language-teraterm-macro

python, perlスクリプト作成でお世話になっている、atomエディタ(https://atom.io/)。

中々珍しい、teraterm macro(ttl)を補助してくれるパッケージ(Atom用パッケージ | できる!TeraTermマクロ)をみつけ、数週間使ってみたが、very good!

f:id:mocas:20151214130838p:plain

CTRL+/によるコメントアウトは、Cスタイル(/* */)よりもセミコロン(;)の方が好み・・・

python hashlibによるMD5/SHA-1/SHA-256 File validation

MD5/SHA-1/SHA-256をまとめて計算、そしてCSV出力もサポートするPythonスクリプト(Windows版のPython3.4で作成)。

メモ

  • 使い方は、対象ファイルをglob形式で指定(複数可)。
  • hash.update()で使うblocksizeの選択に注意(スクリプトでは、デフォルト2047を定義)。
C:\>get_digest.py -h
usage: get_digest.py [-h] [-c] glob_pattern [glob_pattern ...]

positional arguments:
  glob_pattern  glob pattern(eg. *.bin *.log)

optional arguments:
  -h, --help    show this help message and exit
  -c, --csv     switch to csv output

get_digest.py

#!/usr/bin/env python3
import os
import glob
import argparse
import hashlib

def get_digest(file_, blocksize = 2047):
    md5 = hashlib.md5()
    sha1 = hashlib.sha1()
    sha256 = hashlib.sha256()
    with open(file_, 'rb') as f:
        while True:
            buf = f.read(blocksize)
            if not buf: break
            md5.update(buf)
            sha1.update(buf)
            sha256.update(buf)
    return {
        'md5'   : md5.hexdigest(),
        'sha1'  : sha1.hexdigest(),
        'sha256': sha256.hexdigest(),
    }

parser = argparse.ArgumentParser()
parser.add_argument('glob_pattern', action = 'store', nargs = '+',
                    help = 'glob pattern(eg. *.bin *.log)')
parser.add_argument('-c', '--csv', dest = 'is_csv', action = 'store_true',
                    help = 'switch to csv output')
args = parser.parse_args()

files = []
for pattern in args.glob_pattern:
    files.extend(glob.glob(pattern))

if args.is_csv:
    print('"FILE","SIZE","MD5","SHA-1","SHA-256"')
    template = '"{file}","{size}","{md5}","{sha1}","{sha256}"'
else:
    template = """
FILE    : {file} ({size} bytes)
MD5     : {md5}
SHA-1   : {sha1}
SHA-256 : {sha256}"""

for file_ in files:
    if os.path.isdir(file_):
        print("{} is a directory.".format(file_))
        continue
    print(
        template.format(
            file = os.path.abspath(file_),
            size = os.path.getsize(file_),
            **get_digest(file_)
        )
    )

python openpyxl - ネットワーク機器のExcelパラメータシートをpythonで読取

ネットワーク機器(cisco)のExcelパラメータシート

ホスト名とインターフェース名を含むExcelのパラメータシートを読み取って、データを加工(config作成、構築データ作成等)を行いたい時のサンプルスクリプトを作成(環境は、python 3.4 + openpyxl 2.3.0)。
ネストした辞書データ({"ホスト名" : {"インターフェース名": {} } })になるため、読み取り終了後、確認のため、YAMLへ変換して保存。

サンプルExcelデータ

サンプルデータとしたのは、以下の様なA列にホスト名、B列にインターフェース名を含むデータ。一部の列には、'[' ']'で囲まれたリストのデータを含んでいる。
f:id:mocas:20151210183157p:plain

サンプルスクリプト

#!/usr/bin/env python3
from openpyxl import load_workbook
import yaml

xls = 'interface.xlsx' # 入力Excelパラメータシート
yml = 'interface.yaml' # 出力YAMLファイル

wb = load_workbook(xls)
ws = wb.get_sheet_by_name('interface')
dict_ = {}
for i in range(1, ws.max_row + 1):
    col = [ws.cell(row = i, column = j).value for j in range(ws.max_column + 1)]
    if i == 1:
        field = col
        continue
    dict_.setdefault(col[1], {}).setdefault(col[2], {})
    for j in range(3, ws.max_column + 1):
        if col[j]:
            if '[' and ']' in str(col[j]):
                if eval(col[j]):
                    dict_[col[1]][col[2]].update({field[j] : eval(col[j])})
            else:
                dict_[col[1]][col[2]].update({field[j] : col[j]})
with open(yml, 'w') as f:
    f.write(yaml.dump(dict_))

出力例(YAML)

osk-rt-01:
  GigabitEthernet0:
    description: LAN switch
    tagged_vlan: ['1,32,33,1002-1005']
  GigabitEthernet1: {status: shutdown}
  GigabitEthernet2: {status: shutdown}
  GigabitEthernet3: {status: shutdown}
  GigabitEthernet4: {status: shutdown}
  GigabitEthernet5: {status: shutdown}
  GigabitEthernet6: {status: shutdown}
  GigabitEthernet7: {status: shutdown}
  GigabitEthernet8: {duplex: auto, speed: auto, status: shutdown}
  GigabitEthernet9: {duplex: full, speed: 10}
  GigabitEthernet9.901:
    description: tagged WAN 1
    primary_ip: 10.240.1.2 255.255.255.128
    tagged_vlan: ['901']
  Vlan1: {status: shutdown}
  Vlan32:
    description: Osaka office LAN
    primary_ip: 10.1.32.252 255.255.255.0
    standby_group: 32
    standby_ip: 10.1.32.254
    standby_option: [preempt]
    standby_priority: 115
  Vlan33:
    description: Osaka office LAN
    primary_ip: 10.1.33.252 255.255.255.0
    standby_group: 33
    standby_ip: 10.1.33.254
    standby_option: [preempt, track 1 decrement 10]
    standby_priority: 120
osk-rt-02:
  GigabitEthernet0:
    description: LAN switch
    tagged_vlan: ['1,32,33,1002-1005']
  GigabitEthernet1: {status: shutdown}
  GigabitEthernet2: {status: shutdown}
  GigabitEthernet3: {status: shutdown}
  GigabitEthernet4: {status: shutdown}
  GigabitEthernet5: {status: shutdown}
  GigabitEthernet6: {status: shutdown}
  GigabitEthernet7: {status: shutdown}
  GigabitEthernet8: {duplex: auto, speed: auto, status: shutdown}
  GigabitEthernet9: {duplex: full, speed: 10}
  GigabitEthernet9.902:
    description: tagged WAN 2
    primary_ip: 10.240.2.2 255.255.255.128
    tagged_vlan: ['902']
  Vlan1: {status: shutdown}
  Vlan32:
    description: Osaka office LAN
    primary_ip: 10.1.32.253 255.255.255.0
    standby_group: 32
    standby_ip: 10.1.32.254
    standby_option: [preempt, track 1 decrement 10]
    standby_priority: 120
  Vlan33:
    description: Osaka office LAN
    primary_ip: 10.1.33.253 255.255.255.0
    standby_group: 33
    standby_ip: 10.1.33.254
    standby_option: [preempt]
    standby_priority: 115

python pexpectによるcisco / juniperのCLI automation

コンフィグバックアップだけではないマルチパーパス・スクリプト

expectライクなpython pexpectで、cisco / juniper 機器のログ採取と設定変更機能を兼ねたwrapperモジュール(pexpect_wrapper.py)を書いてみた(Linux上のpython2.7, pexpect4.0.1環境で確認)。

  • telnet及びssh対応。また、username: を聞いてきたり、聞いてこなかったり、の応答の違いも自動処理。
  • IOSの場合、"[confirm]"の文字列に対してEnterで自動応答(ただし、clear系コマンドのみ対象とし、reloadの自動応答は排除)。
  • JUNOSの場合、コマンドのtypoが発生すると、正規表現によるprompt待ちを狂わせる。typoを検知したらCTRL+Uを送信し、誤ったコマンドの履歴をクリアしてprompt待ちを正常化すること。
  • typoが発生したら、手入力に切り替えるモード(expectのinteract相当)を備えること。
  • 複数コマンドを投入する際、任意の空行を挿入出来ること(これは趣味)。

メモ

  • sshのオプションで、"-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"にしているので、不要なら削除。
  • telnetクライアントがインストールされていないLinuxディストリビューションの場合、別途インストール。
  • pexpect.spawn()のmaxread = 1024 * 2, searchwindowsize = 1024 * 4は、環境にあわせて要調整(自分の環境では"show tech"の取得が早くなったが・・、デフォルトでもよいかも)。
  • ios_command()の正規表現r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$"は、ASAのマルチコンテキスト環境で現れる"/"を考慮。また、CatOSを考慮して\(enable\)も付けているが、不要なら削除。
  • 接続失敗が考慮されていなかったり、exit処理が拙かったので修正(2017/1/27)。

pexpect_wrapper.py

#!/usr/bin/env python
import sys
import re
import pexpect

class Connect:

    def __init__(
            self, host = '', protocol = 'telnet', username = '', password = '',
        ):
        self.host = host
        self.protocol = protocol
        self.username = username
        self.password = password

        if 'tel' in self.protocol:
            cmd = "telnet {0}".format(self.host)
        elif 'ssh' in self.protocol:
            opt = "-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
            cmd = "ssh {0} {1}@{2}".format(opt, self.username, self.host)
        self.session = pexpect.spawn(
            cmd, timeout = 30, maxread = 1024 * 2, searchwindowsize = 1024 * 4
        )

    def __auth_failed(self, message):
        self.session.terminate()
        print message
        sys.exit()

    def __command_error_reporter(self, command):
        print "\nCommand error detected at '{}'.".format(command),
        print "Press '^]' to resume."
        print self.session.match.group(0),
        self.session.interact()

    def __re_compile(self, patterns):
        return [re.compile(pattern, re.S|re.M) for pattern in patterns]

    def login(self, timeout = 10):
        prompt = self.__re_compile([
            r"(?i)(?:username|login): *$",
            r"(?i)password: *$",
            r"[>#$%] *$",
        ])
        try:
            index = self.session.expect_list(prompt, timeout = timeout)
        except:
            self.session.terminate()
            print "Connect error at '{}'.\n".format(self.host),
            print self.session.before
            sys.exit()
        if index == 0:
            self.session.sendline(self.username)
            index = self.session.expect_list(prompt, timeout = timeout)
            if index != 1:
                self.__auth_failed('login failed.')
        if index == 1:
            self.session.sendline(self.password)
            index = self.session.expect_list(prompt, timeout = timeout)
            if index != 2:
                self.__auth_failed('login failed.')

    def logout(self):
        self.session.sendline('exit')
        self.session.expect(pexpect.EOF)
        self.session.close()

    def log(self, logfile = ''):
        if logfile:
            self.session.logfile_read = open(logfile, 'w')
        else:
            self.session.logfile_read = sys.stdout

    def blank_lines(self, blanks):
        for i in range(blanks):
            self.session.sendline('')
            self.session.readline()

    def ios_enable(self, enable_password = '', timeout = 10):
        if enable_password == '': return
        prompt = self.__re_compile([
            r"> *$",
            r"(?i)password: *$",
            r"(?:#|\(enable\)) *$",
        ])
        self.session.sendline('')
        index = self.session.expect_list(prompt, timeout = timeout)
        if index == 0:
            self.session.sendline('enable')
            index = self.session.expect_list(prompt, timeout = timeout)
            if index == 1:
                self.session.sendline(enable_password)
                index = self.session.expect_list(prompt, timeout = timeout)
                if index == 1:
                    self.__auth_failed('enable failed.')

    def ios_command(
            self, command_list, blanks = 0, error_reporting = False,
            timeout = 300,
        ):
        prompt = self.__re_compile([
            r"^[\w\-/.]+ ?[>#] *(?:\(enable\))? *$",
            r"\((?:config|cfg)[^\)]*\) ?# *$",
            r"(?i)^clear.*\[confirm\] *$",
            r"(?i)^% *(?:ambiguous|incomplete|invalid|unknown|\S+ overlaps).*$"
        ])
        self.blank_lines(2)
        for command in command_list:
            self.session.sendline(command)
            self.session.readline()
            index = self.session.expect_list(prompt, timeout = timeout)
            if index == 0:
                if blanks > 0: self.blank_lines(blanks)
            elif index == 1:
                pass
            elif index == 2:
                self.session.sendline('')
                self.session.expect_list(prompt, timeout = timeout)
                if blanks > 0: self.blank_lines(blanks)
            elif index == 3:
                if error_reporting is True:
                    self.__command_error_reporter(command)
                else:
                    self.session.sendcontrol('u')
                    self.session.sendline('')
                    index = self.session.expect_list(prompt, timeout = timeout)
                    if index == 0:
                        if blanks > 0: self.blank_lines(blanks)
        self.blank_lines(2)

    def junos_command(
            self, command_list, blanks = 0, error_reporting = False,
            timeout = 300,
        ):
        prompt = self.__re_compile([
            r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?[>%] *$",
            r"^[\w\-]+@[\w\-.]+(?:\([^\)]*\))? ?# *$",
            r"^\s+\^",
            r"^(?i)error:"
        ])
        self.blank_lines(2)
        for command in command_list:
            self.session.send(command)
            index = self.session.expect_list(
                [pexpect.TIMEOUT, prompt[2]], timeout = 0.1
            )
            if index == 1:
                self.session.sendcontrol('u')
                if error_reporting is True:
                    self.__command_error_reporter(command)

            self.session.sendline('')
            if error_reporting is True:
                index = self.session.expect_list(prompt, timeout = timeout)
            else:
                index = self.session.expect_list(
                    [prompt[0], prompt[1]], timeout = timeout
                )

            if index == 0:
                if blanks > 0: self.blank_lines(blanks)
            elif index == 1:
                pass
            else:
                self.__command_error_reporter(command)
        self.blank_lines(2)

Example

  • logfile を指定しなれけば、標準出力にログを書き出しながら実行。
  • ios_command()/junos_command()でコマンド毎に2行の空行挿入("blanks = 2")。
  • ios_command()/junos_command()で"error_reporting = True"を指定しているので、不正コマンド応答(例えば、"% invalid ~")が見られたら、pexpectをinteractモードに自動変更(CTRL+]の入力でinteractモードを終了し、スクリプトを継続する)。
#!/usr/bin/env python
import pexpect_wrapper

parameter = {
    'host' : '192.168.0.1',
    'protocol' : 'telnet',
    'username' : 'admin',
    'password' : 'fake_pass',
}

commands = [
    'clear counters',
    'clear logging',
    'conf t',
    'interface lo0',
    'ip address 192.0.2.1 255.255.255.255',
    'end',
    'wr mem',
    'ter len 0',
    'show run',
    'show log',
]

c = pexpect_wrapper.Connect(**parameter)
c.login()
c.ios_enable(enable_password = 'fake_pass')
c.log(logfile = 'ios_router.log')
c.ios_command(commands, blanks = 2, error_reporting = True,)
c.logout()

parameter = {
    'host' : '192.168.0.2',
    'protocol' : 'telnet',
    'username' : 'admin',
    'password' : 'fake_pass',
}
commands = [
    'set cli screen-length 0',
    'set cli screen-width 0',
    'show config | display set',
    'config',
    'set interface lo0 unit 0 family inet address 192.0.2.2/32',
    'commit and-q',
    'show config | display set',
    'show log messages | last 20',
]

c = pexpect_wrapper.Connect(**parameter)
c.login()
c.log(logfile = 'junos_router.log')
c.junos_command(commands, blanks = 2, error_reporting = True,)
c.logout()