Skip to content
Snippets Groups Projects
DataTransferLogic.cs 13.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • Vaclav Svaton's avatar
    Vaclav Svaton committed
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;
    using HaaSMiddleware.BusinessLogicTier.Factory;
    using HaaSMiddleware.DataAccessTier.UnitOfWork;
    using HaaSMiddleware.DomainObjects.DataTransfer;
    using HaaSMiddleware.DomainObjects.UserAndLimitationManagement;
    using HaaSMiddleware.DomainObjects.JobManagement.JobInformation;
    using HaaSMiddleware.HpcConnectionFramework;
    using log4net;
    using System.Threading;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer.Exceptions;
    
    namespace HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer {
    	internal class DataTransferLogic : IDataTransferLogic {
    		// Class-wide log4net logger, named after the declaring type.
    		private static readonly ILog log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
    		// Unit of work used to load submitted job information from the repository.
    		private readonly IUnitOfWork unitOfWork;
    
            // Open local sockets, keyed first by submitted job ID and then by remote IP address.
            // Both maps are static, so they are shared by every DataTransferLogic instance.
            // NOTE(review): no locking is visible in this class - confirm callers never use it concurrently.
            private static Dictionary<long, Dictionary<string, Socket>> jobIpSockets = new Dictionary<long, Dictionary<string, Socket>>();
            // Local port returned by CreateSshTunnel, keyed by remote IP address only (not by job).
            private static Dictionary<string, int> ipLocalPorts = new Dictionary<string, int>();
           
            // Creates the logic object on top of the supplied unit of work.
            internal DataTransferLogic(IUnitOfWork unitOfWork)
            {
    			this.unitOfWork = unitOfWork;
    		}
    
            /// <summary>
            /// Creates an SSH tunnel and a local socket for the given job and node address and
            /// returns a descriptor of the resulting data transfer.
            /// </summary>
            /// <param name="ipAddress">Node address handed to the tunnel; also the key under which the allocated local port and the socket are stored.</param>
            /// <param name="port">Port on the node the tunnel is created for.</param>
            /// <param name="jobInfo">Submitted job the transfer belongs to.</param>
            /// <param name="loggedUser">User performing the request; only used for logging here.</param>
            /// <returns>
            /// A <see cref="DataTransferMethod"/> carrying the job ID, IP address and node port,
            /// or <c>null</c> when an exception occurred while setting the transfer up.
            /// </returns>
            /// <exception cref="UnableToCreateConnectionException">
            /// Thrown when the job state is greater than <c>JobState.Running</c>.
            /// </exception>
            public DataTransferMethod GetDataTransferMethod(string ipAddress, int port, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
            {
                if (jobInfo.State > JobState.Running)
                    throw new UnableToCreateConnectionException("JobId " + jobInfo.Id + " is not a currently running job.");

                log.Info("Getting data transfer method for submitted job info ID " + jobInfo.Id + " with user " + loggedUser.GetLogIdentification());
                try
                {
                    // Create the tunnel; the search for a free local port starts at 4000 and
                    // CreateSshTunnel returns the port that was actually used.
                    int allocatedPort = CreateSshTunnel(jobInfo, "127.0.0.1", 4000, "salomon.it4i.cz", ipAddress, port);
                    ipLocalPorts.Add(ipAddress, allocatedPort);

                    // Create the local socket that talks to the tunnel's local end.
                    CreateLocalSocketConnection(jobInfo.Id, ipAddress, allocatedPort);

                    DataTransferMethod dtMethod = new DataTransferMethod();
                    dtMethod.submittedJobId = jobInfo.Id;
                    dtMethod.IpAddress = ipAddress;
                    dtMethod.Port = port;

                    return dtMethod;
                }
                catch (Exception e)
                {
                    // Roll back whatever part of the setup was already done.
                    RemoveSshTunnel(jobInfo, ipAddress);
                    CloseLocalSocketConnection(jobInfo.Id, ipAddress);

                    // ILog.Error(object, Exception) performs no format substitution, so the
                    // exception message is appended explicitly instead of using a "{0}" placeholder.
                    log.Error("GetDataTransferMethod error: " + e.Message, e);
                    return null;
                }
            }
    
            /// <summary>
            /// Ends a data transfer: forgets the local port, removes the SSH tunnel and closes
            /// the local socket that belong to the transfer's IP address.
            /// </summary>
            /// <param name="transferMethod">Descriptor of the transfer to end (job ID and IP address are read).</param>
            /// <param name="jobInfo">Submitted job whose tunnel is removed.</param>
            /// <param name="loggedUser">User performing the request; only used for logging here.</param>
            /// <remarks>
            /// Nothing is removed when the job has no entry in the socket map; an error is logged instead.
            /// Exceptions raised during the cleanup are logged and not propagated.
            /// </remarks>
            public void EndDataTransfer(DataTransferMethod transferMethod, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
            {
                log.Info("Removing data transfer method for submitted job info ID " + transferMethod.submittedJobId + " with user " + loggedUser.GetLogIdentification());

                if (!jobIpSockets.ContainsKey(transferMethod.submittedJobId))
                {
                    log.Error("Job " + transferMethod.submittedJobId + " with IP " + transferMethod.IpAddress + " does not have an active socket.");
                    return;
                }

                try
                {
                    try
                    {
                        // Remove the tunnel and its port registration.
                        ipLocalPorts.Remove(transferMethod.IpAddress);
                        RemoveSshTunnel(jobInfo, transferMethod.IpAddress);
                    }
                    finally
                    {
                        // Close the socket even when the tunnel removal failed, so it is not leaked.
                        CloseLocalSocketConnection(transferMethod.submittedJobId, transferMethod.IpAddress);
                    }
                }
                catch (Exception e)
                {
                    // ILog.Error(object, Exception) performs no format substitution, so the
                    // exception message is appended explicitly instead of using a "{0}" placeholder.
                    log.Error("Error in removing data transfer method for submitted job info ID " + transferMethod.submittedJobId + ": " + e.Message, e);
                }
            }
    
            /// <summary>
            /// Sends data to a job node through the local socket of an existing SSH tunnel,
            /// creating the socket first when the tunnel exists but no socket is registered.
            /// </summary>
            /// <param name="data">Bytes to send; when <c>null</c> nothing is sent.</param>
            /// <param name="submittedJobInfoId">ID of the submitted job the socket belongs to.</param>
            /// <param name="ipAddress">Remote IP address identifying the tunnel and socket.</param>
            /// <param name="sessionCode">Not used by this method.</param>
            /// <param name="closeConnection">When <c>true</c>, the send direction of the socket is shut down after sending.</param>
            /// <returns>
            /// Number of bytes sent; 0 when nothing was sent, no tunnel exists, no socket could be
            /// obtained, or the send failed (failures are logged).
            /// </returns>
            public int WriteDataToJobNode(byte[] data, long submittedJobInfoId, string ipAddress, string sessionCode, bool closeConnection)
            {
                SubmittedJobInfo jobInfo = unitOfWork.SubmittedJobInfoRepository.GetById(submittedJobInfoId);

                if (!SshTunnelExist(jobInfo, ipAddress))
                {
                    log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have an active connection.");
                    return 0;
                }

                Dictionary<string, Socket> jobSockets;
                Socket sock = null;
                bool hasSocket = jobIpSockets.TryGetValue(submittedJobInfoId, out jobSockets)
                                 && jobSockets.TryGetValue(ipAddress, out sock);

                if (!hasSocket)
                {
                    int allocatedPort;
                    if (!ipLocalPorts.TryGetValue(ipAddress, out allocatedPort))
                    {
                        log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have a registered local port.");
                        return 0;
                    }

                    // Create a new socket. CreateLocalSocketConnection logs and swallows its own
                    // failures, so the map has to be consulted again to learn whether it succeeded.
                    CreateLocalSocketConnection(submittedJobInfoId, ipAddress, allocatedPort);

                    hasSocket = jobIpSockets.TryGetValue(submittedJobInfoId, out jobSockets)
                                && jobSockets.TryGetValue(ipAddress, out sock);
                    if (!hasSocket)
                    {
                        log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have an active socket.");
                        return 0;
                    }
                }

                int bytesSent = 0;

                if (data != null)
                {
                    try
                    {
                        bytesSent = sock.Send(data);
                        log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " sent " + bytesSent + " bytes.");
                    }
                    catch (Exception e)
                    {
                        // ILog.Error(object, Exception) performs no format substitution, so the
                        // exception message is appended explicitly instead of using a "{0}" placeholder.
                        log.Error("Socket write error: " + e.Message, e);
                    }
                }
                else
                {
                    log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " : nothing to send, NULL data value.");
                }

                if (closeConnection)
                {
                    // Only the send direction is shut down; the socket stays registered for reading.
                    sock.Shutdown(SocketShutdown.Send);
                    log.Info("Closed SEND direction in socket for submitted job info Id: " + submittedJobInfoId + " and remote IP address: " + ipAddress);
                }

                return bytesSent;
            }
    
            /// <summary>
            /// Receives data from the local socket registered for the given job and remote IP address.
            /// </summary>
            /// <param name="submittedJobInfoId">ID of the submitted job the socket belongs to.</param>
            /// <param name="ipAddress">Remote IP address identifying the socket.</param>
            /// <param name="sessionCode">Not used by this method.</param>
            /// <returns>
            /// The received bytes; an empty array when the receive threw (the error is logged);
            /// <c>null</c> when no socket is registered or when 0 bytes were received, in which
            /// case the socket is also closed.
            /// </returns>
            public byte[] ReadDataFromJobNode(long submittedJobInfoId, string ipAddress, string sessionCode)
            {
                // Size of the receive buffer (1 MiB); a single call returns at most this many bytes.
                const int receiveBufferSize = 1048576;

                Dictionary<string, Socket> jobSockets;
                Socket sock = null;
                if (!jobIpSockets.TryGetValue(submittedJobInfoId, out jobSockets) || !jobSockets.TryGetValue(ipAddress, out sock))
                {
                    log.Error("Job " + submittedJobInfoId + " with remote IP " + ipAddress + " does not have an active socket.");
                    return null;
                }

                byte[] buffer = new byte[receiveBufferSize];
                byte[] retData = new byte[0];

                try
                {
                    int bytesRec = sock.Receive(buffer);
                    if (bytesRec <= 0)
                    {
                        log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received 0 bytes - returning null and closing socket.");
                        CloseLocalSocketConnection(submittedJobInfoId, ipAddress);
                        return null;
                    }

                    // Trim the result to the number of bytes actually received.
                    retData = new byte[bytesRec];
                    Array.Copy(buffer, retData, bytesRec);
                    log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received " + bytesRec + " bytes.");
                }
                catch (Exception e)
                {
                    // ILog.Error(object, Exception) performs no format substitution, so the
                    // exception message is appended explicitly instead of using a "{0}" placeholder.
                    log.Error("Socket read error: " + e.Message, e);
                }

                return retData;
            }
    
            /// <summary>
            /// Returns a copy of the job IDs that currently have an entry in the socket map.
            /// </summary>
            /// <returns>A new list holding every key of the job-to-sockets map.</returns>
            public List<long> GetJobIdsForOpenSockets()
            {
                List<long> openJobIds = new List<long>();
                foreach (long jobId in jobIpSockets.Keys)
                {
                    openJobIds.Add(jobId);
                }
                return openJobIds;
            }
    
            /// <summary>
            /// Removes the SSH tunnel and closes the local socket for every remote IP address
            /// registered for the given job, then drops the job from the socket map.
            /// </summary>
            /// <param name="jobInfo">Submitted job whose connections are closed.</param>
            /// <remarks>
            /// When the job has no entry in the socket map the method logs that fact and returns.
            /// </remarks>
            public void CloseAllConnectionsForJob(SubmittedJobInfo jobInfo)
            {
                Dictionary<string, Socket> jobSockets;
                if (!jobIpSockets.TryGetValue(jobInfo.Id, out jobSockets))
                {
                    log.InfoFormat("No open connections registered for jobId {0}", jobInfo.Id);
                    return;
                }

                // Iterate over a snapshot of the keys: CloseLocalSocketConnection removes
                // entries from this dictionary while the loop is running.
                foreach (string ipAddress in jobSockets.Keys.ToList())
                {
                    // Forget the local port together with the tunnel (as EndDataTransfer does);
                    // a stale entry would make a later ipLocalPorts.Add for the same IP throw.
                    ipLocalPorts.Remove(ipAddress);
                    //close tunnel
                    RemoveSshTunnel(jobInfo, ipAddress);
                    //close socket
                    CloseLocalSocketConnection(jobInfo.Id, ipAddress);
                }

                jobIpSockets.Remove(jobInfo.Id);
                log.InfoFormat("Closed all connections for jobId {0}", jobInfo.Id);
            }
    
            #region Protected methods
    
            /// <summary>
            /// Finds the first free local port at or above <paramref name="localPort"/> and asks the
            /// scheduler of the job's cluster to create an SSH tunnel on it.
            /// </summary>
            /// <param name="jobInfo">Submitted job the tunnel is created for.</param>
            /// <param name="localHost">Local address the port is probed on and passed to the scheduler.</param>
            /// <param name="localPort">First local port to try.</param>
            /// <param name="loginHost">Login host passed to the scheduler.</param>
            /// <param name="nodeHost">Node host passed to the scheduler.</param>
            /// <param name="nodePort">Node port passed to the scheduler.</param>
            /// <returns>The local port that was handed to the scheduler.</returns>
            protected int CreateSshTunnel(SubmittedJobInfo jobInfo, string localHost, int localPort, string loginHost, string nodeHost, int nodePort)
            {
                // Walk upwards from the requested port until one can be bound.
                int chosenPort = localPort;
                for (; !IsPortFree(localHost, chosenPort); ++chosenPort)
                {
                }

                var schedulerFactory = SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType);
                var scheduler = schedulerFactory.CreateScheduler(jobInfo.NodeType.Cluster);
                scheduler.CreateSshTunnel(
                    jobInfo.Specification.Id,
                    localHost,
                    chosenPort,
                    loginHost,
                    nodeHost,
                    nodePort,
                    jobInfo.Specification.ClusterUser);

                return chosenPort;
            }
    
            /// <summary>
            /// Asks the scheduler of the job's cluster to remove the SSH tunnel registered for
            /// the job specification and the given node host.
            /// </summary>
            /// <param name="jobInfo">Submitted job whose tunnel is removed.</param>
            /// <param name="nodeHost">Node host identifying the tunnel.</param>
            protected void RemoveSshTunnel(SubmittedJobInfo jobInfo, string nodeHost)
            {
                var schedulerFactory = SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType);
                var scheduler = schedulerFactory.CreateScheduler(jobInfo.NodeType.Cluster);
                scheduler.RemoveSshTunnel(jobInfo.Specification.Id, nodeHost);
            }
    
            /// <summary>
            /// Asks the scheduler of the job's cluster whether an SSH tunnel exists for the job
            /// specification and the given node host.
            /// </summary>
            /// <param name="jobInfo">Submitted job the tunnel would belong to.</param>
            /// <param name="nodeHost">Node host identifying the tunnel.</param>
            /// <returns>The scheduler's answer.</returns>
            protected bool SshTunnelExist(SubmittedJobInfo jobInfo, string nodeHost)
            {
                var schedulerFactory = SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType);
                var scheduler = schedulerFactory.CreateScheduler(jobInfo.NodeType.Cluster);
                bool tunnelExists = scheduler.SshTunnelExist(jobInfo.Specification.Id, nodeHost);
                return tunnelExists;
            }
    
            /// <summary>
            /// Checks whether a TCP listener can be started on the given address and port.
            /// </summary>
            /// <param name="ipAddress">Textual IP address to probe; parsed with <see cref="IPAddress.Parse(string)"/>.</param>
            /// <param name="port">Port to probe.</param>
            /// <returns>
            /// <c>true</c> when a listener could be started and stopped;
            /// <c>false</c> when a <see cref="SocketException"/> was raised.
            /// </returns>
            protected bool IsPortFree(string ipAddress, int port)
            {
                bool listenerStarted;
                try
                {
                    // Briefly bind a listener; success means nothing else holds the port right now.
                    TcpListener probe = new TcpListener(IPAddress.Parse(ipAddress), port);
                    probe.Start();
                    probe.Stop();
                    listenerStarted = true;
                }
                catch (SocketException)
                {
                    listenerStarted = false;
                }
                return listenerStarted;
            }
    
            /// <summary>
            /// Opens a TCP socket to 127.0.0.1 on <paramref name="localPort"/> and registers it
            /// under the given job ID and remote IP address.
            /// </summary>
            /// <param name="jobId">ID of the submitted job the socket belongs to.</param>
            /// <param name="remoteIp">Remote IP address used as the key of the socket.</param>
            /// <param name="localPort">Local port to connect to.</param>
            /// <remarks>
            /// Failures are logged and not propagated; callers have to check the socket map to
            /// learn whether the socket was created. When a socket is already registered for the
            /// job and IP, an error is logged and nothing is created.
            /// </remarks>
            protected void CreateLocalSocketConnection(long jobId, string remoteIp, int localPort)
            {
                Dictionary<string, Socket> jobSockets;
                if (jobIpSockets.TryGetValue(jobId, out jobSockets) && jobSockets.ContainsKey(remoteIp))
                {
                    log.Error("Job " + jobId + " with remote IP " + remoteIp + " already has socket a connection.");
                    return;
                }

                Socket socket = null;
                bool registered = false;
                try
                {
                    //create socket
                    IPAddress address = IPAddress.Parse("127.0.0.1");
                    IPEndPoint remoteEP = new IPEndPoint(address, localPort);
                    socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                    socket.NoDelay = true;

                    //connect to remote host
                    socket.Connect(remoteEP);

                    //add socket to dictionary
                    if (jobSockets == null)
                    {
                        jobSockets = new Dictionary<string, Socket>();
                        jobIpSockets.Add(jobId, jobSockets);
                    }
                    jobSockets.Add(remoteIp, socket);
                    registered = true;

                    log.Info("New socket connection created for job " + jobId + " with remote IP " + remoteIp + " and local port " + localPort);
                }
                catch (Exception e)
                {
                    // Release the socket when it never made it into the map (e.g. Connect failed);
                    // otherwise its handle would be leaked.
                    if (socket != null && !registered)
                        socket.Close();

                    // ILog.Error(object, Exception) performs no format substitution, so the
                    // exception message is appended explicitly instead of using a "{0}" placeholder.
                    log.Error("Socket creation error: " + e.Message, e);
                }
            }
    
            /// <summary>
            /// Reports whether the socket still looks connected, based on a 1-microsecond read
            /// poll and the number of bytes available to read.
            /// </summary>
            /// <param name="socket">Socket to examine.</param>
            /// <returns>
            /// <c>false</c> when the socket polls as readable but has no bytes available, or when
            /// a <see cref="SocketException"/> is raised; <c>true</c> otherwise.
            /// </returns>
            protected bool SocketConnected(Socket socket)
            {
                try
                {
                    bool readable = socket.Poll(1, SelectMode.SelectRead);
                    if (!readable)
                    {
                        return true;
                    }

                    // Readable with nothing to read is treated as a closed connection.
                    return socket.Available != 0;
                }
                catch (SocketException)
                {
                    return false;
                }
            }
    
            /// <summary>
            /// Shuts down and closes the local socket registered for the given job and remote IP
            /// address and removes it from the job's socket map.
            /// </summary>
            /// <param name="jobId">ID of the submitted job the socket belongs to.</param>
            /// <param name="remoteIp">Remote IP address identifying the socket.</param>
            /// <remarks>
            /// Errors raised while closing are logged and not propagated. When no socket is
            /// registered an error is logged. The job's (possibly empty) inner map is left in place.
            /// </remarks>
            protected void CloseLocalSocketConnection(long jobId, string remoteIp)
            {
                Dictionary<string, Socket> jobSockets;
                Socket sock = null;
                if (!jobIpSockets.TryGetValue(jobId, out jobSockets) || !jobSockets.TryGetValue(remoteIp, out sock))
                {
                    log.Error("Job " + jobId + " with IP " + remoteIp + " does not have an active socket.");
                    return;
                }

                try
                {
                    try
                    {
                        sock.Shutdown(SocketShutdown.Both);
                    }
                    finally
                    {
                        // Shutdown can throw (for example on a broken connection); the socket must
                        // still be closed so it is not leaked.
                        sock.Close();
                    }
                }
                catch (Exception e)
                {
                    // ILog.Error(object, Exception) performs no format substitution, so the
                    // exception message is appended explicitly instead of using a "{0}" placeholder.
                    log.Error("Error in closing socket connection for submitted job info ID " + jobId + ": " + e.Message, e);
                }

                jobSockets.Remove(remoteIp);
                log.Info("Closed socket for submitted job info Id: " + jobId + " and remote IP address: " + remoteIp);
            }
    
            #endregion
        }
    }