Thursday, December 13, 2012

Writing a Server Socket for Bloomberg's blpapi

Problem:

You have several internal applications (Java, Python, Excel, etc.) that need prices from Bloomberg. What is an ideal way to deal with this?

Solution:

Use Bloomberg's blpapi and create an application that listens to a particular port.

Walkthrough of Implementation:

Bloomberg's blpapi now comes in 4 flavors - Java, .NET, C/C++, and Python. A prerequisite for talking to Bloomberg servers is having a Bloomberg terminal installed on the local machine. Bloomberg terminals are Windows-only applications, and therefore our language of choice is .NET.

I started from scratch. I downloaded the latest blpapi from here. I also downloaded VS 2012 Express for Windows Desktop here.

Next, I created a windows application project (not a console application). Why? Here are a few reasons:
  • I want this application to have a single Form that I can use for diagnostics.
  • I don't want my end-user to see a console lying around the desktop.
  • I also want this Form to live in the system tray once the application has started up. I only bring it up when I want to diagnose.
Using the PROJECT menu:
  • Add Bloomberglp.Blpapi.dll via Add References.
  • Install log4net package via Manage NuGet Packages. 
 Now we are ready to code.

Start by writing the class which will handle all the requests.

using Event = Bloomberglp.Blpapi.Event;
using Element = Bloomberglp.Blpapi.Element;
using InvalidRequestException = Bloomberglp.Blpapi.InvalidRequestException;
using Message = Bloomberglp.Blpapi.Message;
using Name = Bloomberglp.Blpapi.Name;
using Request = Bloomberglp.Blpapi.Request;
using Service = Bloomberglp.Blpapi.Service;
using Session = Bloomberglp.Blpapi.Session;
using SessionOptions = Bloomberglp.Blpapi.SessionOptions;
using Identity = Bloomberglp.Blpapi.Identity;
using CorrelationID = Bloomberglp.Blpapi.CorrelationID;
using TraceLevel = System.Diagnostics.TraceLevel;
using Datatype = Bloomberglp.Blpapi.Schema.Datatype;
using String = System.String;
using ArrayList = System.Collections.ArrayList;
using Math = System.Math;
using Exception = System.Exception;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using System.Xml;
using System.IO;
using log4net;

namespace your.choice.of.namespace
{
    /// <summary>
    /// Class that takes care of direct communication with Bloomberg's blpapi.
    /// </summary>
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
    class BloombergRequestHandler
    {

        // log4net logger shared by all instances of this handler.
        private static readonly ILog log = log4net.LogManager.GetLogger(typeof(BloombergRequestHandler));
        // Bloomberg session; assigned in startSession() and stopped in killSession().
        protected Session session;
        // NOTE(review): uuid and ipAddress are not read or written by any code visible in this
        // listing; presumably they exist for subclasses - confirm before removing.
        protected int uuid { get; set; }
        protected String ipAddress { get; set; }

        // Lower-case element names recognised in the request XML (matched case-insensitively
        // in generateInputs); the first three also serve as keys of the parsed-input dictionary.
        private const String INPUT_INSTRUMENT = "instrument";
        private const String INPUT_FIELDS = "fields";
        private const String INPUT_OVERRIDES = "overrides";
        private const String INPUT_OVERRIDE = "override";

        // blpapi element names: SECURITIES, FIELDS, OVERRIDES, FIELD_ID and VALUE are used to
        // build the request; SECURITY_DATA, FIELD_DATA and SECURITY are used to read the response.
        private static readonly Name SECURITIES = new Name("securities");
        private static readonly Name FIELDS = new Name("fields");
        private static readonly Name SECURITY_DATA = new Name("securityData");
        private static readonly Name FIELD_DATA = new Name("fieldData");
        private static readonly Name SECURITY = new Name("security");
        private static readonly Name FIELD_ID = new Name("fieldId");
        private static readonly Name VALUE = new Name("value");
        private static readonly Name OVERRIDES = new Name("overrides");
        // NOTE(review): EIDDATA is not referenced by any code visible in this listing.
        private static readonly Name EIDDATA = new Name("eidData");

