|
@@ -0,0 +1,160 @@
|
|
|
+import sys
|
|
|
+import telnetlib
|
|
|
+import time
|
|
|
+import fcntl
|
|
|
+from FiberHome import FiberHome
|
|
|
+from Furukawa import Furukawa
|
|
|
+from Huawei import Huawei
|
|
|
+from ZTE import ZTE
|
|
|
+from OLTBase import OLTBase
|
|
|
+from ObjectConnection import ObjectConnection
|
|
|
+
|
|
|
+
|
|
|
+class ObjectTelnet(ObjectConnection):
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ ObjectConnection.__init__(self)
|
|
|
+ self.tn = None
|
|
|
+
|
|
|
+ def connect(self):
|
|
|
+ """
|
|
|
+ Connect by ssh to olt
|
|
|
+ """
|
|
|
+ if self.brand.upper() == ZTE.__name__.upper():
|
|
|
+ self.olt = ZTE(self.model, False)
|
|
|
+ elif self.brand.upper() == FiberHome.__name__.upper():
|
|
|
+ self.olt = FiberHome(self.model, False)
|
|
|
+ elif self.brand.upper() == Furukawa.__name__.upper():
|
|
|
+ self.olt = Furukawa(self.model, False)
|
|
|
+ elif self.brand.upper() == Huawei.__name__.upper():
|
|
|
+ self.olt = Huawei(self.model, False)
|
|
|
+ else:
|
|
|
+ self.olt = OLTBase(None, False)
|
|
|
+ print("Telnet client ...\n")
|
|
|
+ f = open("/var/lock/" + self.hostname, "w")
|
|
|
+ print("Open lock file\n")
|
|
|
+ fcntl.flock(f, fcntl.LOCK_EX)
|
|
|
+ print("Lock Acquire\n")
|
|
|
+ print("Login in...\n")
|
|
|
+ self.tn = telnetlib.Telnet(self.hostname, self.port)
|
|
|
+ # self.tn.set_debuglevel(1)
|
|
|
+ self.read_data([self.olt.get_expected_name()])
|
|
|
+ self.tn.write(self.user + self.NEW_LINE_UNIX)
|
|
|
+ self.read_data([self.olt.get_expected_password()])
|
|
|
+ self.tn.write(self.password + self.NEW_LINE_UNIX)
|
|
|
+ print("Loged in...\n")
|
|
|
+ print self.tn.read_very_lazy()
|
|
|
+ self.read_data([self.olt.get_expected_initial()])
|
|
|
+ if self.file_name is not None:
|
|
|
+ self.connection_file()
|
|
|
+ elif self.data is not None:
|
|
|
+ self.connection_data()
|
|
|
+ else:
|
|
|
+ self.connection_old()
|
|
|
+ print("fin-----")
|
|
|
+ while self.QUIT:
|
|
|
+ # send quit to terminal until exit confirmation
|
|
|
+ self.command_quit(self.olt.get_write_exit())
|
|
|
+ self.tn.close()
|
|
|
+ self.save_log()
|
|
|
+ exit(self.RUN_OK)
|
|
|
+
|
|
|
+ def connection_old(self):
|
|
|
+ """
|
|
|
+ Old method
|
|
|
+ """
|
|
|
+ while 1:
|
|
|
+ line = sys.stdin.readline()
|
|
|
+ if not line:
|
|
|
+ break
|
|
|
+ print(self.tn.read_until(self.olt.get_expected_cardinal()))
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def connection_data(self):
|
|
|
+ """
|
|
|
+ Recive data separeted by ;
|
|
|
+ """
|
|
|
+ # $(sed ':a;N;$!ba;s/\n/;/g' file)
|
|
|
+ content = self.data.split(";")
|
|
|
+ for line in content:
|
|
|
+ self.command_execute(line, self.olt.get_expected_cardinal())
|
|
|
+
|
|
|
+ def connection_file(self):
|
|
|
+ """
|
|
|
+ Execute command from file
|
|
|
+ """
|
|
|
+ with open(self.file_name) as f:
|
|
|
+ content = f.readlines()
|
|
|
+ for line in content:
|
|
|
+ self.command_execute(line, self.olt.get_expected_cardinal())
|
|
|
+
|
|
|
+ def command_enable(self, command):
|
|
|
+ """
|
|
|
+ Check's if the command is enable and execute then
|
|
|
+ :param command:
|
|
|
+ :return: Return True if enable otherwise False
|
|
|
+ """
|
|
|
+ if command.lower() == self.olt.get_write_enable().lower():
|
|
|
+ if self.olt.run_enable():
|
|
|
+ self.tn.write(self.olt.get_write_enable() + self.NEW_LINE_UNIX)
|
|
|
+ if self.olt.run_enable_password():
|
|
|
+ self.read_data([self.olt.get_expected_enable_password()])
|
|
|
+ self.tn.write(self.password_enable + self.NEW_LINE_UNIX)
|
|
|
+ self.read_data([self.olt.get_expected_cardinal()])
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ def command_quit(self, command):
|
|
|
+ """
|
|
|
+ Check's if the command is quit and execute then
|
|
|
+ :param command:
|
|
|
+ :return: Return True if enable otherwise False
|
|
|
+ """
|
|
|
+ if command.lower() == self.olt.get_write_exit().lower():
|
|
|
+ self.tn.write(self.olt.get_write_exit() + self.NEW_LINE_UNIX)
|
|
|
+ if self.olt.get_expected_exit() is not None:
|
|
|
+ # data = self.tn.read_all()
|
|
|
+ position = self.read_data([self.olt.get_expected_cardinal(), self.olt.get_expected_exit()])
|
|
|
+ if position == 2:
|
|
|
+ # quit terminal
|
|
|
+ self.tn.write(self.olt.get_write_exit_confirmation() + self.NEW_LINE_UNIX)
|
|
|
+ self.QUIT = False
|
|
|
+ # (i, obj, data) = self.tn.expect([self.olt.get_expected_cardinal(), self.olt.get_expected_exit()], 10)
|
|
|
+ # if self.olt.get_expected_exit() in data:
|
|
|
+ # # quit terminal
|
|
|
+ # self.tn.write(self.olt.get_write_exit_confirmation() + self.NEW_LINE_UNIX)
|
|
|
+ # self.QUIT = False
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ def command_execute(self, command, expected):
|
|
|
+ """
|
|
|
+ Check's the command
|
|
|
+ :param command:
|
|
|
+ :param expected: Expected string to stop listening
|
|
|
+ """
|
|
|
+ command = command.strip()
|
|
|
+ if not (command[:-1] == self.NEW_LINE_UNIX or command[:-2] == self.NEW_LINE_WINDOWS):
|
|
|
+ self.command_print(command)
|
|
|
+ if not self.command_enable(command) and not self.command_quit(command):
|
|
|
+ command = command.rstrip(self.NEW_LINE_WINDOWS).rstrip(self.NEW_LINE_UNIX)
|
|
|
+ if command.__len__() > 0:
|
|
|
+ self.tn.write(command + self.NEW_LINE_UNIX)
|
|
|
+ self.read_data([expected])
|
|
|
+
|
|
|
+ def read_data(self, expected):
|
|
|
+ """
|
|
|
+ Read channel waiting parameter character
|
|
|
+ :param expected: List of expected string
|
|
|
+ """
|
|
|
+ position = 1
|
|
|
+ (i, obj, all_data) = self.tn.expect(expected, 10)
|
|
|
+ nc = 1
|
|
|
+ for ch in expected:
|
|
|
+ if ch in all_data:
|
|
|
+ position = nc
|
|
|
+ break
|
|
|
+ nc += 1
|
|
|
+ print(str(all_data))
|
|
|
+ self.all_data = self.all_data + str(all_data)
|
|
|
+ return position
|