123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- import sys
- import telnetlib
- import time
- import fcntl
- from FiberHome import FiberHome
- from Furukawa import Furukawa
- from Huawei import Huawei
- from ZTE import ZTE
- from OLTBase import OLTBase
- from ObjectConnection import ObjectConnection
class ObjectTelnet(ObjectConnection):
    """Run an OLT command session over telnet.

    The attributes read here (``brand``, ``model``, ``hostname``, ``port``,
    ``user``, ``password``, ``password_enable``, ``file_name``, ``data``,
    ``QUIT``, ``RUN_OK``, ``all_data``, ``NEW_LINE_UNIX`` and
    ``NEW_LINE_WINDOWS``) and the ``command_print`` / ``save_log`` helpers
    are not defined in this class; presumably ``ObjectConnection`` provides
    them -- TODO confirm.
    """

    def __init__(self):
        """Initialise the base connection state; no telnet session exists yet."""
        ObjectConnection.__init__(self)
        # telnetlib.Telnet instance, created by connect().
        self.tn = None

    def connect(self):
        """
        Connect by telnet to the OLT, log in, run the commands and exit.

        The commands come from ``self.file_name`` when it is set, otherwise
        from ``self.data``, otherwise from the stdin-driven loop.  An
        exclusive ``flock`` on ``/var/lock/<hostname>`` is held for the whole
        session; the call blocks until that lock is acquired.

        On success the process is terminated with ``sys.exit(self.RUN_OK)``.
        The telnet socket and the lock file are closed even when the session
        fails part-way through.
        """
        self.olt = self._build_olt()
        print("Telnet client ...\n")
        with open("/var/lock/" + self.hostname, "w") as lock_file:
            print("Open lock file\n")
            fcntl.flock(lock_file, fcntl.LOCK_EX)
            print("Lock Acquire\n")
            print("Login in...\n")
            self.tn = telnetlib.Telnet(self.hostname, self.port)
            try:
                self._login()
                if self.file_name is not None:
                    self.connection_file()
                elif self.data is not None:
                    self.connection_data()
                else:
                    self.connection_old()
                print("fin-----")
                while self.QUIT:
                    # send quit to terminal until exit confirmation
                    self.command_quit(self.olt.get_write_exit())
            finally:
                self.tn.close()
            self.save_log()
        sys.exit(self.RUN_OK)

    def _build_olt(self):
        """Return the driver whose class name matches ``self.brand`` (case-insensitive)."""
        brand = self.brand.upper()
        for driver in (ZTE, FiberHome, Furukawa, Huawei):
            if brand == driver.__name__.upper():
                return driver(self.model, False)
        # Unknown brand: fall back to the generic driver.
        return OLTBase(None, False)

    def _login(self):
        """Answer the user-name and password prompts and wait for the initial prompt."""
        self.read_data([self.olt.get_expected_name()])
        self.tn.write(self.user + self.NEW_LINE_UNIX)
        self.read_data([self.olt.get_expected_password()])
        self.tn.write(self.password + self.NEW_LINE_UNIX)
        print("Loged in...\n")
        # Dump whatever is already buffered before waiting for the prompt.
        print(self.tn.read_very_lazy())
        self.read_data([self.olt.get_expected_initial()])

    def connection_old(self):
        """
        Old method: for every line read from stdin, print the session output
        up to the next prompt, then pause one second.  Stops at end of input.
        """
        # NOTE(review): the line read from stdin is never written to the
        # session -- confirm whether that is intentional.
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            print(self.tn.read_until(self.olt.get_expected_cardinal()))
            time.sleep(1)

    def connection_data(self):
        """
        Execute the commands received in ``self.data``, separated by ``;``.
        """
        # A file can be flattened into this form with:
        #   $(sed ':a;N;$!ba;s/\n/;/g' file)
        for line in self.data.split(";"):
            self.command_execute(line, self.olt.get_expected_cardinal())

    def connection_file(self):
        """
        Execute the commands stored in ``self.file_name``, one per line.
        """
        # Read everything first so the file is closed before the session runs.
        with open(self.file_name) as f:
            content = f.readlines()
        for line in content:
            self.command_execute(line, self.olt.get_expected_cardinal())

    def command_enable(self, command):
        """
        Handle the command when it is the OLT's enable command.

        When the driver reports that enable must be run, the enable command
        is sent, the enable password is supplied if the driver asks for one,
        and the prompt is awaited.

        :param command: Command to check (compared case-insensitively)
        :return: True if the command was the enable command, otherwise False
        """
        enable_command = self.olt.get_write_enable()
        if command.lower() != enable_command.lower():
            return False
        if self.olt.run_enable():
            self.tn.write(enable_command + self.NEW_LINE_UNIX)
            if self.olt.run_enable_password():
                self.read_data([self.olt.get_expected_enable_password()])
                self.tn.write(self.password_enable + self.NEW_LINE_UNIX)
            self.read_data([self.olt.get_expected_cardinal()])
        return True

    def command_quit(self, command):
        """
        Handle the command when it is the OLT's exit command.

        The exit command is sent and, when the driver defines an exit marker,
        the reply is inspected: if the exit marker (second entry) is seen,
        the optional confirmation is sent and ``self.QUIT`` is cleared; if
        only the prompt is seen the session is still open and ``self.QUIT``
        is left untouched.

        :param command: Command to check (compared case-insensitively)
        :return: True if the command was the exit command, otherwise False
        """
        exit_command = self.olt.get_write_exit()
        if command.lower() != exit_command.lower():
            return False
        self.tn.write(exit_command + self.NEW_LINE_UNIX)
        markers = None
        with_confirmation = False
        if self.olt.get_expected_exit() is not None:
            markers = [self.olt.get_expected_cardinal(), self.olt.get_expected_exit()]
            with_confirmation = True
        # When both markers are defined this one wins and no confirmation is sent.
        if self.olt.get_expected_exit_with_confirmation() is not None:
            markers = [self.olt.get_expected_cardinal(),
                       self.olt.get_expected_exit_with_confirmation()]
            with_confirmation = False
        if markers is not None and self.read_data(markers) == 2:
            # quit terminal
            if with_confirmation:
                self.tn.write(self.olt.get_write_exit_confirmation() + self.NEW_LINE_UNIX)
            self.QUIT = False
        return True

    def command_execute(self, command, expected):
        """
        Echo the command and run it, routing enable/exit to their handlers.

        :param command: Raw command line; surrounding whitespace is ignored
        :param expected: Expected string to stop listening
        """
        command = command.strip()
        # Echo the command unless it still ends with a line terminator.
        if not command.endswith((self.NEW_LINE_UNIX, self.NEW_LINE_WINDOWS)):
            self.command_print(command)
        if not self.command_enable(command) and not self.command_quit(command):
            command = command.rstrip(self.NEW_LINE_WINDOWS).rstrip(self.NEW_LINE_UNIX)
            if command:
                self.tn.write(command + self.NEW_LINE_UNIX)
                self.read_data([expected])

    def read_data(self, expected, timeout=10):
        """
        Read the channel until one of the expected strings shows up or the
        timeout expires, print what was read and append it to ``self.all_data``.

        :param expected: List of expected string
        :param timeout: Seconds to wait for a match (default 10)
        :return: 1-based position of the first expected string contained in
                 the data read; 1 when none of them is present
        """
        # expect() returns (index, match, data); only the data is needed here
        # because the position is derived from a plain substring search.
        received = self.tn.expect(expected, timeout)[2]
        position = 1
        for index, marker in enumerate(expected, 1):
            if marker in received:
                position = index
                break
        print(str(received))
        self.all_data += str(received)
        return position
|