        // Session-status message types that make eventLoop() stop waiting for a response.
        private const String SESSION_TERMINATED = "SessionTerminated";
        private const String SESSION_STARTUP_FAILURE = "SessionStartupFailure";

        // Service URI opened in openServices() and the request type created in sendRefDataRequest().
        protected const String REFDATASERVICE = "//blp/refdata";
        protected const String REFDATAREQUEST = "ReferenceDataRequest";

        // Endpoint handed to SessionOptions in startSession(). Because these are constants,
        // pointing the handler at another machine means changing HOST here or overriding
        // startSession() in a subclass.
        protected const String HOST = "localhost";
        protected const int PORT = 8194;

        public void openServices()
        {
            try
            {
                log.Info("Opening //blp/refdata service...");
                session.OpenService(REFDATASERVICE);
                log.Info("//blp/refdata service opened.");
            }
            catch (Exception)
            {
                log.Fatal("Failed opening //blp/refdata service.");
                log.Fatal("Application was launched on a machine without Bloomberg Communication Process installed.");
                log.Fatal("Please make sure that either Desktop API or Server API is installed.");
            }

        }
        public virtual void startSession()
        {
            SessionOptions sessionOptions = new SessionOptions();
            sessionOptions.ServerHost = HOST;
            sessionOptions.ServerPort = PORT;
            session = new Session(sessionOptions);

            try
            {
                log.Info("Starting session...");
                session.Start();
                log.Info("Session started.");
            }
            catch (Exception)
            {
                log.Error("Failure in startSession");
            }
        }

        public void killSession()
        {
            if (session != null)
            {
                try
                {
                    session.Stop();
                }
                catch (Exception)
                {
                    session = null;
                    log.Error("Failure in killSession");
                }
            }
        }

        /// <summary>
        /// Creates a handler that is connected on construction: starts the Bloomberg session
        /// and then opens the reference-data service.
        /// </summary>
        /// <remarks>
        /// NOTE(review): startSession is virtual, so a subclass override runs here before the
        /// subclass constructor body has executed.
        /// </remarks>
        public BloombergRequestHandler()
        {
            startSession();
            openServices();
        }

        private String generateResponseXMLNode(String tag, String value)
        {
            StringBuilder sb = new StringBuilder();
            sb.Append("<");
            sb.Append(tag);
            sb.Append(">");
            sb.Append(value);
            sb.Append("</");
            sb.Append(tag);
            sb.Append(">");
            return sb.ToString();
        }

        private void processScalarField(StringBuilder sb, Element secField)
        {
            log.Info("Processing scalar RefData " + secField.Name + " = " + secField.GetValueAsString());
            sb.Append(generateResponseXMLNode(secField.Name.ToString(), secField.GetValueAsString()));
        }

        private void processVectorField(StringBuilder sb, Element secField)
        {
            log.Info("Processing vector RefData " + secField.Name);
            sb.Append("<");
            sb.Append(secField.Name.ToString());
            sb.Append(">");
            int numValues = secField.NumValues;
            for (int bvCtr = 0; bvCtr < numValues; bvCtr++)
            {
                Element bulkElement = secField.GetValueAsElement(bvCtr);
                int numBulkValues = bulkElement.NumElements;
                sb.Append("<");
                sb.Append(bulkElement.Name.ToString());
                sb.Append(">");
                for (int beCtr = 0; beCtr < numBulkValues; beCtr++)
                {
                    Element elem = bulkElement.GetElement(beCtr);
                    sb.Append(generateResponseXMLNode(elem.Name.ToString(), elem.GetValueAsString()));
                }
                sb.Append("</");
                sb.Append(bulkElement.Name.ToString());
                sb.Append(">");
            }
            sb.Append("</");
            sb.Append(secField.Name.ToString());
            sb.Append(">");
        }

