using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using HaaSMiddleware.BusinessLogicTier.Factory;
using HaaSMiddleware.DataAccessTier.UnitOfWork;
using HaaSMiddleware.DomainObjects.DataTransfer;
using HaaSMiddleware.DomainObjects.UserAndLimitationManagement;
using HaaSMiddleware.DomainObjects.JobManagement.JobInformation;
using HaaSMiddleware.HpcConnectionFramework;
using log4net;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.Text;
using HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer.Exceptions;

namespace HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer
{
    /// <summary>
    /// Moves raw bytes between the middleware and a node of a submitted job.
    /// For each (job, node IP) pair it asks the cluster scheduler for an SSH tunnel
    /// and keeps one local TCP socket connected to the tunnel's local port.
    /// </summary>
    /// <remarks>
    /// The socket and port maps are static, so they are shared by every instance of this class.
    /// NOTE(review): access to them is not synchronized here - confirm that callers serialize
    /// their calls or add locking.
    /// </remarks>
    internal class DataTransferLogic : IDataTransferLogic
    {
        // Local address the tunnels listen on and the sockets connect to.
        private const string LocalTunnelHost = "127.0.0.1";

        // First local port probed when looking for a free tunnel port.
        private const int FirstLocalTunnelPort = 4000;

        // Login host handed to the scheduler when a tunnel is created.
        private const string TunnelLoginHost = "salomon.it4i.cz";

        // Size of the buffer used for one Receive call (1 MiB).
        private const int ReceiveBufferSize = 1048576;

        private static readonly ILog log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        // Open local sockets: job id -> (node IP -> socket).
        private static readonly Dictionary<long, Dictionary<string, Socket>> jobIpSockets =
            new Dictionary<long, Dictionary<string, Socket>>();

        // Local tunnel port per node IP.
        // NOTE(review): keyed by IP only, so two jobs tunnelling to the same node IP share
        // one entry - confirm that this is intended.
        private static readonly Dictionary<string, int> ipLocalPorts = new Dictionary<string, int>();

        private readonly IUnitOfWork unitOfWork;

        internal DataTransferLogic(IUnitOfWork unitOfWork)
        {
            this.unitOfWork = unitOfWork;
        }

