123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- # This is part of Kaylee
- # -- this code is licensed GPLv3
- # Copyright 2015-2017 Clayton G. Hobbs
- # Portions Copyright 2013 Jezra
-
- """Dark Sky weather plugin for Kaylee
-
- This plugin provides weather information `powered by Dark Sky
- <https://darksky.net/poweredby/>`__. The user must provide a key for
- the Dark Sky API, as well as decimal latitude and longitude::
-
- ".darksky": {
- "api_key": "USER_API_KEY",
- "latitude": USER_LATITUDE,
- "longitude": USER_LONGITUDE
- }
-
- Weather information is cached for up to ``cache_max_age`` seconds
- (default 1800, half an hour). To conserve API calls, the cache is not
- updated until the first time the user requests weather information
- after the cache becomes stale.
- """
-
- import json
- import os
- import time
-
- import requests
-
- from .pluginbase import PluginBase
-
-
class Plugin(PluginBase):
    """Main class for the Dark Sky weather plugin

    Recognized commands are mapped to handler methods in ``self.commands``.
    Each handler reads from ``self.cache``, which :meth:`handle` loads from
    the on-disk cache file immediately before dispatching, and returns the
    text that is emitted on the ``tts`` signal.
    """

    # Seconds to wait for the weather API before giving up.  Without a
    # timeout, requests waits indefinitely on a stalled connection.  The
    # value is a conservative choice made here, not one mandated by the API.
    _REQUEST_TIMEOUT = 10

    def __init__(self, config, name):
        """Initialize the Dark Sky weather plugin

        ``config`` must provide ``cache_dir``; the plugin options must
        provide ``api_key``, ``latitude`` and ``longitude``, and may provide
        ``cache_max_age`` (seconds, default 1800).
        """
        super().__init__(config, name)

        self._cache_filename = os.path.join(config.cache_dir, 'darksky.json')
        self._weather_url = 'https://api.darksky.net/forecast/{}/{},{}'.format(
            self.options['api_key'],
            self.options['latitude'],
            self.options['longitude']
        )
        try:
            self._cache_max_age = self.options['cache_max_age']
        except KeyError:
            self._cache_max_age = 1800

        # Spoken phrase -> handler returning the text to speak.  The
        # spellings are kept exactly as written because they are added to
        # the corpus strings verbatim below.
        self.commands = {
            'whats the temperature': self._temperature,
            'whats todays low': self._todays_low,
            'whats todays high': self._todays_high,
            'whats tomorrows high': self._tomorrows_high,
            'whats the humidity': self._relative_humidity,
            'whats the weather': self._current_conditions,
            'whens sunset': self._sunset_time,
            'hoos your weather provider': self._provider_credits
        }

        self.corpus_strings.update(self.commands)

    def _format_temperature(self, temperature, unit='Fahrenheit'):
        """Format a temperature for the TTS system"""
        return '{:.1f} degrees {}'.format(temperature, unit)

    def _temperature(self):
        """Return the current temperature from the cache, formatted"""
        return self._format_temperature(self.cache['currently']['temperature'])

    def _todays_low(self):
        """Return the first daily entry's minimum temperature, formatted"""
        return self._format_temperature(
            self.cache['daily']['data'][0]['temperatureMin'])

    def _todays_high(self):
        """Return the first daily entry's maximum temperature, formatted"""
        return self._format_temperature(
            self.cache['daily']['data'][0]['temperatureMax'])

    def _tomorrows_high(self):
        """Return the second daily entry's maximum temperature, formatted"""
        return self._format_temperature(
            self.cache['daily']['data'][1]['temperatureMax'])

    def _relative_humidity(self):
        """Return the current humidity as a whole-number percentage"""
        # The cached value is scaled by 100 to get a percentage
        return '{:.0f} percent'.format(
            100 * self.cache['currently']['humidity'])

    def _current_conditions(self):
        """Return the current summary, temperature and relative humidity"""
        return '{}, {}, relative humidity {}'.format(
            self.cache['currently']['summary'],
            self._temperature(),
            self._relative_humidity())

    def _sunset_time(self):
        """Return today's sunset time in local time, phrased for the TTS"""
        # Get the time in a more useful structure
        t = time.localtime(self.cache['daily']['data'][0]['sunsetTime'])

        # Format each part of the time to be pronounced nicely
        hour = time.strftime('%I', t)
        if hour.startswith('0'):
            hour = hour[1]

        minute = time.strftime('%M', t)
        if minute.startswith('0'):
            if minute[1] == '0':
                minute = "o'clock"
            else:
                minute = 'O' + minute[1]

        # %p is locale-dependent and may be an empty string, so never index
        # into it directly; startswith() and slicing are safe on any length.
        ante_post = time.strftime('%p', t)
        if ante_post.startswith('A'):
            ante_post = 'AE ' + ante_post[1:2]

        # Put the parts together
        return '{} {} {}'.format(hour, minute, ante_post)

    def _provider_credits(self):
        """Return the attribution text for the weather provider"""
        return 'Powered by Dark Sky, https://darksky.net/poweredby/.'

    def _update_cache_if_stale(self):
        """Refresh the cache file if it is missing or too old

        The cache is considered stale when its modification time is more
        than ``cache_max_age`` seconds in the past.
        """
        try:
            st = os.stat(self._cache_filename)
        except FileNotFoundError:
            # If there's no cache file, we need to get one
            self._update_cache()
        else:
            if time.time() - st.st_mtime > self._cache_max_age:
                self._update_cache()

    def _update_cache(self):
        """Download fresh weather data into the cache file

        A response with a status other than 200 leaves the existing cache
        untouched.  Network errors raised by ``requests`` (including
        timeouts) propagate to the caller.
        """
        r = requests.get(self._weather_url, stream=True,
                         timeout=self._REQUEST_TIMEOUT)
        try:
            if r.status_code != 200:
                return

            # Download to a temporary file and rename it over the cache only
            # once complete.  Writing in place would leave a truncated file
            # with a fresh mtime if the transfer failed part-way, and that
            # broken file would then be treated as valid until it expired.
            tmp_filename = self._cache_filename + '.tmp'
            try:
                with open(tmp_filename, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            except BaseException:
                # Best-effort cleanup of the partial download; the original
                # error is what matters and is re-raised below.
                try:
                    os.remove(tmp_filename)
                except OSError:
                    pass
                raise
            os.replace(tmp_filename, self._cache_filename)
        finally:
            # A streamed response holds its connection until closed
            r.close()

    def _read_cache(self):
        """Return the parsed contents of the cache file"""
        # The file holds the raw JSON response body, so decode it as UTF-8
        # rather than relying on the locale's default encoding.
        with open(self._cache_filename, 'r', encoding='utf-8') as f:
            return json.load(f)

    def confidence(self, text):
        """Return whether or not the command can be handled"""
        return 1 if text in self.corpus_strings else 0

    def handle(self, text):
        """Speak reasonably up-to-date weather information

        Returns True if ``text`` was a recognized command and a response
        was emitted on the ``tts`` signal, False otherwise.
        """
        # Is there a matching command?
        if not self.confidence(text):
            return False

        self._update_cache_if_stale()
        self.cache = self._read_cache()
        self.emit('tts', self.commands[text]())
        return True
|