        private void processResponseEvent(Event eventObj, StringBuilder sb)
        {
            foreach (Message msg in eventObj)
            {
                Element secDataArray = msg.GetElement(SECURITY_DATA);
                int numSecDataArray = secDataArray.NumValues;
                for (int i = 0; i < numSecDataArray; ++i)
                {
                    Element secData = secDataArray.GetValueAsElement(i);
                    String security = secData.GetElementAsString(SECURITY);
                    log.Info("Processing results for : " + security);
                    if (secData.HasElement(FIELD_DATA))
                    {
                        Element secFields = secData.GetElement(FIELD_DATA);
                        if (secFields.NumElements > 0)
                        {
                            int numFields = secFields.NumElements;
                            for (int j = 0; j < numFields; ++j)
                            {
                                Element secField = secFields.GetElement(j);
                                if (secField.Datatype.Equals(Datatype.SEQUENCE))
                                {
                                    processVectorField(sb, secField);
                                }
                                else
                                {
                                    processScalarField(sb, secField);
                                }
                            }
                        }
                    }
                }
            }
        }

        private void eventLoop(StringBuilder sb)
        {
            bool done = false;
            while (!done)
            {
                Event eventObj = session.NextEvent();
                if (eventObj.Type.Equals(Event.EventType.PARTIAL_RESPONSE))
                {
                    processResponseEvent(eventObj, sb);
                }
                else if (eventObj.Type.Equals(Event.EventType.RESPONSE))
                {
                    processResponseEvent(eventObj, sb);
                    done = true;
                }
                else
                {
                    foreach (Message msg in eventObj)
                    {
                        if (eventObj.Type.Equals(Event.EventType.SESSION_STATUS))
                        {
                            if (msg.MessageType.Equals(SESSION_TERMINATED) || msg.MessageType.Equals(SESSION_STARTUP_FAILURE))
                            {
                                done = true;
                            }
                        }
                    }
                }
            }
        }

        private String sendRefDataRequest(String instrument, ArrayList paramFields, Dictionary<String, object> paramOverrides)
        {
            StringBuilder sb = new StringBuilder();
            if (session != null)
            {

                try
                {
                    Service refDataService = session.GetService(REFDATASERVICE);
                    Request request = refDataService.CreateRequest(REFDATAREQUEST);

                    // add securities to request
                    Element securities = request.GetElement(SECURITIES);
                    securities.AppendValue(instrument);

                    // add fields to request
                    Element fields = request.GetElement(FIELDS);
                    foreach (String field in paramFields)
                    {
                        fields.AppendValue(field.ToString());
                    }

                    // add scalar overrides
                    Element overrides = request.GetElement(OVERRIDES);
                    foreach (KeyValuePair<String, object> pair in paramOverrides)
                    {
                        Element ovr = overrides.AppendElement();
                        ovr.SetElement(FIELD_ID, pair.Key);
                        ovr.SetElement(VALUE, pair.Value.ToString());
                    }

                    session.SendRequest(request, null);
                    eventLoop(sb);
                }
                catch (Exception)
                {
                    // nothing further
                }
            }

            return sb.ToString();
        }

        private Dictionary<String, object> generateInputs(String inputXML)
        {
            Dictionary<String, object> inputs = new Dictionary<String, object>();
            Dictionary<String, object> overrides = new Dictionary<String, object>();
            log.Info("XML: " + inputXML);

            XmlDocument doc = new XmlDocument();
            doc.LoadXml(inputXML);
            XmlElement root = doc.DocumentElement;

            foreach(XmlElement e in root){
                if (e.Name.ToLower().Equals(INPUT_INSTRUMENT)) {
                    inputs.Add(INPUT_INSTRUMENT, e.InnerText);
                }
            }

            foreach (XmlElement e in root)
            {
                if (e.Name.ToLower().Equals(INPUT_FIELDS))
                {
                    ArrayList inputFields = new ArrayList();
                    String[] fields = e.InnerText.Split(',');
                    foreach (String sField in fields)
                    {
                        inputFields.Add(sField);
                    }
                    inputs.Add(INPUT_FIELDS, inputFields);
                }
            }

            foreach (XmlElement e in root)
            {
                if (e.Name.ToLower().Equals(INPUT_OVERRIDES))
                {
                    foreach (XmlElement inner in e)
                    {
                        if (inner.Name.ToLower().Equals(INPUT_OVERRIDE))
                        {
                            String[] fields = inner.InnerText.Split(',');
                            overrides.Add(fields[0], fields[1]);
                        }
                    }
                }
            }

            inputs.Add(INPUT_OVERRIDES, overrides);
            return inputs;
        }

