Interactive Brokers TWS API Tutorial (Python)
In this tutorial, we will code a simple trading app powered by the Interactive Brokers TWS API and TradingView Lightweight Charts.
It will look like this:
Features
The app will have the following features (see video for demo):
- Historical Data - request data on different timeframes from the TWS API
- Charts - visualize price bars on TradingView lightweight charts
- Real-Time Data - update charts with real-time data
- Drawing Tools - Draw trendlines, rays, and horizontal lines
- Market Scanner - scan for top percent gainers, hot by volume, and more
- Hot Keys - keyboard combinations to place orders or perform other tasks
- Screenshots - press a button to take a screenshot of your chart's state
- Indicators - run calculations on price data and render them on the chart
- Ordering - place an order programmatically
What You Will Learn
The Interactive Brokers TWS API has a steeper learning curve than a simple REST API. However, it offers unique features and is a rewarding learning experience. By implementing a practical application, we will learn:
Programming Concepts
- Python programming
- User interfaces
- Data visualization
- Data analysis with pandas
- Object-Oriented Programming
- Asynchronous Programming
- Network Programming
- Sockets
- Threads
- Queues
- Events and callback functions
Financial Concepts
- Market Data
- Financial Instruments / Contracts
- Exchanges
- Order Types
- Risk Management
Tools Used
- Python - We will be using Python due to its ease of use. This tutorial assumes you already have Python installed and know how to run a Python program. The TWS API also has support for C++, Java, and C#, but these are beyond the scope of this tutorial.
- Visual Studio Code - Microsoft's popular open source code editor. To enable autocompletion and debugging, we will be using two VS Code extensions: the Python extension and the Pylance extension.
- Interactive Brokers TWS API - The official API for Interactive Brokers Trader Workstation. Note: we will be using the classic TWS, not IBKR Desktop.
- TradingView Lightweight Charts Python - an open source library by louisnw on GitHub that provides a Pythonic interface to TradingView charts.
Getting Started
There is a fair amount of setup involved in using the TWS API. The good news is that much of this setup only has to be done once. After you go through these initial steps, it is much easier to create subsequent programs.
Setting Up Trader Workstation
First, download and install Trader Workstation. Since we are using the TWS API, be sure to download Trader Workstation and not IBKR Desktop. Also make sure you download the version for your operating system (Windows, Mac, or Linux).
Paper Trading
Trading stocks, options, and crypto is a risky activity. You can lose money.
The good news is, there is a place where you can practice, experiment, and learn in a realistic environment without risking money. This is what paper trading is for. I recommend that if you are following along, you try everything in paper trading mode.
Start Trader Workstation and select the Paper Trading tab. You should see a red bar that says "Simulated Trading". Log in to your account.
Global Configuration
Find Global Configuration in the menu:
Click API, then Settings. Check the box next to "Enable ActiveX and Socket Clients". For Paper Trading, set the port to 7497. Also check the boxes for API log files.
Live Trading
While we're at it, you should also verify and set up your live trading configuration. Click the Live Trading tab and log in. If you have two-factor authentication set up, you will be prompted to complete the 2FA step.
Again, go to Global Configuration -> API -> Settings and check "Enable ActiveX and Socket Clients". For Live Trading, make sure the port is set to 7496. Save and apply all of your settings. We'll come back to Trader Workstation in a bit.
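Once TWS is running with the API enabled, a quick way to confirm the port settings is to try opening a TCP connection to each port. The following is a minimal sketch using only the standard library; the helper name port_is_open is made up for this example, and it only checks that something is listening, not that it is actually TWS.

```python
import socket

# Port numbers taken from the TWS settings described above.
PAPER_TRADING_PORT = 7497
LIVE_TRADING_PORT = 7496


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, timed out, or unreachable
        return False


if __name__ == "__main__":
    for name, port in (("paper", PAPER_TRADING_PORT), ("live", LIVE_TRADING_PORT)):
        state = "listening" if port_is_open("127.0.0.1", port) else "not reachable"
        print(f"{name} trading port {port}: {state}")
```

If the paper trading port shows as not reachable, double-check that you are logged in to TWS and that the socket client box is checked.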
Setting Up Your Coding Environment
Now let's set up our coding environment. First download and install VS Code. It is also helpful to add the Python and Pylance extensions. To install these, click the extensions icon on the left sidebar (it looks like blocks). Search for Python and install the extension. Then do the same for Pylance:
Your Project Folder
Create a new folder for your project. I'll call mine "ib-trading". We'll put our code in this folder. Create a new Python script called app.py, then click the play button to run it. If everything is working correctly, you should see the script run in a terminal window at the bottom of your editor.
Currently, the script doesn't do anything, so there is no output.
Creating a Virtual Environment
Our project will require us to install a number of Python packages. When dealing with many project dependencies, it is best practice to create a virtual environment.
A virtual environment in Python is a self-contained directory that enables you to maintain multiple isolated development environments for Python projects. This tool allows you to manage project dependencies separately, ensuring that each project has access to the specific versions of libraries or Python itself it needs, without conflicting with other projects or the global Python installation.
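In your VS Code terminal, type the following. These commands are for macOS and Linux; on Windows, run python -m venv venv and then venv\Scripts\activate instead: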
python3 -m venv venv
. venv/bin/activate
Visual Studio Code should detect that you have created a virtual environment and give you the option of using it in your project. Click yes:
Now when you click the play button, VS Code will use your virtual environment. You can also just run your program from the terminal:
python3 app.py
Be sure that your virtual environment is activated. You should see the (venv) in your terminal.
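If you are unsure whether the script is really running inside the virtual environment, you can also ask Python itself. This is a small sketch that relies on the standard sys.prefix and sys.base_prefix attributes; the function name in_virtualenv is just a name chosen for this example.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter is running inside a venv."""
    # Inside a venv, sys.prefix points at the environment directory while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


print("interpreter:", sys.executable)
print("virtual environment active:", in_virtualenv())
```

When the environment is active, the interpreter path printed should be inside your project's venv folder.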
Installing Dependencies
Now that we have our virtual environment set up, we can install the packages we will be using. We will do this using the pip command.
First, install the lightweight-charts package:
pip install lightweight-charts
Next, download the TWS API. Be sure to download the latest stable package for your operating system. Place the zip file in your project directory (e.g., ib-trading) and unzip it.
You should now see an IBJts directory.
Change to the IBJts/source/pythonclient directory. Then type:
python3 setup.py install
If everything is set up properly, you should be able to import ibapi classes in app.py and run it without errors. Furthermore, if Pylance is configured correctly, you should get autocomplete and be able to use keyboard shortcuts to jump into these classes for further exploration. If the classes are underlined in yellow, use "Quick Fix" to add them to Pylance.
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
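Before going further, it can help to confirm that both packages are visible to the interpreter you are using. The sketch below uses importlib from the standard library; is_installed is a helper name invented for this example, and the module names are the import names used in this tutorial (ibapi and lightweight_charts).

```python
from importlib.util import find_spec


def is_installed(package: str) -> bool:
    """Return True if a top-level package can be found for import."""
    return find_spec(package) is not None


for name in ("ibapi", "lightweight_charts"):
    print(f"{name}: {'found' if is_installed(name) else 'missing'}")
```

If either one is reported as missing, check that your virtual environment is activated and repeat the install step for that package.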
Exploring the IB API Source Code
Let's take a moment to explore the source code in this package.
Client Code
Look in the source directory and note that there are clients for C++, Java, and Python. We're focused on Python for this tutorial, but let me know if there is demand for other languages.
The documentation for the Interactive Brokers API is extremely helpful, but sometimes it pays to read the client source code to see what is really going on.
For instance, have a look at IBJts/source/pythonclient/ibapi/client.py. For a method like reqHistoricalData(), the source code and its comments may cover something that either isn't documented or is explained more fully than in the documentation.
Sample Code
Next, look in the samples directory and take a quick look at ContractSamples.py and OrderSamples.py.
The contract samples show you how to construct contracts for trading indices, commodities, options, bonds, stocks, option combos, futures, and many other financial instruments.
The order samples show you how to place various order types. For example, you can place market on open, market on close, bracket orders, stop limit orders, and many other order types.
Creating an Interactive Brokers Client
To connect to the Trader Workstation, we will create a Client class. This class will extend the EClient and EWrapper classes. I'll show the code first, then talk about what is going on here.
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from threading import Thread

class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        # EClient needs a wrapper to deliver callbacks to; we are our own wrapper
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        # process incoming messages on a separate thread
        thread = Thread(target=self.run)
        thread.start()

    def error(self, req_id, code, msg, misc):
        # these codes only report that a data farm connection is OK
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))

client = IBClient('127.0.0.1', 7497, 1)
The IBClient acts as the client, initiating requests for data or actions. The Interactive Brokers server responds to these requests. The IBClient connects using a host address and port number.
The client code makes practical use of several object-oriented programming concepts.
class IBClient(EWrapper, EClient):
- Classes - We define a new class named IBClient.
- Inheritance - The class inherits from both EWrapper and EClient. IBClient is a child class that inherits methods and attributes from multiple parent classes. This is an example of multiple inheritance, where EWrapper is for handling incoming messages, and EClient is for sending requests to the IB server. Notice that we called a method named connect(). Where is it defined? It was inherited from EClient.
def __init__(self, host, port, client_id):
    EClient.__init__(self, self)
    self.connect(host, port, client_id)
    thread = Thread(target=self.run)
    thread.start()
- Constructor - The arguments or inputs '127.0.0.1', 7497, and 1 are passed to "__init__". The "__init__" function is called automatically when you create a new object.
client = IBClient('127.0.0.1', 7497, 1)
- Objects - Interactive Brokers allows for multiple clients, so we can create multiple client objects using the same class. Here the client object is an instance of IBClient.
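To see why EClient.__init__(self, self) passes self twice, it can help to look at the same pattern without any trading code. The classes below are toy stand-ins written for this illustration (ToyWrapper, ToyClient, and ToyApp are not part of ibapi); they only mimic the idea that one parent sends requests and the other receives the replies.

```python
class ToyWrapper:
    """Stand-in for EWrapper: holds callbacks, which do nothing by default."""

    def on_message(self, text):
        pass


class ToyClient:
    """Stand-in for EClient: sends requests and reports replies to a wrapper."""

    def __init__(self, wrapper):
        self.wrapper = wrapper

    def request(self, text):
        # A real client would write to a socket; here we answer immediately.
        self.wrapper.on_message(f"reply to {text}")


class ToyApp(ToyWrapper, ToyClient):
    def __init__(self):
        # Pass self twice: once as the instance being initialised and once
        # as the wrapper that should receive the callbacks.
        ToyClient.__init__(self, self)
        self.received = []

    def on_message(self, text):
        # Overrides the do-nothing callback inherited from ToyWrapper.
        self.received.append(text)


app = ToyApp()
app.request("ping")   # request() is inherited from ToyClient
print(app.received)   # ['reply to ping']
```

Because the same object plays both roles, a request sent through the inherited client method ends up in a callback you override on the same class.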
Callbacks
Callbacks allow a piece of code to notify or inform another part of the program when certain events occur or when a specific task is completed. Callbacks are a fundamental concept in many programming paradigms, especially in asynchronous programming, event-driven programming, and in scenarios involving I/O operations, like network requests or file reading. The first example of a callback we have is the error() function:
def error(self, req_id, code, msg, misc):
    if code in [2104, 2106, 2158]:
        print(msg)
    else:
        print('Error {}: {}'.format(code, msg))
Callbacks in the Context of the EWrapper Class
The EWrapper class uses callback methods to handle various types of information received from the server. When you implement the EWrapper interface, you define what should happen when these events occur (e.g., receiving market data, order updates, or error messages). Each method in the EWrapper interface acts as a callback that gets executed in response to specific messages from the IB server. We'll implement a variety of EWrapper callbacks in this project.
Requesting Historical Data
Now that we have a client and can successfully connect to Trader Workstation, let's actually do something with our connection. We will start with a simple request for historical data.
import time, datetime
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from threading import Thread

class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        thread = Thread(target=self.run)
        thread.start()

    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))

    # callback for each bar received
    def historicalData(self, req_id, bar):
        print(bar)

    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")

if __name__ == '__main__':
    client = IBClient('127.0.0.1', 7497, 1)
    # give the connection a moment to be established
    time.sleep(1)

    contract = Contract()
    contract.symbol = 'TSM'
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'

    client.reqHistoricalData(
        2,             # request id
        contract,
        '',            # end date/time ('' means now)
        '30 D',        # duration
        '5 mins',      # bar size
        what_to_show,
        True,          # regular trading hours only
        2,             # date format: 2 = seconds since the epoch
        False,         # keep up to date
        []             # chart options
    )
    time.sleep(1)
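The script above uses time.sleep(1) as a rough way to wait for the server. Since events are one of the concepts listed for this tutorial, here is a hedged sketch of an alternative: let the "end of data" callback set a threading.Event and wait on that instead. ToyHistoryClient and fake_server are stand-ins invented for this example so it runs without TWS; only the callback names mirror the ones used above.

```python
import threading


class ToyHistoryClient:
    """Toy stand-in for IBClient; it never talks to TWS."""

    def __init__(self):
        self.bars = []
        self.done = threading.Event()

    def historicalData(self, req_id, bar):
        self.bars.append(bar)

    def historicalDataEnd(self, req_id, start, end):
        # signal whoever is waiting that every bar has arrived
        self.done.set()


def fake_server(client):
    # Pretend to be the IB server: deliver three bars, then the end marker.
    for close in (101.5, 102.0, 101.8):
        client.historicalData(2, {"close": close})
    client.historicalDataEnd(2, "", "")


client = ToyHistoryClient()
threading.Thread(target=fake_server, args=(client,)).start()

# Block until the end-of-data callback fires, or give up after 5 seconds.
finished = client.done.wait(timeout=5)
print(finished, len(client.bars))
```

The advantage over a fixed sleep is that the wait ends as soon as the data is complete, and the timeout tells you when it never arrived.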
Initializing the Chart
Now that we can request historical data, we will want to visually display this data. To do this, we will use the TradingView Lightweight Charts Package.
While we are doing this, let's also put the request for historical data into its own function. We will want to be able to change symbols and timeframes in the future, so we'll create a function called get_bar_data that wraps the Contract creation and request for data. We'll also begin creating some constants so that we can switch between paper trading and live trading.
import time, datetime
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from lightweight_charts import Chart
from threading import Thread

INITIAL_SYMBOL = "TSM"
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
    TRADING_PORT = LIVE_TRADING_PORT

class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        thread = Thread(target=self.run)
        thread.start()

    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))

    def historicalData(self, req_id, bar):
        print(bar)

    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")

def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)

if __name__ == '__main__':
    client = IBClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    time.sleep(1)
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    chart.show(block=True)
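The paper/live switch above is controlled by editing the LIVE_TRADING constant. One possible variation, sketched here as an assumption rather than part of the original app, is to pick the port in a small function and read the switch from an environment variable. The variable name IB_LIVE_TRADING and both function names are hypothetical.

```python
import os

LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497


def live_trading_requested(env=os.environ) -> bool:
    # IB_LIVE_TRADING is a hypothetical variable name chosen for this sketch.
    return env.get("IB_LIVE_TRADING", "").lower() in ("1", "true", "yes")


def select_port(live_trading: bool) -> int:
    """Return the TWS port for live or paper trading."""
    return LIVE_TRADING_PORT if live_trading else PAPER_TRADING_PORT


TRADING_PORT = select_port(live_trading_requested())
print(f"connecting on port {TRADING_PORT}")
```

Defaulting to paper trading when the variable is unset keeps the safer mode as the one you get by accident.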
Rendering the Historical Chart
At this point, we have initialized the chart, but it is empty. To use Interactive Brokers data with our charting library, we need to collect the bar objects into a pandas dataframe and notify the chart object to render the collected data. How will we handle communication between the Interactive Brokers client and our Desktop GUI? We will put the bar data in a queue as each bar is received, then retrieve data from the queue for rendering when all available bars are ready.
import time, datetime
import queue
import pandas as pd
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from lightweight_charts import Chart
from threading import Thread

INITIAL_SYMBOL = "TSM"
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
    TRADING_PORT = LIVE_TRADING_PORT

data_queue = queue.Queue()

class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        thread = Thread(target=self.run)
        thread.start()

    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))

    def historicalData(self, req_id, bar):
        print(bar)
        # with date format 2, bar.date is seconds since the epoch
        t = datetime.datetime.fromtimestamp(int(bar.date))
        # create a bar dictionary for each bar received
        data = {
            'date': t,
            'open': bar.open,
            'high': bar.high,
            'low': bar.low,
            'close': bar.close,
            'volume': int(bar.volume)
        }
        print(data)
        # Put the data into the queue
        data_queue.put(data)

    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")
        update_chart()

def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    #now = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
    #chart.spinner(True)
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)

# called when we want to update what is rendered on the chart
def update_chart():
    bars = []
    try:
        while True:  # Keep pulling from the queue until it is empty
            data = data_queue.get_nowait()
            bars.append(data)
    except queue.Empty:
        print("empty queue")
    finally:
        # once we have received all the data, convert to pandas dataframe
        df = pd.DataFrame(bars)
        print(df)
        # set the data on the chart
        if not df.empty:
            chart.set(df)
        # once we get the data back, we don't need a spinner anymore
        #chart.spinner(False)

if __name__ == '__main__':
    client = IBClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    time.sleep(1)
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    chart.legend(True)
    chart.topbar.textbox('symbol', INITIAL_SYMBOL)
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    time.sleep(1)
    chart.show(block=True)
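The queue hand-off used above can be tried in isolation, without TWS or a chart. In this sketch a worker thread plays the role of the IB client thread putting bars on the queue, and the main thread drains the queue once the worker is finished. The producer and drain function names are made up for this example.

```python
import queue
import threading

data_queue = queue.Queue()


def producer():
    # Plays the role of historicalData(): one put() per bar received.
    for i in range(5):
        data_queue.put({"bar": i})


def drain(q):
    """Collect everything currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


worker = threading.Thread(target=producer)
worker.start()
worker.join()  # stands in for historicalDataEnd: all bars have arrived

bars = drain(data_queue)
print(len(bars))  # 5
```

queue.Queue handles the locking for you, which is why the two threads can share it without any extra synchronization.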
Symbol and Timeframe Switching
It's great that we can retrieve and visualize 5 minute bars for TSM, but what if we want to change symbols or timeframes? Let's add a symbol search box and a timeframe switcher.
import time, datetime
import queue
import pandas as pd
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from lightweight_charts import Chart
from threading import Thread

INITIAL_SYMBOL = "TSM"
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
    TRADING_PORT = LIVE_TRADING_PORT

data_queue = queue.Queue()

class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        thread = Thread(target=self.run)
        thread.start()

    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))

    def historicalData(self, req_id, bar):
        print(bar)
        t = datetime.datetime.fromtimestamp(int(bar.date))
        # create a bar dictionary for each bar received
        data = {
            'date': t,
            'open': bar.open,
            'high': bar.high,
            'low': bar.low,
            'close': bar.close,
            'volume': int(bar.volume)
        }
        print(data)
        # Put the data into the queue
        data_queue.put(data)

    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")
        update_chart()

def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    #now = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
    chart.spinner(True)
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)

# called when we want to update what is rendered on the chart
def update_chart():
    bars = []
    try:
        while True:  # Keep pulling from the queue until it is empty
            data = data_queue.get_nowait()
            bars.append(data)
    except queue.Empty:
        print("empty queue")
    finally:
        # once we have received all the data, convert to pandas dataframe
        df = pd.DataFrame(bars)
        print(df)
        # set the data on the chart
        if not df.empty:
            chart.set(df)
        # once we get the data back, we don't need a spinner anymore
        chart.spinner(False)

# get new bar data when the user changes timeframes
def on_timeframe_selection(chart):
    print("selected timeframe")
    print(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
    get_bar_data(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)

# get new bar data when the user enters a different symbol
def on_search(chart, searched_string):
    get_bar_data(searched_string, chart.topbar['timeframe'].value)
    chart.topbar['symbol'].set(searched_string)

if __name__ == '__main__':
    client = IBClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    time.sleep(1)
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    chart.legend(True)
    chart.topbar.textbox('symbol', INITIAL_SYMBOL)
    chart.topbar.switcher('timeframe', ('5 mins', '15 mins', '1 hour'), default='5 mins', func=on_timeframe_selection)
    # set up a function to call when searching for symbol
    chart.events.search += on_search
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    time.sleep(1)
    chart.show(block=True)
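The line chart.events.search += on_search registers a callback using the += operator, which may look unusual. The sketch below shows one way such an interface can be built in plain Python with __iadd__. It is an illustration of the pattern only, not the charting library's actual implementation; ToyEvent and ToyEvents are invented names.

```python
class ToyEvent:
    """A list of handlers that supports `event += handler`."""

    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        self._handlers.append(handler)
        return self  # += rebinds the attribute to whatever is returned

    def fire(self, *args):
        for handler in self._handlers:
            handler(*args)


class ToyEvents:
    def __init__(self):
        self.search = ToyEvent()


searched = []


def on_search(chart, searched_string):
    searched.append(searched_string)


events = ToyEvents()
events.search += on_search
events.search.fire(None, "AAPL")  # None stands in for the chart object
print(searched)  # ['AAPL']
```

The handler receives whatever arguments the event is fired with, which is why on_search in the app takes both the chart and the searched string.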
Adding a Chart Screenshot Button
Since my original video on TradingView Lightweight Charts, the author has added a lot of new functionality, including the ability to capture screenshots of the chart. Let's add this feature.
import time, datetime
import queue
import pandas as pd
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from lightweight_charts import Chart
from threading import Thread

INITIAL_SYMBOL = "TSM"
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
    TRADING_PORT = LIVE_TRADING_PORT

data_queue = queue.Queue()

class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        thread = Thread(target=self.run)
        thread.start()

    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))

    def historicalData(self, req_id, bar):
        print(bar)
        t = datetime.datetime.fromtimestamp(int(bar.date))
        # create a bar dictionary for each bar received
        data = {
            'date': t,
            'open': bar.open,
            'high': bar.high,
            'low': bar.low,
            'close': bar.close,
            'volume': int(bar.volume)
        }
        print(data)
        # Put the data into the queue
        data_queue.put(data)

    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")
        update_chart()

def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    #now = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
    chart.spinner(True)
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)

# called when we want to update what is rendered on the chart
def update_chart():
    bars = []
    try:
        while True:  # Keep pulling from the queue until it is empty
            data = data_queue.get_nowait()
            bars.append(data)
    except queue.Empty:
        print("empty queue")
    finally:
        # once we have received all the data, convert to pandas dataframe
        df = pd.DataFrame(bars)
        print(df)
        # set the data on the chart
        if not df.empty:
            chart.set(df)
        # once we get the data back, we don't need a spinner anymore
        chart.spinner(False)

# get new bar data when the user changes timeframes
def on_timeframe_selection(chart):
    print("selected timeframe")
    print(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
    get_bar_data(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)

# get new bar data when the user enters a different symbol
def on_search(chart, searched_string):
    get_bar_data(searched_string, chart.topbar['timeframe'].value)
    chart.topbar['symbol'].set(searched_string)

# handler for the screenshot button
def take_screenshot(key):
    img = chart.screenshot()
    t = time.time()
    with open(f"screenshot-{t}.png", 'wb') as f:
        f.write(img)

if __name__ == '__main__':
    client = IBClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    time.sleep(1)
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    chart.legend(True)
    chart.topbar.textbox('symbol', INITIAL_SYMBOL)
    chart.topbar.switcher('timeframe', ('5 mins', '15 mins', '1 hour'), default='5 mins', func=on_timeframe_selection)
    # set up a function to call when searching for symbol
    chart.events.search += on_search
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    time.sleep(1)
    chart.topbar.button('screenshot', 'Screenshot', func=take_screenshot)
    chart.show(block=True)
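The screenshot handler above names files after the raw value of time.time(), which produces names like screenshot-1700000000.123.png. If you would rather have names that include the symbol and sort by date, something like the following could be used. This is an optional sketch; screenshot_path and the screenshots folder name are assumptions made for this example, not part of the app.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional


def screenshot_path(symbol: str, now: Optional[datetime] = None,
                    folder: str = "screenshots") -> Path:
    """Build a path like <folder>/<SYMBOL>-<YYYYMMDD-HHMMSS>.png."""
    now = now or datetime.now()
    stamp = now.strftime("%Y%m%d-%H%M%S")
    return Path(folder) / f"{symbol}-{stamp}.png"


path = screenshot_path("TSM", datetime(2024, 1, 2, 9, 30, 0))
print(path.name)  # TSM-20240102-093000.png
```

Inside take_screenshot you would create the folder first (for example with path.parent.mkdir(exist_ok=True)) and then write the image bytes to the returned path.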
Hotkey Ordering
Another newly added feature is support for keyboard shortcuts. Let's implement this while also learning how to programmatically place orders with the Interactive Brokers API.
import time, datetime
import queue
import pandas as pd
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from ibapi.order import Order
from lightweight_charts import Chart
from threading import Thread

INITIAL_SYMBOL = "TSM"
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
    TRADING_PORT = LIVE_TRADING_PORT

data_queue = queue.Queue()

class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        thread = Thread(target=self.run)
        thread.start()

    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))

    # callback that delivers the next order id we are allowed to use
    def nextValidId(self, orderId: int):
        super().nextValidId(orderId)
        self.order_id = orderId
        print(f"next valid id is {self.order_id}")

    # callback to log order status, we can put more behavior here if needed
    def orderStatus(self, order_id, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld, mktCapPrice):
        print(f"order status {order_id} {status} {filled} {remaining} {avgFillPrice}")

    def historicalData(self, req_id, bar):
        print(bar)
        t = datetime.datetime.fromtimestamp(int(bar.date))
        # create a bar dictionary for each bar received
        data = {
            'date': t,
            'open': bar.open,
            'high': bar.high,
            'low': bar.low,
            'close': bar.close,
            'volume': int(bar.volume)
        }
        print(data)
        # Put the data into the queue
        data_queue.put(data)

    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")
        update_chart()

def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    #now = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
    chart.spinner(True)
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)

# called when we want to update what is rendered on the chart
def update_chart():
    bars = []
    try:
        while True:  # Keep pulling from the queue until it is empty
            data = data_queue.get_nowait()
            bars.append(data)
    except queue.Empty:
        print("empty queue")
    finally:
        # once we have received all the data, convert to pandas dataframe
        df = pd.DataFrame(bars)
        print(df)
        # set the data on the chart
        if not df.empty:
            chart.set(df)
        # once we get the data back, we don't need a spinner anymore
        chart.spinner(False)

# get new bar data when the user changes timeframes
def on_timeframe_selection(chart):
    print("selected timeframe")
    print(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
    get_bar_data(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)

# get new bar data when the user enters a different symbol
def on_search(chart, searched_string):
    get_bar_data(searched_string, chart.topbar['timeframe'].value)
    chart.topbar['symbol'].set(searched_string)

# handler for the screenshot button
def take_screenshot(key):
    img = chart.screenshot()
    t = time.time()
    with open(f"screenshot-{t}.png", 'wb') as f:
        f.write(img)

# handles when the user uses an order hotkey combination
def place_order(key):
    # get current symbol
    symbol = chart.topbar['symbol'].value
    # build contract object
    contract = Contract()
    contract.symbol = symbol
    contract.secType = "STK"
    contract.currency = "USD"
    contract.exchange = "SMART"
    # build order object
    order = Order()
    order.orderType = "MKT"
    order.totalQuantity = 1
    # get next order id
    client.reqIds(-1)
    time.sleep(2)
    # set action to buy or sell depending on key pressed
    # shift+O is for a buy order
    if key == 'O':
        print("buy order")
        order.action = "BUY"
    # shift+P for a sell order
    if key == 'P':
        print("sell order")
        order.action = "SELL"
    # place the order
    if client.order_id:
        print("got order id, placing order")
        client.placeOrder(client.order_id, contract, order)

if __name__ == '__main__':
    client = IBClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    time.sleep(1)
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    chart.legend(True)
    # hotkey to place a buy order
    chart.hotkey('shift', 'O', place_order)
    # hotkey to place a sell order
    chart.hotkey('shift', 'P', place_order)
    chart.topbar.textbox('symbol', INITIAL_SYMBOL)
    chart.topbar.switcher('timeframe', ('5 mins', '15 mins', '1 hour'), default='5 mins', func=on_timeframe_selection)
    # set up a function to call when searching for symbol
    chart.events.search += on_search
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    time.sleep(1)
    chart.topbar.button('screenshot', 'Screenshot', func=take_screenshot)
    chart.show(block=True)
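The place_order handler above asks the server for a fresh order id and sleeps for two seconds on every key press. A possible alternative, sketched here under the assumption that order ids only need to keep increasing after the first nextValidId callback, is to remember the id once and count upward locally. OrderIdAllocator and ACTION_BY_KEY are names invented for this example and are not part of ibapi.

```python
import threading


class OrderIdAllocator:
    """Hands out increasing order ids, starting from a server-provided seed."""

    def __init__(self):
        self._lock = threading.Lock()
        self._next_id = None

    def seed(self, order_id: int):
        """Call from nextValidId with the id the server reports."""
        with self._lock:
            # never move backwards if an older id shows up later
            if self._next_id is None or order_id > self._next_id:
                self._next_id = order_id

    def allocate(self) -> int:
        with self._lock:
            if self._next_id is None:
                raise RuntimeError("no order id received yet")
            order_id = self._next_id
            self._next_id += 1
            return order_id


# map each hotkey to an order action instead of chaining if statements
ACTION_BY_KEY = {"O": "BUY", "P": "SELL"}

ids = OrderIdAllocator()
ids.seed(10)  # in the app this would happen inside nextValidId
print(ids.allocate(), ids.allocate(), ACTION_BY_KEY["P"])  # 10 11 SELL
```

The lock matters because the seed arrives on the IB client thread while the hotkey handler runs on a different one.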
Market Scanners
Finally, you'll notice that the right side of the screen has been empty so far. This is because we have reserved room for a market scanner. Let's learn how to use the new Lightweight Charts table object while also implementing an Interactive Brokers scanner.
import time, datetime
import queue
import pandas as pd
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.client import Contract, Order, ScannerSubscription
from ibapi.tag_value import TagValue
from lightweight_charts import Chart
from threading import Thread
INITIAL_SYMBOL = "TSM"
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
TRADING_PORT = LIVE_TRADING_PORT
data_queue = queue.Queue()
class IBClient(EWrapper, EClient):
def __init__(self, host, port, client_id):
EClient.__init__(self, self)
self.connect(host, port, client_id)
thread = Thread(target=self.run)
thread.start()
def error(self, req_id, code, msg, misc):
if code in [2104, 2106, 2158]:
print(msg)
else:
print('Error {}: {}'.format(code, msg))
def nextValidId(self, orderId: int):
super().nextValidId(orderId)
self.order_id = orderId
print(f"next valid id is {self.order_id}")
# callback to log order status, we can put more behavior here if needed
def orderStatus(self, order_id, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld, mktCapPrice):
print(f"order status {order_id} {status} {filled} {remaining} {avgFillPrice}")
def historicalData(self, req_id, bar):
print(bar)
t = datetime.datetime.fromtimestamp(int(bar.date))
# creation bar dictionary for each bar received
data = {
'date': t,
'open': bar.open,
'high': bar.high,
'low': bar.low,
'close': bar.close,
'volume': int(bar.volume)
}
print(data)
# Put the data into the queue
data_queue.put(data)
# callback when all historical data has been received
def historicalDataEnd(self, reqId, start, end):
print(f"end of data {start} {end}")
update_chart()
# callback for when a scan finishes
def scannerData(self, req_id, rank, details, distance, benchmark, projection, legsStr):
super().scannerData(req_id, rank, details, distance, benchmark, projection, legsStr)
print("got scanner data")
print(details.contract)
data = {
'secType': details.contract.secType,
'secId': details.contract.secId,
'exchange': details.contract.primaryExchange,
'symbol': details.contract.symbol
}
print(data)
# Put the data into the queue
data_queue.put(data)
def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    #now = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
    chart.spinner(True)
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)
# called when we want to update what is rendered on the chart
def update_chart():
    try:
        bars = []
        while True:  # Keep checking the queue for new data
            data = data_queue.get_nowait()
            bars.append(data)
    except queue.Empty:
        print("empty queue")
    finally:
        # once we have received all the data, convert to pandas dataframe
        df = pd.DataFrame(bars)
        print(df)
        # set the data on the chart
        if not df.empty:
            chart.set(df)
        # once we get the data back, we don't need a spinner anymore
        chart.spinner(False)
# get new bar data when the user changes timeframes
def on_timeframe_selection(chart):
    print("selected timeframe")
    print(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
    get_bar_data(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
# get new bar data when the user enters a different symbol
def on_search(chart, searched_string):
    get_bar_data(searched_string, chart.topbar['timeframe'].value)
    chart.topbar['symbol'].set(searched_string)
# handler for the screenshot button
def take_screenshot(key):
    img = chart.screenshot()
    t = time.time()
    with open(f"screenshot-{t}.png", 'wb') as f:
        f.write(img)
# handles when the user uses an order hotkey combination
def place_order(key):
    # get current symbol
    symbol = chart.topbar['symbol'].value
    # build contract object
    contract = Contract()
    contract.symbol = symbol
    contract.secType = "STK"
    contract.currency = "USD"
    contract.exchange = "SMART"
    # build order object
    order = Order()
    order.orderType = "MKT"
    order.totalQuantity = 1
    # get next order id
    client.reqIds(-1)
    time.sleep(2)
    # set action to buy or sell depending on key pressed
    # shift+O is for a buy order
    if key == 'O':
        print("buy order")
        order.action = "BUY"
    # shift+P for a sell order
    if key == 'P':
        print("sell order")
        order.action = "SELL"
    # place the order
    if client.order_id:
        print(f"got order id, placing {order.action} order")
        client.placeOrder(client.order_id, contract, order)
# implement an Interactive Brokers market scanner
def do_scan(scan_code):
    scannerSubscription = ScannerSubscription()
    scannerSubscription.instrument = "STK"
    scannerSubscription.locationCode = "STK.US.MAJOR"
    scannerSubscription.scanCode = scan_code
    tagValues = []
    tagValues.append(TagValue("optVolumeAbove", "1000"))
    tagValues.append(TagValue("avgVolumeAbove", "10000"))
    client.reqScannerSubscription(7002, scannerSubscription, [], tagValues)
    time.sleep(1)
    display_scan()
    client.cancelScannerSubscription(7002)
# called when we want to render scan results
def display_scan():
    # function to call when one of the scan results is clicked
    def on_row_click(row):
        chart.topbar['symbol'].set(row['symbol'])
        get_bar_data(row['symbol'], '5 mins')
    # create a table on the UI, pass callback function for when a row is clicked
    table = chart.create_table(
        width=0.4,
        height=0.5,
        headings=('symbol', 'value'),
        widths=(0.7, 0.3),
        alignments=('left', 'center'),
        position='left', func=on_row_click
    )
    # poll queue for any new scan results
    try:
        while True:
            data = data_queue.get_nowait()
            # create a new row in the table for each scan result
            table.new_row(data['symbol'], '')
    except queue.Empty:
        print("empty queue")
    finally:
        print("done")
if __name__ == '__main__':
    client = IBClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    time.sleep(1)
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    chart.legend(True)
    # hotkey to place a buy order
    chart.hotkey('shift', 'O', place_order)
    # hotkey to place a sell order
    chart.hotkey('shift', 'P', place_order)
    chart.topbar.textbox('symbol', INITIAL_SYMBOL)
    chart.topbar.switcher('timeframe', ('5 mins', '15 mins', '1 hour'), default='5 mins', func=on_timeframe_selection)
    # set up a function to call when searching for symbol
    chart.events.search += on_search
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    time.sleep(1)
    chart.topbar.button('screenshot', 'Screenshot', func=take_screenshot)
    # run a market scanner
    do_scan("HOT_BY_VOLUME")
    chart.show(block=True)
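The place_order() handler above calls client.reqIds(-1) and then sleeps for a fixed two seconds, hoping nextValidId has fired by then. One alternative, shown here as a minimal sketch rather than the approach used in this tutorial, is to have the callback set a threading.Event and let the caller wait on it with a timeout. The OrderIdWaiter class and its method names are hypothetical, and a timer thread stands in for the TWS callback so the example runs without a broker connection.

```python
import threading


class OrderIdWaiter:
    """Hypothetical helper: stores the latest order id and signals its arrival."""

    def __init__(self):
        self.order_id = None
        self._ready = threading.Event()

    def on_next_valid_id(self, order_id):
        # In the real app this would be called from the nextValidId callback,
        # which runs on the API thread.
        self.order_id = order_id
        self._ready.set()

    def wait_for_id(self, timeout=2.0):
        # Block until the callback fires or the timeout expires.
        if not self._ready.wait(timeout):
            return None
        self._ready.clear()
        return self.order_id


waiter = OrderIdWaiter()
# Simulate the API thread delivering an id shortly after the request.
threading.Timer(0.1, waiter.on_next_valid_id, args=(42,)).start()
received_id = waiter.wait_for_id(timeout=2.0)
print(received_id)
```

With this pattern the handler continues as soon as the id arrives, and a None result makes it explicit that no id was received in time.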
Adding Lines
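To finish up this project, let's add a couple of lines of interest to the chart. For instance, how do we draw a horizontal line at the highest price and plot a 50-period simple moving average (SMA)? The updated code below does both inside update_chart():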
import time, datetime
import queue
import pandas as pd
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from ibapi.order import Order
from ibapi.scanner import ScannerSubscription
from ibapi.tag_value import TagValue
from lightweight_charts import Chart
from threading import Thread
INITIAL_SYMBOL = "TSM"
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
    TRADING_PORT = LIVE_TRADING_PORT
data_queue = queue.Queue()
# a list for keeping track of any indicator lines
current_lines = []
class IBClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        thread = Thread(target=self.run)
        thread.start()
    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))
    def nextValidId(self, orderId: int):
        super().nextValidId(orderId)
        self.order_id = orderId
        print(f"next valid id is {self.order_id}")
    # callback to log order status, we can put more behavior here if needed
    def orderStatus(self, order_id, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld, mktCapPrice):
        print(f"order status {order_id} {status} {filled} {remaining} {avgFillPrice}")
    def historicalData(self, req_id, bar):
        print(bar)
        t = datetime.datetime.fromtimestamp(int(bar.date))
        # create a bar dictionary for each bar received
        data = {
            'date': t,
            'open': bar.open,
            'high': bar.high,
            'low': bar.low,
            'close': bar.close,
            'volume': int(bar.volume)
        }
        print(data)
        # Put the data into the queue
        data_queue.put(data)
    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")
        update_chart()
    # callback for each scan result returned by the scanner
    def scannerData(self, req_id, rank, details, distance, benchmark, projection, legsStr):
        super().scannerData(req_id, rank, details, distance, benchmark, projection, legsStr)
        print("got scanner data")
        print(details.contract)
        data = {
            'secType': details.contract.secType,
            'secId': details.contract.secId,
            'exchange': details.contract.primaryExchange,
            'symbol': details.contract.symbol
        }
        print(data)
        # Put the data into the queue
        data_queue.put(data)
def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    #now = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
    chart.spinner(True)
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)
# callback for when the user changes the position of the horizontal line
def on_horizontal_line_move(chart, line):
    print(f'Horizontal line moved to: {line.price}')
# called when we want to update what is rendered on the chart
def update_chart():
    global current_lines
    try:
        bars = []
        while True:  # Keep checking the queue for new data
            data = data_queue.get_nowait()
            bars.append(data)
    except queue.Empty:
        print("empty queue")
    finally:
        # once we have received all the data, convert to pandas dataframe
        df = pd.DataFrame(bars)
        print(df)
        # set the data on the chart
        if not df.empty:
            chart.set(df)
            # draw a horizontal line at the high
            chart.horizontal_line(df['high'].max(), func=on_horizontal_line_move)
            # if there were any indicator lines on the chart already (e.g. SMA), clear them so we can recalculate
            if current_lines:
                for existing_line in current_lines:
                    existing_line.delete()
            current_lines = []
            # calculate any new lines to render
            # create a line with SMA label on the chart
            line = chart.create_line(name='SMA 50')
            line.set(pd.DataFrame({
                'time': df['date'],
                'SMA 50': df['close'].rolling(window=50).mean()
            }).dropna())
            current_lines.append(line)
        # once we get the data back, we don't need a spinner anymore
        chart.spinner(False)
# get new bar data when the user changes timeframes
def on_timeframe_selection(chart):
    print("selected timeframe")
    print(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
    get_bar_data(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
# get new bar data when the user enters a different symbol
def on_search(chart, searched_string):
    get_bar_data(searched_string, chart.topbar['timeframe'].value)
    chart.topbar['symbol'].set(searched_string)
# handler for the screenshot button
def take_screenshot(key):
    img = chart.screenshot()
    t = time.time()
    with open(f"screenshot-{t}.png", 'wb') as f:
        f.write(img)
# handles when the user uses an order hotkey combination
def place_order(key):
    # get current symbol
    symbol = chart.topbar['symbol'].value
    # build contract object
    contract = Contract()
    contract.symbol = symbol
    contract.secType = "STK"
    contract.currency = "USD"
    contract.exchange = "SMART"
    # build order object
    order = Order()
    order.orderType = "MKT"
    order.totalQuantity = 1
    # get next order id
    client.reqIds(-1)
    time.sleep(2)
    # set action to buy or sell depending on key pressed
    # shift+O is for a buy order
    if key == 'O':
        print("buy order")
        order.action = "BUY"
    # shift+P for a sell order
    if key == 'P':
        print("sell order")
        order.action = "SELL"
    # place the order
    if client.order_id:
        print(f"got order id, placing {order.action} order")
        client.placeOrder(client.order_id, contract, order)
# implement an Interactive Brokers market scanner
def do_scan(scan_code):
    scannerSubscription = ScannerSubscription()
    scannerSubscription.instrument = "STK"
    scannerSubscription.locationCode = "STK.US.MAJOR"
    scannerSubscription.scanCode = scan_code
    tagValues = []
    tagValues.append(TagValue("optVolumeAbove", "1000"))
    tagValues.append(TagValue("avgVolumeAbove", "10000"))
    client.reqScannerSubscription(7002, scannerSubscription, [], tagValues)
    time.sleep(1)
    display_scan()
    client.cancelScannerSubscription(7002)
# called when we want to render scan results
def display_scan():
    # function to call when one of the scan results is clicked
    def on_row_click(row):
        chart.topbar['symbol'].set(row['symbol'])
        get_bar_data(row['symbol'], '5 mins')
    # create a table on the UI, pass callback function for when a row is clicked
    table = chart.create_table(
        width=0.4,
        height=0.5,
        headings=('symbol', 'value'),
        widths=(0.7, 0.3),
        alignments=('left', 'center'),
        position='left', func=on_row_click
    )
    # poll queue for any new scan results
    try:
        while True:
            data = data_queue.get_nowait()
            # create a new row in the table for each scan result
            table.new_row(data['symbol'], '')
    except queue.Empty:
        print("empty queue")
    finally:
        print("done")
if __name__ == '__main__':
    client = IBClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    time.sleep(1)
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    chart.legend(True)
    # hotkey to place a buy order
    chart.hotkey('shift', 'O', place_order)
    # hotkey to place a sell order
    chart.hotkey('shift', 'P', place_order)
    chart.topbar.textbox('symbol', INITIAL_SYMBOL)
    chart.topbar.switcher('timeframe', ('5 mins', '15 mins', '1 hour'), default='5 mins', func=on_timeframe_selection)
    # set up a function to call when searching for symbol
    chart.events.search += on_search
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    time.sleep(1)
    chart.topbar.button('screenshot', 'Screenshot', func=take_screenshot)
    # run a market scanner
    do_scan("HOT_BY_VOLUME")
    chart.show(block=True)
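The moving-average step in update_chart() can be tried on its own, without a chart or a broker connection. The sketch below uses made-up closing prices and a 3-bar window instead of 50 so the result is easy to check by hand; the compute_sma helper name is an assumption for illustration. It also shows why the .dropna() call matters: the first window - 1 rows have no average yet.

```python
import pandas as pd


def compute_sma(df, window):
    """Return a frame with 'time' and an SMA column, dropping incomplete windows."""
    name = f"SMA {window}"
    sma = pd.DataFrame({
        "time": df["date"],
        name: df["close"].rolling(window=window).mean(),
    })
    # Rows before the window is full contain NaN and are removed here.
    return sma.dropna()


# Made-up bars for illustration only.
bars = pd.DataFrame({
    "date": pd.date_range("2024-01-02 09:30", periods=5, freq="5min"),
    "close": [10.0, 11.0, 12.0, 13.0, 14.0],
})
sma = compute_sma(bars, 3)
print(sma)
```

Five bars with a 3-bar window leave three SMA values. By the same logic, a 50-bar window produces an empty frame whenever fewer than 50 bars come back.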
Final Source Code
Below is the final source code for the project. Hope you learned something new!
import time, datetime
import queue
import pandas as pd
from threading import Thread
from lightweight_charts import Chart
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from ibapi.order import Order
from ibapi.scanner import ScannerSubscription
from ibapi.tag_value import TagValue
# create a queue for data coming from Interactive Brokers API
data_queue = queue.Queue()
# a list for keeping track of any indicator lines
current_lines = []
# initial chart symbol to show
INITIAL_SYMBOL = "TSM"
# settings for live trading vs. paper trading mode
LIVE_TRADING = False
LIVE_TRADING_PORT = 7496
PAPER_TRADING_PORT = 7497
TRADING_PORT = PAPER_TRADING_PORT
if LIVE_TRADING:
    TRADING_PORT = LIVE_TRADING_PORT
# these defaults are fine
DEFAULT_HOST = '127.0.0.1'
DEFAULT_CLIENT_ID = 1
# Client for connecting to Interactive Brokers
class PTLClient(EWrapper, EClient):
    def __init__(self, host, port, client_id):
        EClient.__init__(self, self)
        self.connect(host, port, client_id)
        # run the client's message loop in a new Thread
        thread = Thread(target=self.run)
        thread.start()
    def error(self, req_id, code, msg, misc):
        if code in [2104, 2106, 2158]:
            print(msg)
        else:
            print('Error {}: {}'.format(code, msg))
    def nextValidId(self, orderId: int):
        super().nextValidId(orderId)
        self.order_id = orderId
        print(f"next valid id is {self.order_id}")
    # callback when historical data is received from Interactive Brokers
    def historicalData(self, req_id, bar):
        t = datetime.datetime.fromtimestamp(int(bar.date))
        # create a bar dictionary for each bar received
        data = {
            'date': t,
            'open': bar.open,
            'high': bar.high,
            'low': bar.low,
            'close': bar.close,
            'volume': int(bar.volume)
        }
        # Put the data into the queue
        data_queue.put(data)
    # callback when all historical data has been received
    def historicalDataEnd(self, reqId, start, end):
        print(f"end of data {start} {end}")
        # we can update the chart once all data has been received
        update_chart()
    # callback to log order status, we can put more behavior here if needed
    def orderStatus(self, order_id, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld, mktCapPrice):
        print(f"order status {order_id} {status} {filled} {remaining} {avgFillPrice}")
    # callback for each scan result returned by the scanner
    def scannerData(self, req_id, rank, details, distance, benchmark, projection, legsStr):
        super().scannerData(req_id, rank, details, distance, benchmark, projection, legsStr)
        print("got scanner data")
        print(details.contract)
        data = {
            'secType': details.contract.secType,
            'secId': details.contract.secId,
            'exchange': details.contract.primaryExchange,
            'symbol': details.contract.symbol
        }
        print(data)
        # Put the data into the queue
        data_queue.put(data)
# request historical bars from Interactive Brokers for the given symbol and timeframe
def get_bar_data(symbol, timeframe):
    print(f"getting bar data for {symbol} {timeframe}")
    contract = Contract()
    contract.symbol = symbol
    contract.secType = 'STK'
    contract.exchange = 'SMART'
    contract.currency = 'USD'
    what_to_show = 'TRADES'
    #now = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
    chart.spinner(True)
    client.reqHistoricalData(
        2, contract, '', '30 D', timeframe, what_to_show, True, 2, False, []
    )
    time.sleep(1)
    chart.watermark(symbol)
# handler for the screenshot button
def take_screenshot(key):
    img = chart.screenshot()
    t = time.time()
    with open(f"screenshot-{t}.png", 'wb') as f:
        f.write(img)
# handles when the user uses an order hotkey combination
def place_order(key):
    # get current symbol
    symbol = chart.topbar['symbol'].value
    # build contract object
    contract = Contract()
    contract.symbol = symbol
    contract.secType = "STK"
    contract.currency = "USD"
    contract.exchange = "SMART"
    # build order object
    order = Order()
    order.orderType = "MKT"
    order.totalQuantity = 1
    # get next order id
    client.reqIds(-1)
    time.sleep(2)
    # set action to buy or sell depending on key pressed
    # shift+O is for a buy order
    if key == 'O':
        print("buy order")
        order.action = "BUY"
    # shift+P for a sell order
    if key == 'P':
        print("sell order")
        order.action = "SELL"
    # place the order
    if client.order_id:
        print(f"got order id, placing {order.action} order")
        client.placeOrder(client.order_id, contract, order)
# implement an Interactive Brokers market scanner
def do_scan(scan_code):
    scannerSubscription = ScannerSubscription()
    scannerSubscription.instrument = "STK"
    scannerSubscription.locationCode = "STK.US.MAJOR"
    scannerSubscription.scanCode = scan_code
    tagValues = []
    tagValues.append(TagValue("optVolumeAbove", "1000"))
    tagValues.append(TagValue("avgVolumeAbove", "10000"))
    client.reqScannerSubscription(7002, scannerSubscription, [], tagValues)
    time.sleep(1)
    display_scan()
    client.cancelScannerSubscription(7002)
# get new bar data when the user enters a different symbol
def on_search(chart, searched_string):
    get_bar_data(searched_string, chart.topbar['timeframe'].value)
    chart.topbar['symbol'].set(searched_string)
# get new bar data when the user changes timeframes
def on_timeframe_selection(chart):
    print("selected timeframe")
    print(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
    get_bar_data(chart.topbar['symbol'].value, chart.topbar['timeframe'].value)
# callback for when the user changes the position of the horizontal line
def on_horizontal_line_move(chart, line):
    print(f'Horizontal line moved to: {line.price}')
# called when we want to render scan results
def display_scan():
    # function to call when one of the scan results is clicked
    def on_row_click(row):
        chart.topbar['symbol'].set(row['symbol'])
        get_bar_data(row['symbol'], '5 mins')
    # create a table on the UI, pass callback function for when a row is clicked
    table = chart.create_table(
        width=0.4,
        height=0.5,
        headings=('symbol', 'value'),
        widths=(0.7, 0.3),
        alignments=('left', 'center'),
        position='left', func=on_row_click
    )
    # poll queue for any new scan results
    try:
        while True:
            data = data_queue.get_nowait()
            # create a new row in the table for each scan result
            table.new_row(data['symbol'], '')
    except queue.Empty:
        print("empty queue")
    finally:
        print("done")
# called when we want to update what is rendered on the chart
def update_chart():
    global current_lines
    try:
        bars = []
        while True:  # Keep checking the queue for new data
            data = data_queue.get_nowait()
            bars.append(data)
    except queue.Empty:
        print("empty queue")
    finally:
        # once we have received all the data, convert to pandas dataframe
        df = pd.DataFrame(bars)
        print(df)
        # set the data on the chart
        chart.set(df)
        if not df.empty:
            # draw a horizontal line at the high
            chart.horizontal_line(df['high'].max(), func=on_horizontal_line_move)
            # if there were any indicator lines on the chart already (e.g. SMA), clear them so we can recalculate
            if current_lines:
                for existing_line in current_lines:
                    existing_line.delete()
            current_lines = []
            # calculate any new lines to render
            # create a line with SMA label on the chart
            line = chart.create_line(name='SMA 50')
            line.set(pd.DataFrame({
                'time': df['date'],
                'SMA 50': df['close'].rolling(window=50).mean()
            }).dropna())
            current_lines.append(line)
        # once we get the data back, we don't need a spinner anymore
        chart.spinner(False)
if __name__ == '__main__':
    # create a client object
    client = PTLClient(DEFAULT_HOST, TRADING_PORT, DEFAULT_CLIENT_ID)
    # create chart object, specify display settings
    chart = Chart(toolbox=True, width=1000, inner_width=0.6, inner_height=1)
    # hotkey to place a buy order
    chart.hotkey('shift', 'O', place_order)
    # hotkey to place a sell order
    chart.hotkey('shift', 'P', place_order)
    chart.legend(True)
    # set up a function to call when searching for symbol
    chart.events.search += on_search
    # set up top bar
    chart.topbar.textbox('symbol', INITIAL_SYMBOL)
    # give ability to switch between timeframes
    chart.topbar.switcher('timeframe', ('5 mins', '15 mins', '1 hour'), default='5 mins', func=on_timeframe_selection)
    # populate initial chart
    get_bar_data(INITIAL_SYMBOL, '5 mins')
    # run a market scanner
    do_scan("HOT_BY_VOLUME")
    # create a button for taking a screenshot of the chart
    chart.topbar.button('screenshot', 'Screenshot', func=take_screenshot)
    # show the chart
    chart.show(block=True)
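Both update_chart() and display_scan() empty data_queue with the same try/except loop. As a minimal sketch, assuming all that is needed is to take whatever is currently queued without blocking, that loop can be pulled into one small function. The drain_queue name and the sample dictionaries are hypothetical.

```python
import queue


def drain_queue(q):
    """Remove and return every item currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            # Nothing left to read right now, so hand back what we collected.
            return items


# Made-up items standing in for the bar dictionaries built in historicalData().
demo_queue = queue.Queue()
for close in (101.5, 102.0, 101.75):
    demo_queue.put({"close": close})

bars = drain_queue(demo_queue)
print(len(bars), demo_queue.empty())
```

With a helper like this, update_chart() could build its DataFrame from drain_queue(data_queue), and display_scan() could loop over the same call to add table rows.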
Bonus: GPT-4o Integration
I released a follow-up video in which the take_screenshot() function sends the chart image to GPT-4o for analysis. Here is the modified take_screenshot() function. Note that it imports analyze_chart() from a new file, which is included further down:
import os
from gpt4o_technical_analyst import analyze_chart
# handler for the screenshot button
def take_screenshot(key):
    img = chart.screenshot()
    t = time.time()
    # make sure the output folder exists before writing to it
    os.makedirs("screenshots", exist_ok=True)
    chart_filename = f"screenshots/screenshot-{t}.png"
    analysis_filename = f"screenshots/screenshot-{t}.md"
    with open(chart_filename, 'wb') as f:
        f.write(img)
    analysis = analyze_chart(chart_filename)
    print(analysis)
    with open(analysis_filename, "w") as text_file:
        text_file.write(analysis)
Then just define an analyze_chart() function, which you can save in a file named gpt4o_technical_analyst.py:
import os
import base64
from openai import OpenAI
# fill in your own OpenAI credentials
OPENAI_API_KEY = ""
OPENAI_ORG_ID = ""
os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY
client = OpenAI(organization=OPENAI_ORG_ID)
def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')
def analyze_chart(chart_path):
    base64_image = encode_image(chart_path)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Analyze this chart. Include the symbol and discuss the price action."},
                    {
                        "type": "image_url",
                        "image_url": {
                            # the screenshot is saved as a PNG file
                            "url": f"data:image/png;base64,{base64_image}"
                        },
                    },
                ],
            }
        ]
    )
    return response.choices[0].message.content
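The screenshot is saved as a PNG, so the data URL sent to the model should carry a matching media type. The sketch below builds the URL from the file extension using only the standard library. The to_data_url helper is hypothetical, and the bytes written to the temporary file are a placeholder, not a real image.

```python
import base64
import mimetypes
import os
import tempfile


def to_data_url(image_path):
    """Encode a file as a base64 data URL, guessing the media type from its name."""
    mime_type, _ = mimetypes.guess_type(image_path)
    if mime_type is None:
        # Fall back to a generic type when the extension is not recognized.
        mime_type = "application/octet-stream"
    with open(image_path, "rb") as image_file:
        encoded = base64.b64encode(image_file.read()).decode("utf-8")
    return f"data:{mime_type};base64,{encoded}"


# Write a few placeholder bytes to a temporary .png file for the demo.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "screenshot.png")
    with open(demo_path, "wb") as f:
        f.write(b"\x89PNG")
    url = to_data_url(demo_path)

print(url.split(",")[0])
```

The return value of a helper like this could be passed as the "url" field of the image_url entry in analyze_chart().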