        /// <summary>
        /// Creates an SSH tunnel to <paramref name="ipAddress"/>:<paramref name="port"/> for the job
        /// and connects a local socket to it.
        /// </summary>
        /// <param name="ipAddress">IP address of the job node.</param>
        /// <param name="port">Port on the job node.</param>
        /// <param name="jobInfo">Submitted job the connection belongs to.</param>
        /// <param name="loggedUser">User requesting the connection (used for logging only).</param>
        /// <returns>
        /// A description of the transfer method, or <c>null</c> when setting up the tunnel failed
        /// (the failure is logged and the partially created connection is torn down).
        /// </returns>
        /// <exception cref="UnableToCreateConnectionException">
        /// The job state is greater than <c>JobState.Running</c>.
        /// </exception>
        public DataTransferMethod GetDataTransferMethod(string ipAddress, int port, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
        {
            if (jobInfo.State > JobState.Running)
            {
                throw new UnableToCreateConnectionException("JobId " + jobInfo.Id + " is not a currently running job.");
            }

            log.Info("Getting data transfer method for submitted job info ID " + jobInfo.Id + " with user " + loggedUser.GetLogIdentification());
            try
            {
                int allocatedPort = CreateSshTunnel(jobInfo, LocalTunnelHost, FirstLocalTunnelPort, TunnelLoginHost, ipAddress, port);

                // Indexer instead of Add: an entry left over from an earlier connection to the
                // same IP must not make a freshly created tunnel fail with ArgumentException.
                ipLocalPorts[ipAddress] = allocatedPort;

                CreateLocalSocketConnection(jobInfo.Id, ipAddress, allocatedPort);

                DataTransferMethod dtMethod = new DataTransferMethod();
                dtMethod.submittedJobId = jobInfo.Id;
                dtMethod.IpAddress = ipAddress;
                dtMethod.Port = port;
                return dtMethod;
            }
            catch (Exception e)
            {
                log.Error("GetDataTransferMethod error", e);
                // The cleanup is guarded so that it cannot replace the original error.
                TearDownConnection(jobInfo, jobInfo.Id, ipAddress);
                return null;
            }
        }

        /// <summary>
        /// Removes the tunnel and closes the socket described by <paramref name="transferMethod"/>.
        /// Failures are logged, not thrown.
        /// </summary>
        /// <param name="transferMethod">Transfer method returned by <see cref="GetDataTransferMethod"/>.</param>
        /// <param name="jobInfo">Submitted job the connection belongs to.</param>
        /// <param name="loggedUser">User ending the transfer (used for logging only).</param>
        public void EndDataTransfer(DataTransferMethod transferMethod, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
        {
            log.Info("Removing data transfer method for submitted job info ID " + transferMethod.submittedJobId + " with user " + loggedUser.GetLogIdentification());

            if (!jobIpSockets.ContainsKey(transferMethod.submittedJobId))
            {
                log.Error("Job " + transferMethod.submittedJobId + " with IP " + transferMethod.IpAddress + " does not have an active socket.");
                return;
            }

            try
            {
                TearDownConnection(jobInfo, transferMethod.submittedJobId, transferMethod.IpAddress);
            }
            catch (Exception e)
            {
                log.Error("Error in removing data transfer method for submitted job info ID " + transferMethod.submittedJobId, e);
            }
        }

        /// <summary>
        /// Sends <paramref name="data"/> to the job node through the local socket, opening the
        /// socket first when the tunnel exists but no socket is registered.
        /// </summary>
        /// <param name="data">Bytes to send; <c>null</c> sends nothing.</param>
        /// <param name="submittedJobInfoId">Id of the submitted job.</param>
        /// <param name="ipAddress">IP address of the job node.</param>
        /// <param name="sessionCode">Not used by this method.</param>
        /// <param name="closeConnection">When <c>true</c>, the send direction of the socket is shut down afterwards.</param>
        /// <returns>Number of bytes sent; 0 when nothing was sent or the connection is unavailable.</returns>
        public int WriteDataToJobNode(byte[] data, long submittedJobInfoId, string ipAddress, string sessionCode, bool closeConnection)
        {
            SubmittedJobInfo jobInfo = unitOfWork.SubmittedJobInfoRepository.GetById(submittedJobInfoId);
            if (jobInfo == null)
            {
                log.Error("Job " + submittedJobInfoId + " was not found.");
                return 0;
            }

            if (!SshTunnelExist(jobInfo, ipAddress))
            {
                log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have an active connection.");
                return 0;
            }

            Socket sock = FindSocket(submittedJobInfoId, ipAddress);
            if (sock == null)
            {
                // The tunnel is up but the socket is gone (e.g. closed after a zero-byte read): reconnect.
                int allocatedPort;
                if (!ipLocalPorts.TryGetValue(ipAddress, out allocatedPort))
                {
                    log.Error("Job " + submittedJobInfoId + " with remote IP " + ipAddress + " does not have a local tunnel port registered.");
                    return 0;
                }

                CreateLocalSocketConnection(submittedJobInfoId, ipAddress, allocatedPort);

                // CreateLocalSocketConnection only logs its failures, so check that it registered a socket.
                sock = FindSocket(submittedJobInfoId, ipAddress);
                if (sock == null)
                {
                    log.Error("Job " + submittedJobInfoId + " with remote IP " + ipAddress + " could not open a local socket.");
                    return 0;
                }
            }

            int bytesSent = 0;
            if (data != null)
            {
                try
                {
                    bytesSent = sock.Send(data);
                    log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " sent " + bytesSent + " bytes.");
                }
                catch (Exception e)
                {
                    log.Error("Socket write error", e);
                }
            }
            else
            {
                log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " : nothing to send, NULL data value.");
            }

            if (closeConnection)
            {
                sock.Shutdown(SocketShutdown.Send);
                log.Info("Closed SEND direction in socket for submitted job info Id: " + submittedJobInfoId + " and remote IP address: " + ipAddress);
            }

            return bytesSent;
        }