        /// <summary>
        /// Generates an XML output for apps that gather data with BBCommAdapter.
        /// </summary>
        /// <param name="inputXML">input XML that contains instrument, fields, and/or overrides</param>
        /// <returns>XML string whose tags are the Bloomberg FLDS fields name(s) used in the query</returns>
        public String generateBloombergResponse(String inputXML)
        {
            Dictionary<String, object> inputs = generateInputs(inputXML);

            String instrument = null;
            object oInstrument;
            if (inputs.TryGetValue(INPUT_INSTRUMENT, out oInstrument))
            {
                instrument = (String)oInstrument;
            }

            ArrayList fields = new ArrayList();
            object oFields;
            if (inputs.TryGetValue(INPUT_FIELDS, out oFields))
            {
                fields = (ArrayList)oFields;
            }

            Dictionary<String, object> overrides = new Dictionary<String, object>();
            object oOverrides;
            if (inputs.TryGetValue(INPUT_OVERRIDES, out oOverrides))
            {
                overrides = (Dictionary<String, object>)oOverrides;
            }

            String result = sendRefDataRequest(instrument, fields, overrides);
            killSession();

            // finalize the XML response
            StringBuilder sb = new StringBuilder();
            sb.Append("<BBCommAdapterResponse>");
            sb.Append(result);
            sb.Append("</BBCommAdapterResponse>");

            return sb.ToString();
        }
}

Next, write the server socket.

using System;
using System.Text;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.IO;
using System.Xml;
using log4net;
using your.choice.of.namespace;

namespace your.choice.of.namespace
{
    /// <summary>
    /// Class that process inbound and outbound communication using port 8195.
    /// Although a new thread is spawned for each TCP/IP connection, the communication is synchronous per thread.
    /// </summary>
    class ServerSocketProcessor
    {
        private static readonly ILog log = log4net.LogManager.GetLogger(typeof(ServerSocketProcessor));

        private const int maxConnections = 10;
        private const int listenPort = 8195;
        private Thread listenerThread;
        private TcpListener serverSocket;
        private IPAddress serverIpAddress;
        private TcpClient client;
        private String clientIpAddress;

        public ServerSocketProcessor()
        {
            try
            {
                IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
                serverIpAddress = ipHostInfo.AddressList[0];
                serverSocket = new TcpListener(serverIpAddress, listenPort);
                listenerThread = new Thread(new ThreadStart(ListenForClients));
                listenerThread.Start();
            }
            catch (Exception)
            {
                log.Error("Unable to set-up a server socket on port: " + listenPort);
            }
        }

        private void HandleClientComm(object client)
        {
            TcpClient tcpClient = (TcpClient)client;
            NetworkStream clientStream = tcpClient.GetStream();

            while (true)
            {

                StringBuilder sb = new StringBuilder();
                byte[] readBuffer = new byte[1024];
                int numBytesRead;

                try
                {
                    numBytesRead = 0;
                    do
                    {
                        numBytesRead = clientStream.Read(readBuffer, 0, readBuffer.Length);
                        sb.Append(Encoding.UTF8.GetString(readBuffer, 0, numBytesRead));
                    } while (clientStream.DataAvailable);
                }
                catch (Exception)
                {
                    log.Error("Client has disconnected.");
                    break;
                }

                if (numBytesRead == 0)
                {
                    log.Info("Client has disconnected.");
                    break;
                }

                String inputXML = sb.ToString();
                UTF8Encoding encoder = new UTF8Encoding();
                byte[] responseBuffer = new byte[4096];
                BloombergRequestHandler handler = new BloombergRequestHandler();
                String result;
                if (serverIpAddress.ToString().Equals(clientIpAddress))
                {
                    log.Info("Processing for allowed client.");
                    result = handler.generateBloombergResponse(inputXML);
                    responseBuffer = encoder.GetBytes(result);
                }
                else
                {
                    log.Info("No processing: Exception caught.");
                    result = "Operation not allowed when client's IP is different from this IP ";
                    responseBuffer = encoder.GetBytes(result);
                }

                // prepare the response
                log.Info("Preparing response...");
                log.Info("Writing response to client stream...");
                clientStream.Write(responseBuffer, 0, responseBuffer.Length);
                log.Info("Flushing stream...");
                clientStream.Flush();
                log.Info("Closing stream...");
                clientStream.Close();
            }
            tcpClient.Close();
        }

