Usage examples¶
Synchronous query¶
Following example presents how to execute simple, synchronous query against a remote q process:
from qpython import qconnection
if __name__ == '__main__':
# create connection object
q = qconnection.QConnection(host='localhost', port=5000)
# initialize connection
q.open()
print(q)
print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
# simple query execution via: QConnection.__call__
data = q('{`int$ til x}', 10)
print('type: %s, numpy.dtype: %s, meta.qtype: %s, data: %s ' % (type(data), data.dtype, data.meta.qtype, data))
# simple query execution via: QConnection.sync
data = q.sync('{`long$ til x}', 10)
print('type: %s, numpy.dtype: %s, meta.qtype: %s, data: %s ' % (type(data), data.dtype, data.meta.qtype, data))
# low-level query and read
q.query(qconnection.MessageType.SYNC, '{`short$ til x}', 10) # sends a SYNC query
msg = q.receive(data_only=False, raw=False) # retrieve entire message
print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(msg), msg.type, msg.size, msg.is_compressed))
data = msg.data
print('type: %s, numpy.dtype: %s, meta.qtype: %s, data: %s ' % (type(data), data.dtype, data.meta.qtype, data))
# close connection
q.close()
This code prints to the console:
:localhost:5000
IPC version: 3. Is connected: True
type: <class 'qpython.qcollection.QList'>, numpy.dtype: int32, meta.qtype: 6, data: [0 1 2 3 4 5 6 7 8 9]
type: <class 'qpython.qcollection.QList'>, numpy.dtype: int64, meta.qtype: 7, data: [0 1 2 3 4 5 6 7 8 9]
type: <class 'qpython.qreader.QMessage'>, message type: 2, data size: 34, is_compressed: False
type: <class 'qpython.qcollection.QList'>, numpy.dtype: int16, meta.qtype: 5, data: [0 1 2 3 4 5 6 7 8 9]
Asynchronous query¶
Following example presents how to execute simple, asynchronous query against a remote q process:
import random
import threading
import time
from qpython import qconnection
from qpython.qtype import QException
from qpython.qconnection import MessageType
from qpython.qcollection import QDictionary
class ListenerThread(threading.Thread):
def __init__(self, q):
super(ListenerThread, self).__init__()
self.q = q
self._stopper = threading.Event()
def stop(self):
self._stopper.set()
def stopped(self):
return self._stopper.isSet()
def run(self):
while not self.stopped():
print('.')
try:
message = self.q.receive(data_only = False, raw = False) # retrieve entire message
if message.type != MessageType.ASYNC:
print('Unexpected message, expected message of type: ASYNC')
print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(message), message.type, message.size, message.is_compressed))
print(message.data)
if isinstance(message.data, QDictionary):
# stop after 10th query
if message.data[b'queryid'] == 9:
self.stop()
except QException as e:
print(e)
if __name__ == '__main__':
# create connection object
q = qconnection.QConnection(host = 'localhost', port = 5000)
# initialize connection
q.open()
print(q)
print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
try:
# definition of asynchronous multiply function
# queryid - unique identifier of function call - used to identify
# the result
# a, b - parameters to the query
q.sync('asynchMult:{[queryid;a;b] res:a*b; (neg .z.w)(`queryid`result!(queryid;res)) }');
t = ListenerThread(q)
t.start()
for x in range(10):
a = random.randint(1, 100)
b = random.randint(1, 100)
print('Asynchronous call with queryid=%s with arguments: %s, %s' % (x, a, b))
q.async('asynchMult', x, a, b);
time.sleep(1)
finally:
q.close()
Interactive console¶
This example depicts how to create a simple interactive console for communication with a q process:
import qpython
from qpython import qconnection
from qpython.qtype import QException
try:
input = raw_input
except NameError:
pass
if __name__ == '__main__':
print('qPython %s Cython extensions enabled: %s' % (qpython.__version__, qpython.__is_cython_enabled__))
with qconnection.QConnection(host = 'localhost', port = 5000) as q:
print(q)
print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
while True:
try:
x = input('Q)')
except EOFError:
print('')
break
if x == '\\\\':
break
try:
result = q(x)
print(type(result))
print(result)
except QException as msg:
print('q error: \'%s' % msg)
Twisted integration¶
This example presents how the qPython can be used along with Twisted to build asynchronous client:
Note
This sample code overwrites .u.sub and .z.ts functions on q process.
import struct
import sys
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import reactor
from qpython.qconnection import MessageType, QAuthenticationException
from qpython.qreader import QReader
from qpython.qwriter import QWriter, QWriterException
class IPCProtocol(Protocol):
class State(object):
UNKNOWN = -1
HANDSHAKE = 0
CONNECTED = 1
def connectionMade(self):
self.state = IPCProtocol.State.UNKNOWN
self.credentials = self.factory.username + ':' + self.factory.password if self.factory.password else ''
self.transport.write(self.credentials + '\3\0')
self._message = None
def dataReceived(self, data):
if self.state == IPCProtocol.State.CONNECTED:
try:
if not self._message:
self._message = self._reader.read_header(source=data)
self._buffer = ''
self._buffer += data
buffer_len = len(self._buffer) if self._buffer else 0
while self._message and self._message.size <= buffer_len:
complete_message = self._buffer[:self._message.size]
if buffer_len > self._message.size:
self._buffer = self._buffer[self._message.size:]
buffer_len = len(self._buffer) if self._buffer else 0
self._message = self._reader.read_header(source=self._buffer)
else:
self._message = None
self._buffer = ''
buffer_len = 0
self.factory.onMessage(self._reader.read(source=complete_message, numpy_temporals=True))
except:
self.factory.onError(sys.exc_info())
self._message = None
self._buffer = ''
elif self.state == IPCProtocol.State.UNKNOWN:
# handshake
if len(data) == 1:
self._init(data)
else:
self.state = IPCProtocol.State.HANDSHAKE
self.transport.write(self.credentials + '\0')
else:
# protocol version fallback
if len(data) == 1:
self._init(data)
else:
raise QAuthenticationException('Connection denied.')
def _init(self, data):
self.state = IPCProtocol.State.CONNECTED
self.protocol_version = min(struct.unpack('B', data)[0], 3)
self._writer = QWriter(stream=None, protocol_version=self.protocol_version)
self._reader = QReader(stream=None)
self.factory.clientReady(self)
def query(self, msg_type, query, *parameters):
if parameters and len(parameters) > 8:
raise QWriterException('Too many parameters.')
if not parameters or len(parameters) == 0:
self.transport.write(self._writer.write(query, msg_type))
else:
self.transport.write(self._writer.write([query] + list(parameters), msg_type))
class IPCClientFactory(ClientFactory):
protocol = IPCProtocol
def __init__(self, username, password, connect_success_callback, connect_fail_callback, data_callback, error_callback):
self.username = username
self.password = password
self.client = None
# register callbacks
self.connect_success_callback = connect_success_callback
self.connect_fail_callback = connect_fail_callback
self.data_callback = data_callback
self.error_callback = error_callback
def clientConnectionLost(self, connector, reason):
print('Lost connection. Reason: %s' % reason)
# connector.connect()
def clientConnectionFailed(self, connector, reason):
if self.connect_fail_callback:
self.connect_fail_callback(self, reason)
def clientReady(self, client):
self.client = client
if self.connect_success_callback:
self.connect_success_callback(self)
def onMessage(self, message):
if self.data_callback:
self.data_callback(self, message)
def onError(self, error):
if self.error_callback:
self.error_callback(self, error)
def query(self, msg_type, query, *parameters):
if self.client:
self.client.query(msg_type, query, *parameters)
def onConnectSuccess(source):
print('Connected, protocol version: %s' % source.client.protocol_version)
source.query(MessageType.SYNC, '.z.ts:{(handle)(`timestamp$100?1000000000000000000)}')
source.query(MessageType.SYNC, '.u.sub:{[t;s] handle:: neg .z.w}')
source.query(MessageType.ASYNC, '.u.sub', 'trade', '')
def onConnectFail(source, reason):
print('Connection refused: %s' % reason)
def onMessage(source, message):
print('Received: %s %s' % (message.type, message.data))
def onError(source, error):
print('Error: %s' % error)
if __name__ == '__main__':
factory = IPCClientFactory('user', 'pwd', onConnectSuccess, onConnectFail, onMessage, onError)
reactor.connectTCP('localhost', 5000, factory)
reactor.run()
Subscribing to tick service¶
This example depicts how to subscribe to standard kdb+ tickerplant service:
import numpy
import threading
import sys
from qpython import qconnection
from qpython.qtype import QException
from qpython.qconnection import MessageType
from qpython.qcollection import QTable
class ListenerThread(threading.Thread):
def __init__(self, q):
super(ListenerThread, self).__init__()
self.q = q
self._stopper = threading.Event()
def stopit(self):
self._stopper.set()
def stopped(self):
return self._stopper.is_set()
def run(self):
while not self.stopped():
print('.')
try:
message = self.q.receive(data_only = False, raw = False) # retrieve entire message
if message.type != MessageType.ASYNC:
print('Unexpected message, expected message of type: ASYNC')
print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(message), message.type, message.size, message.is_compressed))
if isinstance(message.data, list):
# unpack upd message
if len(message.data) == 3 and message.data[0] == b'upd' and isinstance(message.data[2], QTable):
for row in message.data[2]:
print(row)
except QException as e:
print(e)
if __name__ == '__main__':
with qconnection.QConnection(host = 'localhost', port = 17010) as q:
print(q)
print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
print('Press <ENTER> to close application')
# subscribe to tick
response = q.sync('.u.sub', numpy.string_('trade'), numpy.string_(''))
# get table model
if isinstance(response[1], QTable):
print('%s table data model: %s' % (response[0], response[1].dtype))
t = ListenerThread(q)
t.start()
sys.stdin.readline()
t.stopit()
Data publisher¶
This example shows how to stream data to the kdb+ process using standard tickerplant API:
import datetime
import numpy
import random
import threading
import sys
import time
from qpython import qconnection
from qpython.qcollection import qlist
from qpython.qtype import QException, QTIME_LIST, QSYMBOL_LIST, QFLOAT_LIST
class PublisherThread(threading.Thread):
def __init__(self, q):
super(PublisherThread, self).__init__()
self.q = q
self._stopper = threading.Event()
def stop(self):
self._stopper.set()
def stopped(self):
return self._stopper.isSet()
def run(self):
while not self.stopped():
print('.')
try:
# publish data to tick
# function: .u.upd
# table: ask
self.q.sync('.u.upd', numpy.string_('ask'), self.get_ask_data())
time.sleep(1)
except QException as e:
print(e)
except:
self.stop()
def get_ask_data(self):
c = random.randint(1, 10)
today = numpy.datetime64(datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0))
time = [numpy.timedelta64((numpy.datetime64(datetime.datetime.now()) - today), 'ms') for x in range(c)]
instr = ['instr_%d' % random.randint(1, 100) for x in range(c)]
src = ['qPython' for x in range(c)]
ask = [random.random() * random.randint(1, 100) for x in range(c)]
data = [qlist(time, qtype=QTIME_LIST), qlist(instr, qtype=QSYMBOL_LIST), qlist(src, qtype=QSYMBOL_LIST), qlist(ask, qtype=QFLOAT_LIST)]
print(data)
return data
if __name__ == '__main__':
with qconnection.QConnection(host='localhost', port=17010) as q:
print(q)
print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
print('Press <ENTER> to close application')
t = PublisherThread(q)
t.start()
sys.stdin.readline()
t.stop()
t.join()