feature-image

I wrote previously about how I created a remote control application to automate changing my TV source and click to a Roku channel. I ran into some challenges with ubuntu server 20.04 running on this raspberry pi 2 that I have. First the power management features kept putting the raspberry pi to sleep which made this whole app I made less reliable. I found a great article on a simple way to turn off that power management feature. While I love the flexibility of node-red the raspberry pi 2 struggled. The python API I wrote did its job well and thanks to ThreadPoolExecutor the long running logic did not block while everything ran. Though I puzzled much too long figuring out how to host my code through gunicorn. After all this set up I looked at what I created and I was dissatisfied because I wanted to change things but I did not have a lot of time and energy to figure out every piece of this pipeline.

Finding a more dynamic way

When I worked at an automotive marketing company one system was tasked with parsing dealer feeds and processing that data into a database for displaying on dealer website inventory search pages. This vehicle data came in via text files, xml, and the structure could be volatile because someone somewhere had more information to add about a vehicle. This could break the ingest of all further vehicle data. Dealerships run on accurate vehicle data and this is a problem that cannot stay broken long or take long to fix.

This scenario applies to the problem I have because I’m a hobbyist programmer at this point and my time is limited and precious. While I want to retain and grow my software development skills spending a weekend fixing or redeploying an assortment of my applications is not something I want to deal with.

chain of action model

Implementation

I sketched out what I wanted to achieve based on constraints I encountered on the first attempt. I did not find a go package that had a simple way to control the LG WebOSTV I have. Python seemed easy to run dynamic code with and I already had fully functioning implementation of what I wanted to achieve. For simplicity I want to use sqlite as the database and NATS as the message oriented middleware communication bridge between the Go API and the Python worker. RabbitMQ is solid but I want to avoid Erlang administration and NATS is go based.

I visited SwaggerHub and downloaded the go version of my remote procedure call API. I added a couple data types to represent a list of content to watch, installed packages for sqlite3 and NATS, and added logic for reading the database and publishing a message over NATS. I spent much more time on the Makefile and figuring out how to compile for the raspbery pi on my old 2009 macbook because of the sqlite3 library’s dependency on using CGO for compiling. I did not figure out how to get the right version of gcc on this mac and found that the raspberry pi did not struggle to hard to compile this very small API.

import (
	"database/sql"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"strings"

	//the linter says i should write here
	_ "github.com/mattn/go-sqlite3"
	"github.com/nats-io/nats.go"
)

//Content object
type Content struct {
	Name string
}

//ContentList of content objects
type ContentList []Content

//WatchList returns a list of content that is triggerable
func WatchList(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusOK)
	home, err := os.UserHomeDir()
	db, err := sql.Open("sqlite3", home+"/contentScript.db")
	checkErr(err)

	// query
	rows, err := db.Query("SELECT name FROM watchcontentscripts;")
	checkErr(err)

	var name string
	var contentList ContentList

	for rows.Next() {
		err = rows.Scan(&name)
		checkErr(err)
		watchContent := Content{
			Name: name,
		}
		contentList = append(contentList, watchContent)
	}

	rows.Close() //good habit to close
	b, err := json.Marshal(contentList)
	w.Write(b)

}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

//WatchTrigger queues up a message for the python worker to activate
func WatchTrigger(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusOK)
	nc, err := nats.Connect("127.0.01:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	path := strings.Split(r.URL.String(), "/")
	message := path[len(path)-1]
	//Publish the message
	if err := nc.Publish("watch", []byte(message)); err != nil {
		log.Fatal(err)
	}
}

Executing Dynamic Code

For the Python worker I adapted the example provided by the NATS subscriber and placed my new worker code in the subscribe_handler function. There were a few extra things I did to make this easier and more secure. If an error happens in exec it will not be captured unless you redirect stdout while exec is occuring. Exec can take three arguments the first is the code in a string form to run the second are globals accessible to exec, and the third are local variables / functions accessible to exec. This allows constrainting what code can run ideally decreasing the risk of executing arbitrary code. Lastly since this worker is async and exec is synchronous I had to use

await loop.run_in_executor(None, exec,scriptUtils+script,userglobals)
 to ensure the scripts ran successfully. A world of warning with exec, if you plan on exposing this for users other than yourself you will need to do more work to lock it down and it is not fully secure just by reducing the amount of globals or locals that exec has access to. Since this project is only for me I’m not concerned with as much security.

import asyncio, signal, os, logging, time, sqlite3
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from pywebostv.connection import WebOSClient
from pywebostv.controls import ApplicationControl, SourceControl
from roku import Roku
from dataclasses import dataclass
from pathlib import Path
home = Path.home()