        /// <summary>
        /// Performs one Receive call on the socket registered for the job and node IP.
        /// </summary>
        /// <param name="submittedJobInfoId">Id of the submitted job.</param>
        /// <param name="ipAddress">IP address of the job node.</param>
        /// <param name="sessionCode">Not used by this method.</param>
        /// <returns>
        /// The received bytes; an empty array when the read threw (the error is logged);
        /// <c>null</c> when no socket is registered or when 0 bytes were received, in which case
        /// the socket is also closed.
        /// </returns>
        public byte[] ReadDataFromJobNode(long submittedJobInfoId, string ipAddress, string sessionCode)
        {
            Socket sock = FindSocket(submittedJobInfoId, ipAddress);
            if (sock == null)
            {
                log.Error("Job " + submittedJobInfoId + " with remote IP " + ipAddress + " does not have an active socket.");
                return null;
            }

            byte[] buffer = new byte[ReceiveBufferSize];
            byte[] retData = new byte[0];
            try
            {
                int bytesRec = sock.Receive(buffer);
                if (bytesRec <= 0)
                {
                    log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received 0 bytes - returning null and closing socket.");
                    CloseLocalSocketConnection(submittedJobInfoId, ipAddress);
                    return null;
                }

                retData = new byte[bytesRec];
                Array.Copy(buffer, retData, bytesRec);
                log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received " + bytesRec + " bytes.");
            }
            catch (Exception e)
            {
                log.Error("Socket read error", e);
            }

            return retData;
        }

        /// <summary>
        /// Returns a snapshot of the job ids that currently have an entry in the socket map.
        /// </summary>
        public List<long> GetJobIdsForOpenSockets()
        {
            return new List<long>(jobIpSockets.Keys);
        }

        /// <summary>
        /// Removes every tunnel and closes every socket registered for the job, then drops the
        /// job from the socket map. Does nothing (apart from logging) when the job has no entry.
        /// </summary>
        /// <param name="jobInfo">Submitted job whose connections are closed.</param>
        public void CloseAllConnectionsForJob(SubmittedJobInfo jobInfo)
        {
            Dictionary<string, Socket> socketsByIp;
            if (!jobIpSockets.TryGetValue(jobInfo.Id, out socketsByIp))
            {
                log.InfoFormat("No open connections for jobId {0}", jobInfo.Id);
                return;
            }

            // Iterate over a copy: closing a socket removes its key from socketsByIp.
            foreach (string ipAddress in socketsByIp.Keys.ToList())
            {
                TearDownConnection(jobInfo, jobInfo.Id, ipAddress);
            }

            jobIpSockets.Remove(jobInfo.Id);
            log.InfoFormat("Closed all connections for jobId {0}", jobInfo.Id);
        }

        #region Protected methods
        /// <summary>
        /// Probes local ports upwards from <paramref name="localPort"/> until
        /// <see cref="IsPortFree"/> reports one as free, then asks the scheduler of the job's
        /// cluster to create the tunnel on that port.
        /// </summary>
        /// <returns>The local port the tunnel was created on.</returns>
        protected int CreateSshTunnel(SubmittedJobInfo jobInfo, string localHost, int localPort, string loginHost, string nodeHost, int nodePort)
        {
            while (!IsPortFree(localHost, localPort))
            {
                ++localPort;
            }

            SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType)
                .CreateScheduler(jobInfo.NodeType.Cluster)
                .CreateSshTunnel(
                    jobInfo.Specification.Id,
                    localHost,
                    localPort,
                    loginHost,
                    nodeHost,
                    nodePort,
                    jobInfo.Specification.ClusterUser);
            return localPort;
        }

