import errno
import sys
import telnetlib
import time
import fcntl

from FiberHome import FiberHome
from Furukawa import Furukawa
from Huawei import Huawei
from ZTE import ZTE
from OLTBase import OLTBase
from ObjectConnection import ObjectConnection


class ObjectTelnet(ObjectConnection):
    def __init__(self):
        ObjectConnection.__init__(self)
        self.tn = None

    def connect(self):
        """
        Connect by telnet to the OLT
        """
        # Pick the vendor-specific OLT helper from the configured brand.
        if self.brand.upper() == ZTE.__name__.upper():
            self.olt = ZTE(self.model, False)
        elif self.brand.upper() == FiberHome.__name__.upper():
            self.olt = FiberHome(self.model, False)
        elif self.brand.upper() == Furukawa.__name__.upper():
            self.olt = Furukawa(self.model, False)
        elif self.brand.upper() == Huawei.__name__.upper():
            self.olt = Huawei(self.model, False)
        else:
            self.olt = OLTBase(None, False)

        print("Telnet client ...\n")
        # Take an exclusive per-host lock; it is held until the process exits.
        f = open("/var/lock/" + self.hostname, "w")
        print("Open lock file\n")
        fcntl.flock(f, fcntl.LOCK_EX)
        print("Lock Acquire\n")
        print("Login in...\n")
        self.tn = telnetlib.Telnet(self.hostname, self.port)
        # self.tn.set_debuglevel(1)
        error = self.read_data([self.olt.get_expected_name()])
        if error != -1:
            self.tn.write(self.user + self.NEW_LINE_UNIX)
            error = self.read_data([self.olt.get_expected_password()])
            if error != -1:
                self.tn.write(self.password + self.NEW_LINE_UNIX)
                print("Loged in...\n")
                print(self.tn.read_very_lazy())
                error = self.read_data([self.olt.get_expected_initial()])
                if error != -1:
                    if self.file_name is not None:
                        self.connection_file()
                    elif self.data is not None:
                        self.connection_data()
                    else:
                        self.connection_old()
                    print("fin-----")
                    nc = 0
                    while self.QUIT:
                        # give up after 20 attempts
                        if nc == 20:
                            self.QUIT = False
                        # send quit to terminal until exit confirmation
                        self.command_quit(self.olt.get_write_exit())
                        nc += 1
                    self.tn.close()
                    self.save_log()
                    exit(self.RUN_OK)
        exit(self.RUN_ERROR)

    def connection_old(self):
        """
        Old method
        """
        while 1:
            line = sys.stdin.readline()
            if not line:
                break
            print(self.tn.read_until(self.olt.get_expected_cardinal()))
            time.sleep(1)

    def connection_data(self):
        """
        Receive data separated by ;
        """
        # $(sed ':a;N;$!ba;s/\n/;/g' file)
        content = self.data.split(";")
        for line in content:
            self.command_execute(line, self.olt.get_expected_cardinal())

    def connection_file(self):
        """
        Execute commands from a file
        """
        with open(self.file_name) as f:
            content = f.readlines()
        for line in content:
            self.command_execute(line, self.olt.get_expected_cardinal())

    def command_enable(self, command):
        """
        Check whether the command is the enable command and, if so, execute it
        :param command:
        :return: True if the command is enable, otherwise False
        """
        if command.lower() == self.olt.get_write_enable().lower():
            if self.olt.run_enable():
                self.tn.write(self.olt.get_write_enable() + self.NEW_LINE_UNIX)
                if self.olt.run_enable_password():
                    self.read_data([self.olt.get_expected_enable_password()])
                    self.tn.write(self.password_enable + self.NEW_LINE_UNIX)
                self.read_data([self.olt.get_expected_cardinal()])
            return True
        return False

    def command_quit(self, command):
        """
        Check whether the command is the quit command and, if so, execute it
        :param command:
        :return: True if the command is quit, otherwise False
        """
        if command.lower() == self.olt.get_write_exit().lower():
            try:
                self.tn.write(self.olt.get_write_exit() + self.NEW_LINE_UNIX)
                read_data = None
                with_confirmation = False
                if self.olt.get_expected_exit() is not None:
                    read_data = [self.olt.get_expected_cardinal(),
                                 self.olt.get_expected_exit()]
                    with_confirmation = True
                if self.olt.get_expected_exit_with_confirmation() is not None:
                    read_data = [self.olt.get_expected_cardinal(),
                                 self.olt.get_expected_exit_with_confirmation()]
                    with_confirmation = False
                if read_data is not None:
                    position = self.read_data(read_data)
                    if position == 2:
                        # quit terminal
                        if with_confirmation:
                            self.tn.write(self.olt.get_write_exit_confirmation() + self.NEW_LINE_UNIX)
                        self.QUIT = False
            # except self.tn.error, e:
            except IOError as e:
                # a broken pipe means the remote side already closed the session
                if e.errno == errno.EPIPE:
                    self.QUIT = False
            return True
        return False

    def command_execute(self, command, expected):
        """
        Check the command and send it to the OLT
        :param command:
        :param expected: Expected string to stop listening
        """
        command = command.strip()
        if not (command[:-1] == self.NEW_LINE_UNIX or command[:-2] == self.NEW_LINE_WINDOWS):
            self.command_print(command)
        if not self.command_enable(command) and not self.command_quit(command):
            command = command.rstrip(self.NEW_LINE_WINDOWS).rstrip(self.NEW_LINE_UNIX)
            if len(command) > 0:
                self.tn.write(command + self.NEW_LINE_UNIX)
                self.read_data([expected])

    def read_data(self, expected):
        """
        Read the channel until one of the expected strings appears
        :param expected: List of expected strings
        :return: 1-based position of the matched string in expected, or -1
        """
        iteration = 0
        position = -1
        while iteration < 6:
            (i, obj, all_data) = self.tn.expect(expected, 10)
            nc = 1
            for ch in expected:
                if ch in all_data:
                    position = nc
                    break
                nc += 1
            if position == -1:
                iteration += 1
            else:
                iteration = 6
            print(str(all_data))
            self.all_data = self.all_data + str(all_data)
        return position