FIX::ThreadedSocketConnection Class Reference

Encapsulates a socket file descriptor (multi-threaded). More...

#include <ThreadedSocketConnection.h>

Inheritance diagram for FIX::ThreadedSocketConnection:
Collaboration diagram for FIX::ThreadedSocketConnection:

Public Types

typedef std::set< SessionID > Sessions

Public Member Functions

 ThreadedSocketConnection (int s, Sessions sessions, Log *pLog)
 ThreadedSocketConnection (const SessionID &, int s, const std::string &address, short port, Log *pLog, const std::string &sourceAddress="", short sourcePort=0)
virtual ~ThreadedSocketConnection ()
Session * getSession () const
int getSocket () const
bool connect ()
void disconnect ()
bool read ()

Private Member Functions

bool readMessage (std::string &msg) throw ( SocketRecvFailed )
void processStream ()
bool send (const std::string &)
bool setSession (const std::string &msg)
Private Member Functions inherited from FIX::Responder
virtual ~Responder ()

Private Attributes

int m_socket
char m_buffer [BUFSIZ]
std::string m_address
int m_port
std::string m_sourceAddress
int m_sourcePort
Log * m_pLog
Parser m_parser
Sessions m_sessions
Session * m_pSession
bool m_disconnect
fd_set m_fds

Detailed Description

Encapsulates a socket file descriptor (multi-threaded).

Definition at line 44 of file ThreadedSocketConnection.h.

Member Typedef Documentation

◆ Sessions

Definition at line 47 of file ThreadedSocketConnection.h.

Constructor & Destructor Documentation