        /// <summary>
        /// Asks the scheduler of the job's cluster to remove the tunnel to <paramref name="nodeHost"/>.
        /// </summary>
        protected void RemoveSshTunnel(SubmittedJobInfo jobInfo, string nodeHost)
        {
            SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType)
                .CreateScheduler(jobInfo.NodeType.Cluster)
                .RemoveSshTunnel(jobInfo.Specification.Id, nodeHost);
        }

        /// <summary>
        /// Asks the scheduler of the job's cluster whether a tunnel to <paramref name="nodeHost"/> exists.
        /// </summary>
        protected bool SshTunnelExist(SubmittedJobInfo jobInfo, string nodeHost)
        {
            return SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType)
                .CreateScheduler(jobInfo.NodeType.Cluster)
                .SshTunnelExist(jobInfo.Specification.Id, nodeHost);
        }

        /// <summary>
        /// Checks whether a TCP listener can be started on the given address and port.
        /// </summary>
        /// <returns><c>true</c> when the listener started; <c>false</c> when starting it raised a <see cref="SocketException"/>.</returns>
        protected bool IsPortFree(string ipAddress, int port)
        {
            try
            {
                IPAddress address = IPAddress.Parse(ipAddress);
                TcpListener tcpListener = new TcpListener(address, port);
                tcpListener.Start();
                tcpListener.Stop();
                return true;
            }
            catch (SocketException)
            {
                return false;
            }
        }

        /// <summary>
        /// Connects a TCP socket to the local end of the tunnel and registers it for the job and
        /// node IP. Logs an error and does nothing when a socket is already registered.
        /// Connection failures are logged, not thrown.
        /// </summary>
        protected void CreateLocalSocketConnection(long jobId, string remoteIp, int localPort)
        {
            if (FindSocket(jobId, remoteIp) != null)
            {
                log.Error("Job " + jobId + " with remote IP " + remoteIp + " already has socket a connection.");
                return;
            }

            Socket socket = null;
            try
            {
                IPAddress address = IPAddress.Parse(LocalTunnelHost);
                IPEndPoint remoteEP = new IPEndPoint(address, localPort);
                socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                socket.NoDelay = true;
                socket.Connect(remoteEP);

                Dictionary<string, Socket> socketsByIp;
                if (!jobIpSockets.TryGetValue(jobId, out socketsByIp))
                {
                    socketsByIp = new Dictionary<string, Socket>();
                    jobIpSockets.Add(jobId, socketsByIp);
                }
                socketsByIp.Add(remoteIp, socket);

                log.Info("New socket connection created for job " + jobId + " with remote IP " + remoteIp + " and local port " + localPort);
            }
            catch (Exception e)
            {
                log.Error("Socket creation error", e);
                // A socket that failed to connect is never registered, so release it here.
                if (socket != null)
                {
                    socket.Close();
                }
            }
        }

        /// <summary>
        /// Polls the socket for readability; reports it as disconnected when it is readable but
        /// has no data available, or when polling raised a <see cref="SocketException"/>.
        /// </summary>
        protected bool SocketConnected(Socket socket)
        {
            try
            {
                bool readableWithoutData = socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0;
                return !readableWithoutData;
            }
            catch (SocketException)
            {
                return false;
            }
        }

        /// <summary>
        /// Shuts down and closes the socket registered for the job and node IP and removes it
        /// from the socket map. Logs an error when no such socket is registered.
        /// </summary>
        protected void CloseLocalSocketConnection(long jobId, string remoteIp)
        {
            Socket sock = FindSocket(jobId, remoteIp);
            if (sock == null)
            {
                log.Error("Job " + jobId + " with IP " + remoteIp + " does not have an active socket.");
                return;
            }

            try
            {
                sock.Shutdown(SocketShutdown.Both);
            }
            catch (Exception e)
            {
                log.Error("Error in closing socket connection for submitted job info ID " + jobId, e);
            }
            finally
            {
                // Close even when Shutdown failed, otherwise the handle would leak.
                sock.Close();
            }

            jobIpSockets[jobId].Remove(remoteIp);
            log.Info("Closed socket for submitted job info Id: " + jobId + " and remote IP address: " + remoteIp);
        }
        #endregion

        #region Private helpers
        // Returns the socket registered for the job and node IP, or null when there is none.
        private static Socket FindSocket(long jobId, string remoteIp)
        {
            Dictionary<string, Socket> socketsByIp;
            Socket socket;
            if (jobIpSockets.TryGetValue(jobId, out socketsByIp) && socketsByIp.TryGetValue(remoteIp, out socket))
            {
                return socket;
            }
            return null;
        }

        // Forgets the local port, removes the tunnel and closes the socket for one node IP.
        // A failing tunnel removal is logged so that the socket is still closed.
        private void TearDownConnection(SubmittedJobInfo jobInfo, long jobId, string ipAddress)
        {
            try
            {
                ipLocalPorts.Remove(ipAddress);
                RemoveSshTunnel(jobInfo, ipAddress);
            }
            catch (Exception e)
            {
                log.Error("Error in removing SSH tunnel for submitted job info ID " + jobId + " and remote IP address: " + ipAddress, e);
            }

            CloseLocalSocketConnection(jobId, ipAddress);
        }
        #endregion
    }
}