import types
import errno
import sys
import telnetlib
import time
import socket
import fcntl

from FiberHome import FiberHome
from Furukawa import Furukawa
from Huawei import Huawei
from ZTE import ZTE
from OLTBase import OLTBase
from ObjectConnection import ObjectConnection


class ObjectTelnet(ObjectConnection):
    """Telnet session against an OLT, driven by a brand-specific profile.

    The profile object stored in ``self.olt`` supplies the prompts to wait
    for and the literal commands (enable, exit, ...) to send.  Connection
    settings such as ``hostname``, ``port``, ``user`` and ``password`` are
    read from ``self`` and are not set in this class.
    """

    # Number of expect() rounds read_data() performs before giving up.
    _READ_ATTEMPTS = 6
    # Timeout, in seconds, handed to each telnetlib expect() call.
    _READ_TIMEOUT = 10
    # Upper bound on quit commands sent while waiting for exit confirmation.
    _MAX_QUIT_ATTEMPTS = 20

    def __init__(self):
        ObjectConnection.__init__(self)
        self.tn = None
        self.debug = False

    def _build_olt(self):
        """Return the OLT profile whose class name matches ``self.brand``.

        The comparison is case-insensitive; an unknown brand falls back to
        a generic ``OLTBase(None, False)``.
        """
        brand = self.brand.upper()
        for olt_class in (ZTE, FiberHome, Furukawa, Huawei):
            if brand == olt_class.__name__.upper():
                return olt_class(self.model, False)
        return OLTBase(None, False)

    def _cardinal_list(self):
        """Return the profile's expected prompt(s), always as a list."""
        cardinal = self.olt.get_expected_cardinal()
        if isinstance(cardinal, list):
            return cardinal
        return [cardinal]

    def _login(self):
        """Send user and password; return True once the prompt is seen."""
        if self.read_data([self.olt.get_expected_name()]) == -1:
            return False
        self.tn.write(self.user + self.NEW_LINE_UNIX)
        if self.read_data([self.olt.get_expected_password()]) == -1:
            return False
        self.tn.write(self.password + self.NEW_LINE_UNIX)
        print("Logged in...\n")
        print(self.tn.read_very_lazy())
        return self.read_data(self._cardinal_list()) != -1

    def _run_session(self):
        """Run the commands, then quit the terminal and save the log."""
        if self.file_name is not None:
            self.connection_file()
        elif self.data is not None:
            self.connection_data()
        else:
            self.connection_old()
        print("-----FIN-----")
        attempts = 0
        while self.QUIT:
            if attempts == self._MAX_QUIT_ATTEMPTS:
                # Give up waiting for confirmation; one last quit is still
                # sent below before the loop ends.
                self.QUIT = False
            # send quit to terminal until exit confirmation
            self.command_quit(self.olt.get_write_exit())
            attempts += 1
        self.tn.close()
        self.save_log()

    def connect(self):
        """Connect to the OLT by telnet, run the commands and exit.

        An exclusive ``flock`` on ``/var/lock/<hostname>`` is taken before
        the telnet connection is opened.  This method always terminates
        the process by raising ``SystemExit``: with ``self.RUN_OK`` after a
        completed session, ``self.RUN_ERROR`` when login fails, or the
        message "Timeout Except" on ``socket.error``.
        """
        self.olt = self._build_olt()
        print("Telnet client ...\n")
        lock_file = open("/var/lock/" + self.hostname, "w")
        try:
            print("Open lock file\n")
            fcntl.flock(lock_file, fcntl.LOCK_EX)
            print("Lock Acquire\n")
            print("Login in...\n")
            try:
                self.tn = telnetlib.Telnet(self.hostname, self.port,
                                           self.timeout)
                if self.debug:
                    self.tn.set_debuglevel(1)
                if self._login():
                    self._run_session()
                    sys.exit(self.RUN_OK)
                sys.exit(self.RUN_ERROR)
            except socket.error:
                sys.exit("Timeout Except")
        finally:
            # Closing the descriptor releases the flock, including when
            # SystemExit or an unexpected exception propagates.
            lock_file.close()

    def connection_old(self):
        """Old method: print device output once per line read from stdin.

        Stops at end of input on stdin.
        """
        # NOTE(review): the line read from stdin is never written to the
        # device, and read_until() is given the cardinal unnormalised
        # (it may be a list) -- confirm whether this path is still used.
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            print(self.tn.read_until(self.olt.get_expected_cardinal()))
            time.sleep(1)

    def connection_data(self):
        """Execute the commands held in ``self.data``, separated by ``;``."""
        # $(sed ':a;N;$!ba;s/\n/;/g' file)
        for line in self.data.split(";"):
            self.command_execute(line, self.olt.get_expected_cardinal())

    def connection_file(self):
        """Execute each line of the file ``self.file_name`` as a command."""
        with open(self.file_name) as f:
            content = f.readlines()
        for line in content:
            self.command_execute(line, self.olt.get_expected_cardinal())

    def command_enable(self, command):
        """Run the enable sequence when ``command`` is the enable command.

        :param command: Command text, compared case-insensitively with the
            profile's enable command.
        :return: True if ``command`` is the enable command, otherwise False
        """
        if command.lower() != self.olt.get_write_enable().lower():
            return False
        if self.olt.run_enable():
            self.tn.write(self.olt.get_write_enable() + self.NEW_LINE_UNIX)
            if self.olt.run_enable_password():
                self.read_data([self.olt.get_expected_enable_password()])
                self.tn.write(self.password_enable + self.NEW_LINE_UNIX)
            self.read_data(self._cardinal_list())
        return True

    def command_quit(self, command):
        """Send the exit command when ``command`` is the exit command.

        Clears ``self.QUIT`` once the device answers with the exit marker,
        or when the write fails with EPIPE.

        :param command: Command text, compared case-insensitively with the
            profile's exit command.
        :return: True if ``command`` is the exit command, otherwise False
        """
        if command.lower() != self.olt.get_write_exit().lower():
            return False
        try:
            self.tn.write(self.olt.get_write_exit() + self.NEW_LINE_UNIX)
            expected = None
            with_confirmation = False
            if self.olt.get_expected_exit() is not None:
                expected = self._cardinal_list() + [
                    self.olt.get_expected_exit()]
                with_confirmation = True
            if self.olt.get_expected_exit_with_confirmation() is not None:
                expected = self._cardinal_list() + [
                    self.olt.get_expected_exit_with_confirmation()]
                with_confirmation = False
            if expected is not None:
                position = self.read_data(expected)
                # read_data() reports a 1-based index and the exit marker
                # is always the last entry, so compare against the list
                # length rather than a fixed 2 (only right for one prompt).
                if position == len(expected):
                    # quit terminal
                    if with_confirmation:
                        self.tn.write(self.olt.get_write_exit_confirmation()
                                      + self.NEW_LINE_UNIX)
                    self.QUIT = False
        except IOError as e:
            # A broken pipe on write is treated as the session having ended.
            if e.errno == errno.EPIPE:
                self.QUIT = False
        return True

    def command_execute(self, command, expected):
        """Send one command unless it is the enable or exit command.

        :param command: Raw command line; the literal text ``\\n`` (when
            the stripped command is 2 or 4 characters long) sends ENTER.
        :param expected: Expected string (or list) to stop listening
        """
        self.command_print(command)
        command = command.strip()
        if "\\n" in command and len(command) in (2, 4):
            print("Sending ENTER \\r\\n")
            command = '\r\n'
        else:
            command = command.rstrip(self.NEW_LINE_WINDOWS)
            command = command.rstrip(self.NEW_LINE_UNIX)
        # command_enable/command_quit send their own sequence and return
        # True when the command was theirs.
        if self.command_enable(command) or self.command_quit(command):
            return
        if command:
            if self.debug:
                print("Send: ")
                print(command)
            self.tn.write(command + self.NEW_LINE_UNIX)
            self.read_data(expected)

    def read_data(self, expected):
        """Read from the channel until one of ``expected`` shows up.

        Received data is printed and appended to ``self.all_data``.

        :param expected: Expected string, or list of expected strings
        :return: 1-based index of the first entry of ``expected`` found in
            the received data, or -1 if none was seen before the attempts
            ran out or the connection reached EOF
        """
        position = -1
        if isinstance(expected, str):
            expected = [expected]
        for _ in range(self._READ_ATTEMPTS):
            try:
                _, _, all_data = self.tn.expect(expected, self._READ_TIMEOUT)
            except EOFError:
                break
            # expect() reports its own match index, but the position is
            # decided here by a plain substring search, as before.
            for index, marker in enumerate(expected, 1):
                if marker in all_data:
                    position = index
                    break
            print(str(all_data))
            self.all_data = self.all_data + str(all_data)
            if position != -1:
                break
        return position