◆ ThreadedSocketConnection() [1/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( int s,
Sessions sessions,
Log * pLog )

◆ ThreadedSocketConnection() [2/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( const SessionID & sessionID,
int s,
const std::string & address,
short port,
Log * pLog,
const std::string & sourceAddress = "",
short sourcePort = 0 )

Definition at line 44 of file ThreadedSocketConnection.cpp.

49 : m_socket( s ), m_address( address ), m_port( port ),
50 m_sourceAddress( sourceAddress ), m_sourcePort( sourcePort ),
51 m_pLog( pLog ),
52 m_pSession( Session::lookupSession( sessionID ) ),
53 m_disconnect( false )
54{
55 FD_ZERO( &m_fds );
56 FD_SET( m_socket, &m_fds );
57 if ( m_pSession ) m_pSession->setResponder( this );
58}
static Session * lookupSession(const SessionID &)
Definition Session.cpp:1496

References m_address, m_disconnect, m_fds, m_pLog, m_port, m_pSession, m_socket, m_sourceAddress, and m_sourcePort.

◆ ~ThreadedSocketConnection()

FIX::ThreadedSocketConnection::~ThreadedSocketConnection ( )
virtual

Definition at line 60 of file ThreadedSocketConnection.cpp.

61{
62 if ( m_pSession )
63 {
64 m_pSession->setResponder( 0 );
65 Session::unregisterSession( m_pSession->getSessionID() );
66 }
67}
static void unregisterSession(const SessionID &)
Definition Session.cpp:1547

References m_pSession, and FIX::Session::unregisterSession().

Member Function Documentation

◆ connect()

bool FIX::ThreadedSocketConnection::connect ( )

Definition at line 82 of file ThreadedSocketConnection.cpp.

83{
84 // do the bind in the thread as name resolution may block
85 if ( !m_sourceAddress.empty() || m_sourcePort )
86 socket_bind( m_socket, m_sourceAddress.c_str(), m_sourcePort );
87
88 return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
89}
int socket_connect(int socket, const char *address, int port)
Definition Utility.cpp:148
int socket_bind(int socket, const char *hostname, int port)
Definition Utility.cpp:103

References getSocket(), m_address, m_port, m_socket, m_sourceAddress, m_sourcePort, FIX::socket_bind(), and FIX::socket_connect().

Referenced by FIX::ThreadedSocketInitiator::socketThread().

◆ disconnect()

void FIX::ThreadedSocketConnection::disconnect ( )
virtual

◆ getSession()

Session * FIX::ThreadedSocketConnection::getSession ( ) const
inline

Definition at line 56 of file ThreadedSocketConnection.h.

56{ return m_pSession; }

References m_pSession.

Referenced by FIX::ThreadedSocketInitiator::socketThread().

◆ getSocket()

int FIX::ThreadedSocketConnection::getSocket ( ) const
inline

◆ processStream()

void FIX::ThreadedSocketConnection::processStream ( )
private

Definition at line 156 of file ThreadedSocketConnection.cpp.

157{
158 std::string msg;
159 while( readMessage(msg) )
160 {
161 if ( !m_pSession )
162 {
163 if ( !setSession( msg ) )
164 { disconnect(); continue; }
165 }
166 try
167 {
168 m_pSession->next( msg, UtcTimeStamp() );
169 }
170 catch( InvalidMessage& )
171 {
172 if( !m_pSession->isLoggedOn() )
173 {
174 disconnect();
175 return;
176 }
177 }
178 }
179}
bool setSession(const std::string &msg)

References disconnect(), m_pSession, readMessage(), and setSession().

Referenced by read().

◆ read()

bool FIX::ThreadedSocketConnection::read ( )

Definition at line 97 of file ThreadedSocketConnection.cpp.

98{
99 struct timeval timeout = { 1, 0 };
100 fd_set readset = m_fds;
101
102 try
103 {
104 // Wait for input (1 second timeout)
105 int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
106
107 if( result > 0 ) // Something to read
108 {
109 // We can read without blocking
110 ssize_t size = socket_recv( m_socket, m_buffer, sizeof(m_buffer) );
111 if ( size <= 0 ) { throw SocketRecvFailed( size ); }
112 m_parser.addToStream( m_buffer, size );
113 }
114 else if( result == 0 && m_pSession ) // Timeout
115 {
116 m_pSession->next();
117 }
118 else if( result < 0 ) // Error
119 {
120 throw SocketRecvFailed( result );
121 }
122
123 processStream();
124 return true;
125 }
126 catch ( SocketRecvFailed& e )
127 {
128 if( m_disconnect )
129 return false;
130
131 if( m_pSession )
132 {
133 m_pSession->getLog()->onEvent( e.what() );
134 m_pSession->disconnect();
135 }
136 else
137 {
138 disconnect();
139 }
140
141 return false;
142 }
143}
ssize_t socket_recv(int s, char *buf, size_t length)
Definition Utility.cpp:170

References disconnect(), m_buffer, m_disconnect, m_fds, m_parser, m_pSession, m_socket, processStream(), and FIX::socket_recv().

Referenced by FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().

◆ readMessage()

bool FIX::ThreadedSocketConnection::readMessage ( std::string & msg)
throw (SocketRecvFailed )
private

Definition at line 145 of file ThreadedSocketConnection.cpp.

147{
148 try
149 {
150 return m_parser.readFixMessage( msg );
151 }
152 catch ( MessageParseError& ) {}
153 return true;
154}

References m_parser.

Referenced by processStream().

◆ send()

bool FIX::ThreadedSocketConnection::send ( const std::string & msg)
private virtual

Implements FIX::Responder.

Definition at line 69 of file ThreadedSocketConnection.cpp.

70{
71 int totalSent = 0;
72 while(totalSent < (int)msg.length())
73 {
74 ssize_t sent = socket_send( m_socket, msg.c_str() + totalSent, msg.length() );
75 if(sent < 0) return false;
76 totalSent += sent;
77 }
78
79 return true;
80}
ssize_t socket_send(int s, const char *msg, size_t length)
Definition Utility.cpp:175

References m_socket, and FIX::socket_send().

◆ setSession()

bool FIX::ThreadedSocketConnection::setSession ( const std::string & msg)
private

Definition at line 181 of file ThreadedSocketConnection.cpp.

182{
183 m_pSession = Session::lookupSession( msg, true );
184 if ( !m_pSession )
185 {
186 if( m_pLog )
187 {
188 m_pLog->onEvent( "Session not found for incoming message: " + msg );
189 m_pLog->onIncoming( msg );
190 }
191 return false;
192 }
193
194 SessionID sessionID = m_pSession->getSessionID();
195 m_pSession = 0;
196
197 // see if the session frees up within 5 seconds
198 for( int i = 1; i <= 5; i++ )
199 {
200 if( !Session::isSessionRegistered( sessionID ) )
201 m_pSession = Session::registerSession( sessionID );
202 if( m_pSession ) break;
203 process_sleep( 1 );
204 }
205
206 if ( !m_pSession )
207 return false;
208 if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
209 return false;
210
211 m_pSession->setResponder( this );
212 return true;
213}
static Session * registerSession(const SessionID &)
Definition Session.cpp:1537
static bool isSessionRegistered(const SessionID &)
Definition Session.cpp:1531
void process_sleep(double s)
Definition Utility.cpp:466

References FIX::Session::isSessionRegistered(), FIX::Session::lookupSession(), m_pLog, m_pSession, m_sessions, FIX::process_sleep(), and FIX::Session::registerSession().

Referenced by processStream().

Member Data Documentation

◆ m_address

std::string FIX::ThreadedSocketConnection::m_address
private

Definition at line 71 of file ThreadedSocketConnection.h.

Referenced by connect(), and ThreadedSocketConnection().

◆ m_buffer

char FIX::ThreadedSocketConnection::m_buffer[BUFSIZ]
private

Definition at line 69 of file ThreadedSocketConnection.h.

Referenced by read().

◆ m_disconnect

bool FIX::ThreadedSocketConnection::m_disconnect
private

◆ m_fds

fd_set FIX::ThreadedSocketConnection::m_fds
private

◆ m_parser

Parser FIX::ThreadedSocketConnection::m_parser
private

Definition at line 77 of file ThreadedSocketConnection.h.

Referenced by read(), and readMessage().

◆ m_pLog

Log* FIX::ThreadedSocketConnection::m_pLog
private

◆ m_port

int FIX::ThreadedSocketConnection::m_port
private

Definition at line 72 of file ThreadedSocketConnection.h.

Referenced by connect(), and ThreadedSocketConnection().

◆ m_pSession

Session* FIX::ThreadedSocketConnection::m_pSession
private

◆ m_sessions

Sessions FIX::ThreadedSocketConnection::m_sessions
private

Definition at line 78 of file ThreadedSocketConnection.h.

Referenced by setSession(), and ThreadedSocketConnection().

◆ m_socket

int FIX::ThreadedSocketConnection::m_socket
private

◆ m_sourceAddress

std::string FIX::ThreadedSocketConnection::m_sourceAddress
private

Definition at line 73 of file ThreadedSocketConnection.h.

Referenced by connect(), and ThreadedSocketConnection().

◆ m_sourcePort

int FIX::ThreadedSocketConnection::m_sourcePort
private

Definition at line 74 of file ThreadedSocketConnection.h.

Referenced by connect(), and ThreadedSocketConnection().


The documentation for this class was generated from the following files:

ThreadedSocketConnection.h
ThreadedSocketConnection.cpp

Generated for QuickFIX by doxygen 1.15.0, written by Dimitri van Heesch, © 1997-2001