userglobals = {
    "__builtins__": {
        "ApplicationControl": ApplicationControl,
        "SourceControl": SourceControl,
        "print": print,
        "logging": logging,
        "WebOSClient": WebOSClient,
        "dataclass": dataclass,
        "time": time,
        "__build_class__":__build_class__,
        "__name__":__name__,
        "Roku": Roku
    }
}


async def run(loop):
    nc = NATS()
    async def closed_cb():
        print("Connection to NATS is closed.")
        await asyncio.sleep(0.1)
        loop.stop()

    options = {
        "servers": ["nats://localhost:4222"],
        "loop": loop,
        "closed_cb": closed_cb
    }
    await nc.connect(**options)
    print(f"Connected to NATS at {nc.connected_url.netloc}...")

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        conn = sqlite3.connect(home.joinpath("contentScript.db"))
        c = conn.cursor()
        c.execute("SELECT script FROM utilities WHERE name=?", ("tv_and_sources",))
        #Load the tv data class and the sources it has
        scriptUtils = c.fetchone()[0]
        c.execute("SELECT script FROM watchcontentscripts WHERE name=?", (data,))
        script = c.fetchone()[0]

        import sys
        from io import StringIO
        old_stdout = sys.stdout
        redirected_output = sys.stdout = StringIO()
        try:
            print(f"Executing command {data}")
            #Execute the commands to switch between sources
            await loop.run_in_executor(None, exec,scriptUtils+script,userglobals)
        except:
            raise
        finally:
            sys.stdout = old_stdout
        print(redirected_output.getvalue())
        c.close()

    # Simple publisher and async subscriber via coroutine.
    await nc.subscribe("watch", cb=message_handler)

    def signal_handler():
        if nc.is_closed:
            return
        print("Disconnecting...")
        loop.create_task(nc.close())

    for sig in ("SIGINT", "SIGTERM"):
        loop.add_signal_handler(getattr(signal, sig), signal_handler)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    try:
        loop.run_forever()
    finally:
        loop.close()

When I hit my API to watch content like MBC news the worker will pull the code from the database and execute it. My Television is a dataclass that stores a connection object to the TV’s client session as well as the connection to Roku. It’s only other responsibility is to invoke functions provided to it. From now on I can leave the go API and the Python worker alone once they’re deployed and managed by systemd unit files. If I want to change functionality I can do that by defining new names of content and adding new functions in one place and avoid a whole development cycle.

@dataclass
class Television:
    def __init__(self):
        self.connection = self.__connectTv__("SECRET_KEY","TVIP")
        self.roku = Roku('ROKU_IP',timeout=10)

    def __connectTv__(self,client_key,tv_ip):
        store = {'client_key': client_key}
        client = WebOSClient(tv_ip)
        client.connect()
        register = client.register(store)
        for status in register:
            logging.debug(status)
        return client

    def invoke(self, delegate):
        delegate(self)

def selectRoku(tv):
    source_control = SourceControl(tv.connection)
    for source in source_control.list_sources():
        if source.label == "Roku":
            source_control.set_source(source)

def selectSwitch(tv):
    source_control = SourceControl(tv.connection)
    for source in source_control.list_sources():
        if source.label == "Switch":
            source_control.set_source(source)

def selectChromecast(tv):
    source_control = SourceControl(tv.connection)
    for source in source_control.list_sources():
        if source.label == "Chromecast":
            source_control.set_source(source)

def selectYouTube(tv):
    app = ApplicationControl(tv.connection)
    apps = app.list_apps()
    for thisapp in apps:
        if thisapp.data.get('title') == "YouTube":
            app.launch(thisapp)

def selectMBC(tv):
    tv.roku['OnDemandKorea Plus'].launch()
    time.sleep(40)
    tv.roku.up()
    time.sleep(1)
    tv.roku.right()
    time.sleep(1)
    tv.roku.right()
    time.sleep(1)
    tv.roku.select()
    time.sleep(1)
    tv.roku.down()
    tv.roku.down()
    tv.roku.down()
    tv.roku.select()
    time.sleep(7)
    tv.roku.up()
    tv.roku.up()
    time.sleep(5)
    tv.roku.right()
    tv.roku.select()
    time.sleep(10)
    tv.roku.right()
    tv.roku.select()
    tv.roku.right()
    tv.roku.right()
    tv.roku.right()
    tv.roku.right()
    tv.roku.select()
    time.sleep(7)
    tv.roku.select()
    time.sleep(7)
    tv.roku.select()

def selectOnDemandKorea(tv):
    tv.roku['OnDemandKorea Plus'].launch()


tv = Television()
tv.invoke(selectMBC)
Contact me

Let’s Start a Project

Sitemap