        private void ListenForClients()
        {
            this.serverSocket.Start();
            while (true)
            {
                // blocks until a client has connected to server
                client = this.serverSocket.AcceptTcpClient();
                clientIpAddress = client.Client.RemoteEndPoint.ToString().Split(':')[0];
                log.Info("Client has connected...");
                log.Info("Local (Server) IP: " + serverIpAddress + "; Remote (Client) IP: " + clientIpAddress);

                // create a thread to handle communication with connected client
                Thread clientThread = new Thread(new ParameterizedThreadStart(HandleClientComm));
                clientThread.Start(client);
            }
        }
    }
}


Finally, write the entry point of the application.

using System;
using System.Windows.Forms;
using System.Diagnostics;
using log4net;
using log4net.Config;
using your.choice.of.namespace;

namespace your.choice.of.namespace
{
    static class BBCommAdapter
    {
        private static readonly ILog log = log4net.LogManager.GetLogger(typeof(BBCommAdapter));

        public static Process ExistingProcess()
        {
            Process curr = Process.GetCurrentProcess();
            Process[] procs = Process.GetProcessesByName(curr.ProcessName);
            foreach(Process p in procs) {
                if (p.Id != curr.Id && (p.MainModule.FileName == curr.MainModule.FileName)) {
                    return p;
                }
            }
            return null;
        }

        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        [STAThread]
        static void Main()
        {
            log4net.Config.BasicConfigurator.Configure();
            if (ExistingProcess() != null) {
                log.Error("Another instance of BBCommAdapter is already running with pid: " + ExistingProcess().Id);
                return;
            }

            log.Info("Initializing server...");
            ServerSocketProcessor processor = new ServerSocketProcessor();

            Application.EnableVisualStyles();
            Application.SetCompatibleTextRenderingDefault(false);
            BBCommAdapterForm appForm = new BBCommAdapterForm();
           
            log.Info("Starting a fresh instance of BBCommAdapter with pid:" + Process.GetCurrentProcess().Id);
            Application.Run(appForm);

        }
    }
}


As for the Windows Form, add 3 text fields (fldInstrument, fldField, and fldResult), a button, a notifyIcon, and a contextMenuStrip. Set the notifyIcon's context menu to the contextMenuStrip:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;
using ArrayList = System.Collections.ArrayList;
using System.Diagnostics;
using System.Runtime.InteropServices;
using your.choice.of.namespace;

namespace your.choice.of.namespace
{
    public partial class BBCommAdapterForm : Form
    {

        // used to disable the (X) button on form
        const int MF_BYPOSITION = 0x400;
        [DllImport("User32")]
        private static extern int RemoveMenu(IntPtr hMenu, int nPosition, int wFlags);
        [DllImport("User32")]
        private static extern IntPtr GetSystemMenu(IntPtr hWnd, bool bRevert);
        [DllImport("User32")]
        private static extern int GetMenuItemCount(IntPtr hWnd);


        public BBCommAdapterForm()
        {
            InitializeComponent();
        }

        private String generateInputXML()
        {
            StringBuilder sb = new StringBuilder();
            sb.Append("<TestConnection>");
            sb.Append("<INSTRUMENT>");
            sb.Append(fldInstrument.Text);
            sb.Append("</INSTRUMENT>");
            sb.Append("<FIELDS>");
            sb.Append(fldField.Text);
            sb.Append("</FIELDS>");
            // only here to complete the actual request XML schema
            //sb.Append("<OVERRIDES>");
            //sb.Append("<OVERRIDE>");
            //sb.Append("OVRFLDSKey1, OVRValue1");
            //sb.Append("</OVERRIDE>");
            //sb.Append("<OVERRIDE>");
            //sb.Append("OVRFLDSKey2, OVRValue2");
            //sb.Append("</OVERRIDE>");
            //sb.Append("</OVERRIDES>");
            //sb.Append("</TestConnection>");
            return sb.ToString();
        }

        private void btnTestConnection_Click(object sender, EventArgs e)
        {
            BloombergRequestHandler handler = new BloombergUnrestrictedRequestHandler();
            String inputXML = generateInputXML();
            String results = handler.generateBloombergResponse(inputXML);
            fldResult.Text = results;

        }
        private void BBCommAdapterForm_Resize(object sender, EventArgs e)
        {
            if (this.WindowState == FormWindowState.Minimized)
            {
                notifyIcon.Visible = true;
                notifyIcon.BalloonTipText = "BBCommAdapter is running in background";
                notifyIcon.BalloonTipTitle = "BBCommAdapter";
                notifyIcon.BalloonTipIcon = ToolTipIcon.Info;
                notifyIcon.ShowBalloonTip(500);
                Hide();
            }
            else if (this.WindowState == FormWindowState.Normal)
            {
                Show();
                notifyIcon.Visible = false;
            }
        }

        private void notifyIcon_MouseDoubleClick(object sender, MouseEventArgs e)
        {
            Show();
            this.WindowState = FormWindowState.Normal;
            notifyIcon.Visible = false;
        }

        private void menuRestore_Click(object sender, EventArgs e)
        {
            Show();
            this.WindowState = FormWindowState.Normal;
            notifyIcon.Visible = false;
        }

        private void menuExit_Click(object sender, EventArgs e)
        {
            if (MessageBox.Show("Close BBCommAdapter application?", "Exit BBCommAdapter", MessageBoxButtons.YesNo) != DialogResult.Yes)
            {
                // do nothing
            }
            else
            {
                // kill itself
                Process[] localByName = Process.GetProcessesByName("BBCommAdapter");
                foreach (Process p in localByName)
                {
                    p.Kill();
                }
            }
        }

        private void BBCommAdapterForm_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (MessageBox.Show("Close BBCommAdapter application?", "Exit BBCommAdapter", MessageBoxButtons.YesNo) != DialogResult.Yes)
            {
                e.Cancel = true;
                notifyIcon.Visible = true;
                notifyIcon.BalloonTipText = "BBCommAdapter is running in background";
                notifyIcon.BalloonTipTitle = "BBCommAdapter";
                notifyIcon.BalloonTipIcon = ToolTipIcon.Info;
                notifyIcon.ShowBalloonTip(500);
                Hide();
            }
            else
            {
                // kill itself
                Process[] localByName = Process.GetProcessesByName("BBCommAdapter");
                foreach (Process p in localByName)
                {
                    p.Kill();
                }
            }
        }

        private void BBCommAdapterForm_Shown(object sender, EventArgs e)
        {
            this.Visible = false;
            this.WindowState = FormWindowState.Minimized;
            notifyIcon.Visible = true;
            notifyIcon.BalloonTipText = "BBCommAdapter is running in background";
            notifyIcon.BalloonTipTitle = "BBCommAdapter";
            notifyIcon.BalloonTipIcon = ToolTipIcon.Info;
        }

        private void BBCommAdapterForm_Load(object sender, EventArgs e)
        {
            IntPtr hMenu = GetSystemMenu(this.Handle, false);
            int menuItemCount = GetMenuItemCount(hMenu);
            RemoveMenu(hMenu, menuItemCount - 1, MF_BYPOSITION);
        }
    }
}

1 comment:

  1. Hi, really interesting work..have you never tried to logOn on a remote bloomberg terminal on your lan? I need to specify this parameter and not localhost..tks in Advance

    ReplyDelete